http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 0cfb0c8,0000000..08f7c5f mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@@ -1,1981 -1,0 +1,1981 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.Collections; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import javax.cache.Cache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.NodeStoppingException; +import org.apache.ignite.internal.pagemem.FullPageId; +import org.apache.ignite.internal.pagemem.Page; +import org.apache.ignite.internal.pagemem.PageIdUtils; +import org.apache.ignite.internal.pagemem.PageMemory; +import org.apache.ignite.internal.pagemem.PageUtils; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.database.CacheDataRow; +import org.apache.ignite.internal.processors.cache.database.CacheDataRowAdapter; +import org.apache.ignite.internal.processors.cache.database.CacheSearchRow; +import org.apache.ignite.internal.processors.cache.database.IgniteCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.database.RootPage; +import org.apache.ignite.internal.processors.cache.database.RowStore; +import org.apache.ignite.internal.processors.cache.database.freelist.FreeList; +import org.apache.ignite.internal.processors.cache.database.tree.BPlusTree; +import org.apache.ignite.internal.processors.cache.database.tree.io.BPlusIO; +import org.apache.ignite.internal.processors.cache.database.tree.io.BPlusInnerIO; +import org.apache.ignite.internal.processors.cache.database.tree.io.BPlusLeafIO; +import org.apache.ignite.internal.processors.cache.database.tree.io.DataPageIO; +import org.apache.ignite.internal.processors.cache.database.tree.io.DataPagePayload; +import org.apache.ignite.internal.processors.cache.database.tree.io.IOVersions; +import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.local.GridLocalCache; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; - import org.apache.ignite.internal.processors.query.GridQueryProcessor; ++import org.apache.ignite.internal.processors.query.QueryUtils; +import org.apache.ignite.internal.util.GridAtomicLong; +import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; +import org.apache.ignite.internal.util.GridEmptyCloseableIterator; +import org.apache.ignite.internal.util.GridSpinBusyLock; +import org.apache.ignite.internal.util.GridUnsafe; +import org.apache.ignite.internal.util.lang.GridCloseableIterator; +import org.apache.ignite.internal.util.lang.GridCursor; +import org.apache.ignite.internal.util.lang.GridIterator; +import org.apache.ignite.internal.util.lang.IgniteInClosure2X; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgnitePredicate; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX; +import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION; +import static org.apache.ignite.internal.pagemem.PageIdUtils.itemId; +import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId; + +/** + * + */ +@SuppressWarnings("PublicInnerClass") +public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter implements IgniteCacheOffheapManager { + /** */ + private boolean indexingEnabled; + + /** */ + // TODO GG-11208 need restore size after restart. + private CacheDataStore locCacheDataStore; + + /** */ + protected final ConcurrentMap<Integer, CacheDataStore> partDataStores = new ConcurrentHashMap<>(); + + /** */ + protected final CacheDataStore rmvdStore = new CacheDataStoreImpl(-1, null, null, null); + + /** */ + protected PendingEntriesTree pendingEntries; + + /** */ + private volatile boolean hasPendingEntries; + + /** */ + private static final PendingRow START_PENDING_ROW = new PendingRow(Long.MIN_VALUE, 0); + + /** */ + private final GridAtomicLong globalRmvId = new GridAtomicLong(U.currentTimeMillis() * 1000_000); + + /** */ + private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); + + /** */ + private int updateValSizeThreshold; + + /** {@inheritDoc} */ + @Override public GridAtomicLong globalRemoveId() { + return globalRmvId; + } + + /** {@inheritDoc} */ + @Override protected void start0() throws IgniteCheckedException { + super.start0(); + - indexingEnabled = GridQueryProcessor.isEnabled(cctx.config()); ++ indexingEnabled = QueryUtils.isEnabled(cctx.config()); + + updateValSizeThreshold = cctx.kernalContext().config().getMemoryConfiguration().getPageSize() / 2; + + if (cctx.affinityNode()) { + if (cctx.kernalContext().clientNode()) { + assert cctx.isLocal() : cctx.name(); + + cctx.shared().database().init(); + } + + cctx.shared().database().checkpointReadLock(); + + try { + initDataStructures(); + + if (cctx.isLocal()) { + assert cctx.cache() instanceof GridLocalCache : cctx.cache(); + + locCacheDataStore = createCacheDataStore(0); + } + } + finally { + cctx.shared().database().checkpointReadUnlock(); + } + } + } + + /** + * @throws IgniteCheckedException If failed. + */ + protected void initDataStructures() throws IgniteCheckedException { + if (cctx.shared().ttl().eagerTtlEnabled()) { + String name = "PendingEntries"; + + long rootPage = allocateForTree(); + + pendingEntries = new PendingEntriesTree(cctx, + name, + cctx.shared().database().pageMemory(), + rootPage, + cctx.shared().database().globalReuseList(), + true); + } + } + + /** {@inheritDoc} */ + @Override protected void stop0(final boolean cancel, final boolean destroy) { + super.stop0(cancel, destroy); + + if (destroy && cctx.affinityNode()) + destroyCacheDataStructures(destroy); + } + + /** {@inheritDoc} */ + @Override protected void onKernalStop0(boolean cancel) { + super.onKernalStop0(cancel); + + busyLock.block(); + } + + /** + * + */ + protected void destroyCacheDataStructures(boolean destroy) { + assert cctx.affinityNode(); + + try { + if (locCacheDataStore != null) + locCacheDataStore.destroy(); + + if (pendingEntries != null) + pendingEntries.destroy(); + + for (CacheDataStore store : partDataStores.values()) + store.destroy(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e.getMessage(), e); + } + } + + /** + * @param part Partition. + * @return Data store for given entry. + */ + public CacheDataStore dataStore(GridDhtLocalPartition part) { + if (cctx.isLocal()) + return locCacheDataStore; + else { + assert part != null; + + return part.dataStore(); + } + } + + /** {@inheritDoc} */ + @Override public long entriesCount() { + if (cctx.isLocal()) + return locCacheDataStore.size(); + + long size = 0; + + for (CacheDataStore store : partDataStores.values()) + size += store.size(); + + return size; + } + + /** + * @param p Partition. + * @return Partition data. + */ + @Nullable private CacheDataStore partitionData(int p) { + if (cctx.isLocal()) + return locCacheDataStore; + else { + GridDhtLocalPartition part = cctx.topology().localPartition(p, AffinityTopologyVersion.NONE, false); + + return part != null ? part.dataStore() : null; + } + } + + /** {@inheritDoc} */ + @Override public long entriesCount( + boolean primary, + boolean backup, + AffinityTopologyVersion topVer + ) throws IgniteCheckedException { + if (cctx.isLocal()) + return entriesCount(0); + else { + ClusterNode locNode = cctx.localNode(); + + long cnt = 0; + + for (GridDhtLocalPartition locPart : cctx.topology().currentLocalPartitions()) { + if (primary) { + if (cctx.affinity().primaryByPartition(locNode, locPart.id(), topVer)) { + cnt += locPart.size(); + + continue; + } + } + + if (backup) { + if (cctx.affinity().backupByPartition(locNode, locPart.id(), topVer)) + cnt += locPart.size(); + } + } + + return cnt; + } + } + + /** {@inheritDoc} */ + @Override public long entriesCount(int part) { + if (cctx.isLocal()){ + assert part == 0; + + return locCacheDataStore.size(); + } + else { + GridDhtLocalPartition locPart = cctx.topology().localPartition(part, AffinityTopologyVersion.NONE, false); + + return locPart == null ? 0 : locPart.size(); + } + } + + /** + * @param primary Primary data flag. + * @param backup Primary data flag. + * @param topVer Topology version. + * @return Data stores iterator. + */ + private Iterator<CacheDataStore> cacheData(boolean primary, boolean backup, AffinityTopologyVersion topVer) { + assert primary || backup; + + if (cctx.isLocal()) + return Collections.singleton(locCacheDataStore).iterator(); + else { + final Iterator<GridDhtLocalPartition> it = cctx.topology().currentLocalPartitions().iterator(); + + if (primary && backup) { + return F.iterator(it, new IgniteClosure<GridDhtLocalPartition, CacheDataStore>() { + @Override public CacheDataStore apply(GridDhtLocalPartition part) { + return part.dataStore(); + } + }, true); + } + + final Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), topVer) : + cctx.affinity().backupPartitions(cctx.localNodeId(), topVer); + + return F.iterator(it, new IgniteClosure<GridDhtLocalPartition, CacheDataStore>() { + @Override public CacheDataStore apply(GridDhtLocalPartition part) { + return part.dataStore(); + } + }, true, + new IgnitePredicate<GridDhtLocalPartition>() { + @Override public boolean apply(GridDhtLocalPartition part) { + return parts.contains(part.id()); + } + }); + } + } + + /** {@inheritDoc} */ + @Override public void invoke(KeyCacheObject key, + GridDhtLocalPartition part, + OffheapInvokeClosure c) + throws IgniteCheckedException { + dataStore(part).invoke(key, c); + } + + /** {@inheritDoc} */ + @Override public void update( + KeyCacheObject key, + CacheObject val, + GridCacheVersion ver, + long expireTime, + int partId, + GridDhtLocalPartition part, + @Nullable CacheDataRow oldRow + ) throws IgniteCheckedException { + assert expireTime >= 0; + + dataStore(part).update(key, partId, val, ver, expireTime, oldRow); + } + + /** {@inheritDoc} */ + @Override public void remove( + KeyCacheObject key, + int partId, + GridDhtLocalPartition part + ) throws IgniteCheckedException { + dataStore(part).remove(key, partId); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override @Nullable public CacheDataRow read(GridCacheMapEntry entry) + throws IgniteCheckedException { + KeyCacheObject key = entry.key(); + + assert cctx.isLocal() || entry.localPartition() != null : entry; + + return dataStore(entry.localPartition()).find(key); + } + + /** {@inheritDoc} */ + @Override public boolean containsKey(GridCacheMapEntry entry) { + try { + return read(entry) != null; + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to read value", e); + + return false; + } + } + + /** {@inheritDoc} */ + @Override public void onPartitionCounterUpdated(int part, long cntr) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onPartitionInitialCounterUpdated(int part, long cntr) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public long lastUpdatedPartitionCounter(int part) { + return 0; + } + + /** + * Clears offheap entries. + * + * @param readers {@code True} to clear readers. + */ + @SuppressWarnings("unchecked") + @Override public void clear(boolean readers) { + GridCacheVersion obsoleteVer = null; + + GridIterator<CacheDataRow> it = rowsIterator(true, true, null); + + while (it.hasNext()) { + KeyCacheObject key = it.next().key(); + + try { + if (obsoleteVer == null) + obsoleteVer = cctx.versions().next(); + + GridCacheEntryEx entry = cctx.cache().entryEx(key); + + entry.clear(obsoleteVer, readers); + } + catch (GridDhtInvalidPartitionException ignore) { + // Ignore. + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to clear cache entry: " + key, e); + } + } + } + + /** {@inheritDoc} */ + @Override public int onUndeploy(ClassLoader ldr) { + // TODO: GG-11141. + return 0; + } + + /** {@inheritDoc} */ + @Override public long offHeapAllocatedSize() { + // TODO GG-10884. + return 0; + } + + /** {@inheritDoc} */ + @Override public void writeAll(Iterable<GridCacheBatchSwapEntry> swapped) throws IgniteCheckedException { + // No-op. + } + + /** + * @param primary {@code True} if need return primary entries. + * @param backup {@code True} if need return backup entries. + * @param topVer Topology version to use. + * @return Entries iterator. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("unchecked") + @Override public <K, V> GridCloseableIterator<Cache.Entry<K, V>> entriesIterator(final boolean primary, + final boolean backup, + final AffinityTopologyVersion topVer, + final boolean keepBinary) throws IgniteCheckedException { + final Iterator<CacheDataRow> it = rowsIterator(primary, backup, topVer); + + return new GridCloseableIteratorAdapter<Cache.Entry<K, V>>() { + /** */ + private CacheEntryImplEx next; + + @Override protected Cache.Entry<K, V> onNext() { + CacheEntryImplEx ret = next; + + next = null; + + return ret; + } + + @Override protected boolean onHasNext() { + if (next != null) + return true; + + CacheDataRow nextRow = null; + + if (it.hasNext()) + nextRow = it.next(); + + if (nextRow != null) { + KeyCacheObject key = nextRow.key(); + CacheObject val = nextRow.value(); + + Object key0 = cctx.unwrapBinaryIfNeeded(key, keepBinary, false); + Object val0 = cctx.unwrapBinaryIfNeeded(val, keepBinary, false); + + next = new CacheEntryImplEx(key0, val0, nextRow.version()); + + return true; + } + + return false; + } + }; + } + + /** {@inheritDoc} */ + @Override public GridCloseableIterator<KeyCacheObject> keysIterator(final int part) throws IgniteCheckedException { + CacheDataStore data = partitionData(part); + + if (data == null) + return new GridEmptyCloseableIterator<>(); + + final GridCursor<? extends CacheDataRow> cur = data.cursor(); + + return new GridCloseableIteratorAdapter<KeyCacheObject>() { + /** */ + private KeyCacheObject next; + + @Override protected KeyCacheObject onNext() { + KeyCacheObject res = next; + + next = null; + + return res; + } + + @Override protected boolean onHasNext() throws IgniteCheckedException { + if (next != null) + return true; + + if (cur.next()) { + CacheDataRow row = cur.get(); + + next = row.key(); + } + + return next != null; + } + }; + } + + /** {@inheritDoc} */ + @Override public GridIterator<CacheDataRow> iterator(boolean primary, boolean backups, + final AffinityTopologyVersion topVer) + throws IgniteCheckedException { + return rowsIterator(primary, backups, topVer); + } + + /** + * @param primary Primary entries flag. + * @param backups Backup entries flag. + * @param topVer Topology version. + * @return Iterator. + */ + private GridIterator<CacheDataRow> rowsIterator(boolean primary, boolean backups, AffinityTopologyVersion topVer) { + final Iterator<CacheDataStore> dataIt = cacheData(primary, backups, topVer); + + return new GridCloseableIteratorAdapter<CacheDataRow>() { + /** */ + private GridCursor<? extends CacheDataRow> cur; + + /** */ + private CacheDataRow next; + + @Override protected CacheDataRow onNext() { + CacheDataRow res = next; + + next = null; + + return res; + } + + @Override protected boolean onHasNext() throws IgniteCheckedException { + if (next != null) + return true; + + while (true) { + if (cur == null) { + if (dataIt.hasNext()) + cur = dataIt.next().cursor(); + else + break; + } + + if (cur.next()) { + next = cur.get(); + + break; + } + else + cur = null; + } + + return next != null; + } + }; + } + + /** {@inheritDoc} */ + @Override public GridIterator<CacheDataRow> iterator(int part) throws IgniteCheckedException { + CacheDataStore data = partitionData(part); + + if (data == null) + return new GridEmptyCloseableIterator<>(); + + final GridCursor<? extends CacheDataRow> cur = data.cursor(); + + return new GridCloseableIteratorAdapter<CacheDataRow>() { + /** */ + private CacheDataRow next; + + @Override protected CacheDataRow onNext() { + CacheDataRow res = next; + + next = null; + + return res; + } + + @Override protected boolean onHasNext() throws IgniteCheckedException { + if (next != null) + return true; + + if (cur.next()) + next = cur.get(); + + return next != null; + } + }; + } + + /** + * @return Page ID. + * @throws IgniteCheckedException If failed. + */ + private long allocateForTree() throws IgniteCheckedException { + ReuseList reuseList = cctx.shared().database().globalReuseList(); + + long pageId; + + if (reuseList == null || (pageId = reuseList.takeRecycledPage()) == 0L) + pageId = cctx.shared().database().pageMemory().allocatePage(cctx.cacheId(), INDEX_PARTITION, FLAG_IDX); + + return pageId; + } + + /** {@inheritDoc} */ + @Override public RootPage rootPageForIndex(String idxName) throws IgniteCheckedException { + long pageId = allocateForTree(); + + return new RootPage(new FullPageId(pageId, cctx.cacheId()), true); + } + + /** {@inheritDoc} */ + @Override public void dropRootPageForIndex(String idxName) throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public ReuseList reuseListForIndex(String idxName) { + return cctx.shared().database().globalReuseList(); + } + + /** {@inheritDoc} */ + @Override public IgniteRebalanceIterator rebalanceIterator(int part, AffinityTopologyVersion topVer, Long partCntr) + throws IgniteCheckedException { + final GridIterator<CacheDataRow> it = iterator(part); + + return new IgniteRebalanceIterator() { + @Override public boolean historical() { + return false; + } + + @Override public boolean hasNextX() throws IgniteCheckedException { + return it.hasNextX(); + } + + @Override public CacheDataRow nextX() throws IgniteCheckedException { + return it.nextX(); + } + + @Override public void removeX() throws IgniteCheckedException { + it.removeX(); + } + + @Override public Iterator<CacheDataRow> iterator() { + return it.iterator(); + } + + @Override public boolean hasNext() { + return it.hasNext(); + } + + @Override public CacheDataRow next() { + return it.next(); + } + + @Override public void close() { + + } + + @Override public boolean isClosed() { + return false; + } + + @Override public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + + /** {@inheritDoc} */ + @Override public final CacheDataStore createCacheDataStore(int p) throws IgniteCheckedException { + CacheDataStore dataStore = null; + CacheDataStore oldStore = null; + + do { + dataStore = createCacheDataStore0(p); + + oldStore = partDataStores.putIfAbsent(p, dataStore); + } + while (oldStore != null); + + return dataStore; + } + + /** + * @param p Partition. + * @return Cache data store. + * @throws IgniteCheckedException If failed. + */ + protected CacheDataStore createCacheDataStore0(int p) + throws IgniteCheckedException { + IgniteCacheDatabaseSharedManager dbMgr = cctx.shared().database(); + + final long rootPage = allocateForTree(); + + FreeList freeList = cctx.shared().database().globalFreeList(); + + CacheDataRowStore rowStore = new CacheDataRowStore(cctx, freeList); + + String idxName = treeName(p); + + CacheDataTree dataTree = new CacheDataTree(idxName, + cctx.shared().database().globalReuseList(), + rowStore, + cctx, + dbMgr.pageMemory(), + rootPage, + true); + + return new CacheDataStoreImpl(p, idxName, rowStore, dataTree); + } + + /** {@inheritDoc} */ + @Override public Iterable<CacheDataStore> cacheDataStores() { + if (cctx.isLocal()) + return Collections.singleton(locCacheDataStore); + + return new Iterable<CacheDataStore>() { + @Override public Iterator<CacheDataStore> iterator() { + return partDataStores.values().iterator(); + } + }; + } + + /** {@inheritDoc} */ + @Override public void destroyCacheDataStore(int p, CacheDataStore store) throws IgniteCheckedException { + try { + partDataStores.remove(p, store); + + store.destroy(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** + * @param p Partition. + * @return Tree name for given partition. + */ + protected final String treeName(int p) { + return BPlusTree.treeName("p-" + p, "CacheData"); + } + + /** {@inheritDoc} */ + @Override public boolean expire( + IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c, + int amount + ) throws IgniteCheckedException { + if (hasPendingEntries && pendingEntries != null) { + GridCacheVersion obsoleteVer = null; + + long now = U.currentTimeMillis(); + + GridCursor<PendingRow> cur = pendingEntries.find(START_PENDING_ROW, new PendingRow(now, 0)); + + int cleared = 0; + + while (cur.next()) { + PendingRow row = cur.get(); + + if (amount != -1 && cleared > amount) + return true; + + assert row.key != null && row.link != 0 && row.expireTime != 0 : row; + + if (pendingEntries.remove(row) != null) { + if (obsoleteVer == null) + obsoleteVer = cctx.versions().next(); + + c.apply(cctx.cache().entryEx(row.key), obsoleteVer); + } + + cleared++; + } + } + + return false; + } + + /** {@inheritDoc} */ + @Override public long expiredSize() throws IgniteCheckedException { + return pendingEntries != null ? pendingEntries.size() : 0; + } + + /** + * + */ + protected class CacheDataStoreImpl implements CacheDataStore { + /** */ + private final int partId; + + /** Tree name. */ + private String name; + + /** */ + private final CacheDataRowStore rowStore; + + /** */ + private final CacheDataTree dataTree; + + /** Update counter. */ + protected final AtomicLong cntr = new AtomicLong(); + + /** Partition size. */ + protected final AtomicLong storageSize = new AtomicLong(); + + /** Initialized update counter. */ + protected Long initCntr = 0L; + + /** + * @param partId Partition number. + * @param name Name. + * @param rowStore Row store. + * @param dataTree Data tree. + */ + public CacheDataStoreImpl( + int partId, + String name, + CacheDataRowStore rowStore, + CacheDataTree dataTree + ) { + this.partId = partId; + this.name = name; + this.rowStore = rowStore; + this.dataTree = dataTree; + } + + /** {@inheritDoc} */ + @Override public int partId() { + return partId; + } + + /** {@inheritDoc} */ + @Override public int size() { + return (int)storageSize.get(); + } + + /** {@inheritDoc} */ + @Override public long updateCounter() { + return cntr.get(); + } + + /** + * @param val Update index value. + */ + @Override public void updateCounter(long val) { + while (true) { + long val0 = cntr.get(); + + if (val0 >= val) + break; + + if (cntr.compareAndSet(val0, val)) + break; + } + } + + /** {@inheritDoc} */ + @Override public String name() { + return name; + } + + /** + * @param oldRow Old row. + * @param dataRow New row. + * @return {@code True} if it is possible to update old row data. + * @throws IgniteCheckedException If failed. + */ + private boolean canUpdateOldRow(@Nullable CacheDataRow oldRow, DataRow dataRow) + throws IgniteCheckedException { + if (oldRow == null || indexingEnabled) + return false; + + if (oldRow.expireTime() != dataRow.expireTime()) + return false; + + CacheObjectContext coCtx = cctx.cacheObjectContext(); + + int oldLen = oldRow.key().valueBytesLength(coCtx) + oldRow.value().valueBytesLength(coCtx); + + if (oldLen > updateValSizeThreshold) + return false; + + int newLen = dataRow.key().valueBytesLength(coCtx) + dataRow.value().valueBytesLength(coCtx); + + return oldLen == newLen; + } + + /** {@inheritDoc} */ + @Override public void invoke(KeyCacheObject key, OffheapInvokeClosure c) + throws IgniteCheckedException { + if (!busyLock.enterBusy()) + throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); + + try { + dataTree.invoke(new SearchRow(key), CacheDataRowAdapter.RowData.NO_KEY, c); + + switch (c.operationType()) { + case PUT: { + assert c.newRow() != null : c; + + CacheDataRow oldRow = c.oldRow(); + + finishUpdate(c.newRow(), oldRow); + + break; + } + + case REMOVE: { + CacheDataRow oldRow = c.oldRow(); + + finishRemove(key, oldRow); + + break; + } + + case NOOP: + break; + + default: + assert false : c.operationType(); + } + } + finally { + busyLock.leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public CacheDataRow createRow(KeyCacheObject key, + CacheObject val, + GridCacheVersion ver, + long expireTime, + @Nullable CacheDataRow oldRow) throws IgniteCheckedException + { + DataRow dataRow = new DataRow(key, val, ver, partId, expireTime); + + if (canUpdateOldRow(oldRow, dataRow) && rowStore.updateRow(oldRow.link(), dataRow)) + dataRow.link(oldRow.link()); + else { + CacheObjectContext coCtx = cctx.cacheObjectContext(); + + key.valueBytes(coCtx); + val.valueBytes(coCtx); + + rowStore.addRow(dataRow); + } + + assert dataRow.link() != 0 : dataRow; + + return dataRow; + } + + /** {@inheritDoc} */ + @Override public void update(KeyCacheObject key, + int p, + CacheObject val, + GridCacheVersion ver, + long expireTime, + @Nullable CacheDataRow oldRow) throws IgniteCheckedException { + assert oldRow == null || oldRow.link() != 0L : oldRow; + + if (!busyLock.enterBusy()) + throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); + + try { + DataRow dataRow = new DataRow(key, val, ver, p, expireTime); + + CacheObjectContext coCtx = cctx.cacheObjectContext(); + + // Make sure value bytes initialized. + key.valueBytes(coCtx); + val.valueBytes(coCtx); + + CacheDataRow old; + + if (canUpdateOldRow(oldRow, dataRow) && rowStore.updateRow(oldRow.link(), dataRow)) { + old = oldRow; + + dataRow.link(oldRow.link()); + } + else { + rowStore.addRow(dataRow); + + assert dataRow.link() != 0 : dataRow; + + if (oldRow != null) { + old = oldRow; + + dataTree.putx(dataRow); + } + else + old = dataTree.put(dataRow); + } + + finishUpdate(dataRow, old); + } + finally { + busyLock.leaveBusy(); + } + } + + /** + * @param newRow New row. + * @param oldRow Old row if available. + * @throws IgniteCheckedException If failed. + */ + private void finishUpdate(CacheDataRow newRow, @Nullable CacheDataRow oldRow) throws IgniteCheckedException { + if (oldRow == null) + storageSize.incrementAndGet(); + + KeyCacheObject key = newRow.key(); + + long expireTime = newRow.expireTime(); + + if (indexingEnabled) { + GridCacheQueryManager qryMgr = cctx.queries(); + + assert qryMgr.enabled(); + + if (oldRow != null) { + qryMgr.store(key, + partId, + oldRow.value(), oldRow.version(), + newRow.value(), newRow.version(), + expireTime, + newRow.link()); + } + else { + qryMgr.store(key, + partId, + null, null, + newRow.value(), newRow.version(), + expireTime, + newRow.link()); + } + } + + if (oldRow != null) { + assert oldRow.link() != 0 : oldRow; + + if (pendingEntries != null && oldRow.expireTime() != 0) + pendingEntries.removex(new PendingRow(oldRow.expireTime(), oldRow.link())); + + if (newRow.link() != oldRow.link()) + rowStore.removeRow(oldRow.link()); + } + + if (pendingEntries != null && expireTime != 0) { + pendingEntries.putx(new PendingRow(expireTime, newRow.link())); + + hasPendingEntries = true; + } + + updateIgfsMetrics(key, (oldRow != null ? oldRow.value() : null), newRow.value()); + } + + /** {@inheritDoc} */ + @Override public void remove(KeyCacheObject key, int partId) throws IgniteCheckedException { + if (!busyLock.enterBusy()) + throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); + + try { + CacheDataRow oldRow = dataTree.remove(new SearchRow(key)); + + finishRemove(key, oldRow); + } + finally { + busyLock.leaveBusy(); + } + } + + /** + * @param key Key. + * @param oldRow Removed row. + * @throws IgniteCheckedException If failed. + */ + private void finishRemove(KeyCacheObject key, @Nullable CacheDataRow oldRow) throws IgniteCheckedException { + CacheObject val = null; + GridCacheVersion ver = null; + + if (oldRow != null) { + assert oldRow.link() != 0 : oldRow; + + if (pendingEntries != null && oldRow.expireTime() != 0) + pendingEntries.removex(new PendingRow(oldRow.expireTime(), oldRow.link())); + + storageSize.decrementAndGet(); + + val = oldRow.value(); + + ver = oldRow.version(); + } + + if (indexingEnabled) { + GridCacheQueryManager qryMgr = cctx.queries(); + + assert qryMgr.enabled(); + + qryMgr.remove(key, partId, val, ver); + } + + if (oldRow != null) + rowStore.removeRow(oldRow.link()); + + updateIgfsMetrics(key, (oldRow != null ? oldRow.value() : null), null); + } + + /** {@inheritDoc} */ + @Override public CacheDataRow find(KeyCacheObject key) throws IgniteCheckedException { + key.valueBytes(cctx.cacheObjectContext()); + + CacheDataRow row = dataTree.findOne(new SearchRow(key), CacheDataRowAdapter.RowData.NO_KEY); + + if (row != null) + row.key(key); + + return row; + } + + /** {@inheritDoc} */ + @Override public GridCursor<? extends CacheDataRow> cursor() throws IgniteCheckedException { + return dataTree.find(null, null); + } + + /** {@inheritDoc} */ + @Override public GridCursor<? extends CacheDataRow> cursor(KeyCacheObject lower, + KeyCacheObject upper) throws IgniteCheckedException { + SearchRow lowerRow = null; + SearchRow upperRow = null; + + if (lower != null) + lowerRow = new SearchRow(lower); + + if (upper != null) + upperRow = new SearchRow(upper); + + return dataTree.find(lowerRow, upperRow); + } + + /** {@inheritDoc} */ + @Override public void destroy() throws IgniteCheckedException { + final AtomicReference<IgniteCheckedException> exception = new AtomicReference<>(); + + dataTree.destroy(new IgniteInClosure<CacheSearchRow>() { + @Override public void apply(CacheSearchRow row) { + try { + rowStore.removeRow(row.link()); + } + catch (IgniteCheckedException e) { + U.error(log, "Fail remove row [link=" + row.link() + "]"); + + IgniteCheckedException ex = exception.get(); + + if (ex == null) + exception.set(e); + else + ex.addSuppressed(e); + } + } + }); + + if (exception.get() != null) + throw new IgniteCheckedException("Fail destroy store", exception.get()); + } + + /** {@inheritDoc} */ + @Override public RowStore rowStore() { + return rowStore; + } + + /** + * @return Next update index. + */ + @Override public long nextUpdateCounter() { + return cntr.incrementAndGet(); + } + + /** {@inheritDoc} */ + @Override public Long initialUpdateCounter() { + return initCntr; + } + + /** {@inheritDoc} */ + @Override public void updateInitialCounter(long cntr) { + if (updateCounter() < cntr) + updateCounter(cntr); + + initCntr = cntr; + } + + /** {@inheritDoc} */ + @Override public void init(long size, long updCntr) { + initCntr = updCntr; + storageSize.set(size); + cntr.set(updCntr); + } + + /** + * @param key Key. + * @param oldVal Old value. + * @param newVal New value. + */ + private void updateIgfsMetrics( + KeyCacheObject key, + CacheObject oldVal, + CacheObject newVal + ) throws IgniteCheckedException { + // In case we deal with IGFS cache, count updated data + if (cctx.cache().isIgfsDataCache() && + !cctx.isNear() && + cctx.kernalContext() + .igfsHelper() + .isIgfsBlockKey(key.value(cctx.cacheObjectContext(), false))) { + int oldSize = valueLength(oldVal); + int newSize = valueLength(newVal); + + int delta = newSize - oldSize; + + if (delta != 0) + cctx.cache().onIgfsDataSizeChanged(delta); + } + } + + /** + * Isolated method to get length of IGFS block. + * + * @param val Value. + * @return Length of value. + */ + private int valueLength(@Nullable CacheObject val) { + if (val == null) + return 0; + + byte[] bytes = val.value(cctx.cacheObjectContext(), false); + + if (bytes != null) + return bytes.length; + else + return 0; + } + } + + /** + * + */ + private static class SearchRow implements CacheSearchRow { + /** */ + private final KeyCacheObject key; + + /** */ + private final int hash; + + /** + * @param key Key. + */ + SearchRow(KeyCacheObject key) { + this.key = key; + + hash = key.hashCode(); + } + + /** {@inheritDoc} */ + @Override public KeyCacheObject key() { + return key; + } + + /** {@inheritDoc} */ + @Override public long link() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public int hash() { + return hash; + } + } + + /** + * + */ + private class DataRow extends CacheDataRowAdapter { + /** */ + protected int part = -1; + + /** */ + protected int hash; + + /** + * @param hash Hash code. + * @param link Link. + * @param rowData Required row data. + */ + DataRow(int hash, long link, CacheDataRowAdapter.RowData rowData) { + super(link); + + this.hash = hash; + + part = PageIdUtils.partId(link); + + try { + // We can not init data row lazily because underlying buffer can be concurrently cleared. + initFromLink(cctx, rowData); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** + * @param key Key. + * @param val Value. + * @param ver Version. + * @param part Partition. + * @param expireTime Expire time. + */ + DataRow(KeyCacheObject key, CacheObject val, GridCacheVersion ver, int part, long expireTime) { + super(0); + + this.hash = key.hashCode(); + this.key = key; + this.val = val; + this.ver = ver; + this.part = part; + this.expireTime = expireTime; + } + + /** {@inheritDoc} */ + @Override public int partition() { + return part; + } + + /** {@inheritDoc} */ + @Override public int hash() { + return hash; + } + + /** {@inheritDoc} */ + @Override public void link(long link) { + this.link = link; + } + } + + /** + * + */ + protected static class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> { + /** */ + private final CacheDataRowStore rowStore; + + /** */ + private final GridCacheContext cctx; + + /** + * @param name Tree name. + * @param reuseList Reuse list. + * @param rowStore Row store. + * @param cctx Context. + * @param pageMem Page memory. + * @param metaPageId Meta page ID. + * @param initNew Initialize new index. + * @throws IgniteCheckedException If failed. + */ + public CacheDataTree( + String name, + ReuseList reuseList, + CacheDataRowStore rowStore, + GridCacheContext cctx, + PageMemory pageMem, + long metaPageId, + boolean initNew + ) throws IgniteCheckedException { + super(name, cctx.cacheId(), pageMem, cctx.shared().wal(), cctx.offheap().globalRemoveId(), metaPageId, + reuseList, DataInnerIO.VERSIONS, DataLeafIO.VERSIONS); + + assert rowStore != null; + + this.rowStore = rowStore; + this.cctx = cctx; + + initTree(initNew); + } + + /** {@inheritDoc} */ + @Override protected int compare(BPlusIO<CacheSearchRow> io, long pageAddr, int idx, CacheSearchRow row) + throws IgniteCheckedException { + int hash = ((RowLinkIO)io).getHash(pageAddr, idx); + + int cmp = Integer.compare(hash, row.hash()); + + if (cmp != 0) + return cmp; + + long link = ((RowLinkIO)io).getLink(pageAddr, idx); + + assert row.key() != null : row; + + return compareKeys(row.key(), link); + } + + /** {@inheritDoc} */ + @Override protected CacheDataRow getRow(BPlusIO<CacheSearchRow> io, long pageAddr, int idx, Object flags) + throws IgniteCheckedException { + int hash = ((RowLinkIO)io).getHash(pageAddr, idx); + long link = ((RowLinkIO)io).getLink(pageAddr, idx); + + CacheDataRowAdapter.RowData x = flags != null ? + (CacheDataRowAdapter.RowData)flags : + CacheDataRowAdapter.RowData.FULL; + + return rowStore.dataRow(hash, link, x); + } + + /** + * @param key Key. + * @param link Link. + * @return Compare result. + * @throws IgniteCheckedException If failed. + */ + private int compareKeys(KeyCacheObject key, final long link) throws IgniteCheckedException { + byte[] bytes = key.valueBytes(cctx.cacheObjectContext()); + + PageMemory pageMem = cctx.shared().database().pageMemory(); + + try (Page page = page(pageId(link))) { + long pageAddr = page.getForReadPointer(); // Non-empty data page must not be recycled. + + assert pageAddr != 0L : link; + + try { + DataPageIO io = DataPageIO.VERSIONS.forPage(pageAddr); + + DataPagePayload data = io.readPayload(pageAddr, + itemId(link), + pageMem.pageSize()); + + if (data.nextLink() == 0) { + long addr = pageAddr + data.offset(); + + final int len = PageUtils.getInt(addr, 0); + + int lenCmp = Integer.compare(len, bytes.length); + + if (lenCmp != 0) + return lenCmp; + + addr += 5; // Skip length and type byte. + + final int words = len / 8; + + for (int i = 0; i < words; i++) { + int off = i * 8; + + long b1 = PageUtils.getLong(addr, off); + long b2 = GridUnsafe.getLong(bytes, GridUnsafe.BYTE_ARR_OFF + off); + + int cmp = Long.compare(b1, b2); + + if (cmp != 0) + return cmp; + } + + for (int i = words * 8; i < len; i++) { + byte b1 = PageUtils.getByte(addr, i); + byte b2 = bytes[i]; + + if (b1 != b2) + return b1 > b2 ? 1 : -1; + } + + return 0; + } + } + finally { + page.releaseRead(); + } + } + + // TODO GG-11768. + CacheDataRowAdapter other = new CacheDataRowAdapter(link); + other.initFromLink(cctx, CacheDataRowAdapter.RowData.KEY_ONLY); + + byte[] bytes1 = other.key().valueBytes(cctx.cacheObjectContext()); + byte[] bytes2 = key.valueBytes(cctx.cacheObjectContext()); + + int lenCmp = Integer.compare(bytes1.length, bytes2.length); + + if (lenCmp != 0) + return lenCmp; + + final int len = bytes1.length; + final int words = len / 8; + + for (int i = 0; i < words; i++) { + int off = GridUnsafe.BYTE_ARR_INT_OFF + i * 8; + + long b1 = GridUnsafe.getLong(bytes1, off); + long b2 = GridUnsafe.getLong(bytes2, off); + + int cmp = Long.compare(b1, b2); + + if (cmp != 0) + return cmp; + } + + for (int i = words * 8; i < len; i++) { + byte b1 = bytes1[i]; + byte b2 = bytes2[i]; + + if (b1 != b2) + return b1 > b2 ? 1 : -1; + } + + return 0; + } + } + + /** + * + */ + protected class CacheDataRowStore extends RowStore { + /** + * @param cctx Cache context. + * @param freeList Free list. + */ + public CacheDataRowStore(GridCacheContext<?, ?> cctx, FreeList freeList) { + super(cctx, freeList); + } + + /** + * @param hash Hash code. + * @param link Link. + * @return Search row. + */ + private CacheSearchRow keySearchRow(int hash, long link) { + return new DataRow(hash, link, CacheDataRowAdapter.RowData.KEY_ONLY); + } + + /** + * @param hash Hash code. + * @param link Link. + * @param rowData Required row data. + * @return Data row. + */ + private CacheDataRow dataRow(int hash, long link, CacheDataRowAdapter.RowData rowData) { + return new DataRow(hash, link, rowData); + } + } + + /** + * @param pageAddr Page address. + * @param off Offset. + * @param link Link. + * @param hash Hash. + */ + private static void store0(long pageAddr, int off, long link, int hash) { + PageUtils.putLong(pageAddr, off, link); + PageUtils.putInt(pageAddr, off + 8, hash); + } + + /** + * + */ + private interface RowLinkIO { + /** + * @param pageAddr Page address. + * @param idx Index. + * @return Row link. + */ + public long getLink(long pageAddr, int idx); + + /** + * @param pageAddr Page address. + * @param idx Index. + * @return Key hash code. + */ + public int getHash(long pageAddr, int idx); + } + + /** + * + */ + public static final class DataInnerIO extends BPlusInnerIO<CacheSearchRow> implements RowLinkIO { + /** */ + public static final IOVersions<DataInnerIO> VERSIONS = new IOVersions<>( + new DataInnerIO(1) + ); + + /** + * @param ver Page format version. + */ + DataInnerIO(int ver) { + super(T_DATA_REF_INNER, ver, true, 12); + } + + /** {@inheritDoc} */ + @Override public void storeByOffset(long pageAddr, int off, CacheSearchRow row) { + assert row.link() != 0; + + store0(pageAddr, off, row.link(), row.hash()); + } + + /** {@inheritDoc} */ + @Override public CacheSearchRow getLookupRow(BPlusTree<CacheSearchRow, ?> tree, long pageAddr, int idx) { + int hash = getHash(pageAddr, idx); + long link = getLink(pageAddr, idx); + + return ((CacheDataTree)tree).rowStore.keySearchRow(hash, link); + } + + /** {@inheritDoc} */ + @Override public void store(long dstPageAddr, int dstIdx, BPlusIO<CacheSearchRow> srcIo, long srcPageAddr, + int srcIdx) { + int hash = ((RowLinkIO)srcIo).getHash(srcPageAddr, srcIdx); + long link = ((RowLinkIO)srcIo).getLink(srcPageAddr, srcIdx); + + store0(dstPageAddr, offset(dstIdx), link, hash); + } + + /** {@inheritDoc} */ + @Override public long getLink(long pageAddr, int idx) { + assert idx < getCount(pageAddr) : idx; + + return PageUtils.getLong(pageAddr, offset(idx)); + } + + /** {@inheritDoc} */ + @Override public int getHash(long pageAddr, int idx) { + return PageUtils.getInt(pageAddr, offset(idx) + 8); + } + } + + /** + * + */ + public static final class DataLeafIO extends BPlusLeafIO<CacheSearchRow> implements RowLinkIO { + /** */ + public static final IOVersions<DataLeafIO> VERSIONS = new IOVersions<>( + new DataLeafIO(1) + ); + + /** + * @param ver Page format version. + */ + DataLeafIO(int ver) { + super(T_DATA_REF_LEAF, ver, 12); + } + + /** {@inheritDoc} */ + @Override public void storeByOffset(long pageAddr, int off, CacheSearchRow row) { + assert row.link() != 0; + + store0(pageAddr, off, row.link(), row.hash()); + } + + /** {@inheritDoc} */ + @Override public void store(long dstPageAddr, int dstIdx, BPlusIO<CacheSearchRow> srcIo, long srcPageAddr, + int srcIdx) { + store0(dstPageAddr, offset(dstIdx), getLink(srcPageAddr, srcIdx), getHash(srcPageAddr, srcIdx)); + } + + /** {@inheritDoc} */ + @Override public CacheSearchRow getLookupRow(BPlusTree<CacheSearchRow, ?> tree, long buf, int idx) { + int hash = getHash(buf, idx); + long link = getLink(buf, idx); + + return ((CacheDataTree)tree).rowStore.keySearchRow(hash, link); + } + + /** {@inheritDoc} */ + @Override public long getLink(long pageAddr, int idx) { + assert idx < getCount(pageAddr) : idx; + + return PageUtils.getLong(pageAddr, offset(idx)); + } + + /** {@inheritDoc} */ + @Override public int getHash(long pageAddr, int idx) { + return PageUtils.getInt(pageAddr, offset(idx) + 8); + } + + /** {@inheritDoc} */ + @Override public void visit(long pageAddr, IgniteInClosure<CacheSearchRow> c) { + int cnt = getCount(pageAddr); + + for (int i = 0; i < cnt; i++) + c.apply(new CacheDataRowAdapter(getLink(pageAddr, i))); + } + } + + /** + * + */ + private static class PendingRow { + /** Expire time. */ + private long expireTime; + + /** Link. */ + private long link; + + /** */ + private KeyCacheObject key; + + /** + * @param expireTime Expire time. + * @param link Link + */ + PendingRow(long expireTime, long link) { + assert expireTime != 0; + + this.expireTime = expireTime; + this.link = link; + } + + /** + * @param cctx Context. + * @param expireTime Expire time. + * @param link Link. + * @return Row. + * @throws IgniteCheckedException If failed. + */ + static PendingRow createRowWithKey(GridCacheContext cctx, long expireTime, long link) + throws IgniteCheckedException { + PendingRow row = new PendingRow(expireTime, link); + + CacheDataRowAdapter rowData = new CacheDataRowAdapter(link); + + rowData.initFromLink(cctx, CacheDataRowAdapter.RowData.KEY_ONLY); + + row.key = rowData.key(); + + return row; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(PendingRow.class, this); + } + } + + /** + * + */ + protected static class PendingEntriesTree extends BPlusTree<PendingRow, PendingRow> { + /** */ + private final GridCacheContext cctx; + + /** + * @param cctx Cache context. + * @param name Tree name. + * @param pageMem Page memory. + * @param metaPageId Meta page ID. + * @param reuseList Reuse list. + * @param initNew Initialize new index. + * @throws IgniteCheckedException If failed. + */ + public PendingEntriesTree( + GridCacheContext cctx, + String name, + PageMemory pageMem, + long metaPageId, + ReuseList reuseList, + boolean initNew) + throws IgniteCheckedException { + super(name, + cctx.cacheId(), + pageMem, + cctx.shared().wal(), + cctx.offheap().globalRemoveId(), + metaPageId, + reuseList, + PendingEntryInnerIO.VERSIONS, + PendingEntryLeafIO.VERSIONS); + + this.cctx = cctx; + + initTree(initNew); + } + + /** {@inheritDoc} */ + @Override protected int compare(BPlusIO<PendingRow> io, long pageAddr, int idx, PendingRow row) + throws IgniteCheckedException { + long expireTime = ((PendingRowIO)io).getExpireTime(pageAddr, idx); + + int cmp = Long.compare(expireTime, row.expireTime); + + if (cmp != 0) + return cmp; + + if (row.link == 0L) + return 0; + + long link = ((PendingRowIO)io).getLink(pageAddr, idx); + + return Long.compare(link, row.link); + } + + /** {@inheritDoc} */ + @Override protected PendingRow getRow(BPlusIO<PendingRow> io, long pageAddr, int idx, Object ignore) + throws IgniteCheckedException { + return io.getLookupRow(this, pageAddr, idx); + } + } + + /** + * + */ + private interface PendingRowIO { + /** + * @param pageAddr Page address. + * @param idx Index. + * @return Expire time. + */ + long getExpireTime(long pageAddr, int idx); + + /** + * @param pageAddr Page address. + * @param idx Index. + * @return Link. + */ + long getLink(long pageAddr, int idx); + } + + /** + * + */ + public static class PendingEntryInnerIO extends BPlusInnerIO<PendingRow> implements PendingRowIO { + /** */ + public static final IOVersions<PendingEntryInnerIO> VERSIONS = new IOVersions<>( + new PendingEntryInnerIO(1) + ); + + /** + * @param ver Page format version. + */ + PendingEntryInnerIO(int ver) { + super(T_PENDING_REF_INNER, ver, true, 8 + 8); + } + + /** {@inheritDoc} */ + @Override public void storeByOffset(long pageAddr, int off, PendingRow row) throws IgniteCheckedException { + assert row.link != 0; + assert row.expireTime != 0; + + PageUtils.putLong(pageAddr, off, row.expireTime); + PageUtils.putLong(pageAddr, off + 8, row.link); + } + + /** {@inheritDoc} */ + @Override public void store(long dstPageAddr, + int dstIdx, + BPlusIO<PendingRow> srcIo, + long srcPageAddr, + int srcIdx) throws IgniteCheckedException { + int dstOff = offset(dstIdx); + + long link = ((PendingRowIO)srcIo).getLink(srcPageAddr, srcIdx); + long expireTime = ((PendingRowIO)srcIo).getExpireTime(srcPageAddr, srcIdx); + + PageUtils.putLong(dstPageAddr, dstOff, expireTime); + PageUtils.putLong(dstPageAddr, dstOff + 8, link); + } + + /** {@inheritDoc} */ + @Override public PendingRow getLookupRow(BPlusTree<PendingRow, ?> tree, long pageAddr, int idx) + throws IgniteCheckedException { + return PendingRow.createRowWithKey(((PendingEntriesTree)tree).cctx, + getExpireTime(pageAddr, idx), + getLink(pageAddr, idx)); + } + + /** {@inheritDoc} */ + @Override public long getExpireTime(long pageAddr, int idx) { + return PageUtils.getLong(pageAddr, offset(idx)); + } + + /** {@inheritDoc} */ + @Override public long getLink(long pageAddr, int idx) { + return PageUtils.getLong(pageAddr, offset(idx) + 8); + } + } + + /** + * + */ + public static class PendingEntryLeafIO extends BPlusLeafIO<PendingRow> implements PendingRowIO { + /** */ + public static final IOVersions<PendingEntryLeafIO> VERSIONS = new IOVersions<>( + new PendingEntryLeafIO(1) + ); + + /** + * @param ver Page format version. + */ + PendingEntryLeafIO(int ver) { + super(T_PENDING_REF_LEAF, ver, 8 + 8); + } + + /** {@inheritDoc} */ + @Override public void storeByOffset(long pageAddr, int off, PendingRow row) throws IgniteCheckedException { + assert row.link != 0; + assert row.expireTime != 0; + + PageUtils.putLong(pageAddr, off, row.expireTime); + PageUtils.putLong(pageAddr, off + 8, row.link); + } + + /** {@inheritDoc} */ + @Override public void store(long dstPageAddr, + int dstIdx, + BPlusIO<PendingRow> srcIo, + long srcPageAddr, + int srcIdx) throws IgniteCheckedException { + int dstOff = offset(dstIdx); + + long link = ((PendingRowIO)srcIo).getLink(srcPageAddr, srcIdx); + long expireTime = ((PendingRowIO)srcIo).getExpireTime(srcPageAddr, srcIdx); + + PageUtils.putLong(dstPageAddr, dstOff, expireTime); + PageUtils.putLong(dstPageAddr, dstOff + 8, link); + } + + /** {@inheritDoc} */ + @Override public PendingRow getLookupRow(BPlusTree<PendingRow, ?> tree, long pageAddr, int idx) + throws IgniteCheckedException { + return PendingRow.createRowWithKey(((PendingEntriesTree)tree).cctx, + getExpireTime(pageAddr, idx), + getLink(pageAddr, idx)); + } + + /** {@inheritDoc} */ + @Override public long getExpireTime(long pageAddr, int idx) { + return PageUtils.getLong(pageAddr, offset(idx)); + } + + /** {@inheritDoc} */ + @Override public long getLink(long pageAddr, int idx) { + return PageUtils.getLong(pageAddr, offset(idx) + 8); + } + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 7a2e20f,0e8c263..4e3514d --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@@ -68,10 -68,9 +68,10 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.cache.query.CacheQuery; import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; - import org.apache.ignite.internal.processors.query.GridQueryProcessor; + import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; import org.apache.ignite.internal.util.GridEmptyIterator; +import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl; import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.lang.GridClosureException; http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java index 6370798,0000000..7fb9d0a mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java @@@ -1,295 -1,0 +1,295 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.database; + +import java.io.File; +import java.util.Collection; +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.MemoryConfiguration; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.mem.DirectMemoryProvider; +import org.apache.ignite.internal.mem.file.MappedFileMemoryProvider; +import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider; +import org.apache.ignite.internal.pagemem.PageMemory; +import org.apache.ignite.internal.pagemem.snapshot.StartFullSnapshotAckDiscoveryMessage; +import org.apache.ignite.internal.pagemem.impl.PageMemoryNoStoreImpl; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; +import org.apache.ignite.internal.processors.cache.database.freelist.FreeList; +import org.apache.ignite.internal.processors.cache.database.freelist.FreeListImpl; +import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdapter implements IgniteChangeGlobalStateSupport { + /** */ + protected PageMemory pageMem; + + /** */ + private FreeListImpl freeList; + + /** {@inheritDoc} */ + @Override protected void start0() throws IgniteCheckedException { + if (!cctx.kernalContext().clientNode()) + init(); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void init() throws IgniteCheckedException { + if (pageMem == null) { + MemoryConfiguration dbCfg = cctx.kernalContext().config().getMemoryConfiguration(); + + if (dbCfg == null) + dbCfg = new MemoryConfiguration(); + + pageMem = initMemory(dbCfg); + + pageMem.start(); + + initDataStructures(); + } + } + + /** + * @param log Logger. + */ + public void dumpStatistics(IgniteLogger log) { + if (freeList != null) + freeList.dumpStatistics(log); + } + + /** + * @throws IgniteCheckedException If failed. + */ + protected void initDataStructures() throws IgniteCheckedException { - freeList = new FreeListImpl(0, cctx.gridName(), pageMem, null, cctx.wal(), 0L, true); ++ freeList = new FreeListImpl(0, cctx.igniteInstanceName(), pageMem, null, cctx.wal(), 0L, true); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void initDataBase() throws IgniteCheckedException{ + // No-op. + } + + /** + * @return Node-global free list. + */ + public FreeList globalFreeList() { + assert freeList != null : "Non initialized"; + + return freeList; + } + + /** + * @return Node-global reuse list. + */ + public ReuseList globalReuseList() { + assert freeList != null : "Non initialized"; + + return freeList; + } + + /** {@inheritDoc} */ + @Override protected void stop0(boolean cancel) { + if (pageMem != null) + pageMem.stop(); + } + + /** + * + */ + public boolean persistenceEnabled() { + return false; + } + + /** + * @return Page memory instance. + */ + public PageMemory pageMemory() { + return pageMem; + } + + /** + * + */ + public void lock() throws IgniteCheckedException { + + } + + /** + * + */ + public void unLock(){ + + } + + /** + * No-op for non-persistent storage. + */ + public void checkpointReadLock() { + // No-op. + } + + /** + * No-op for non-persistent storage. + */ + public void checkpointReadUnlock() { + // No-op. + } + + /** + * + */ + @Nullable public IgniteInternalFuture wakeupForCheckpoint(String reason) { + return null; + } + + /** + * Waits until current state is checkpointed. + * + * @throws IgniteCheckedException If failed. + */ + public void waitForCheckpoint(String reason) throws IgniteCheckedException { + // No-op + } + + /** + * + */ + @Nullable public IgniteInternalFuture wakeupForSnapshot(long snapshotId, UUID snapshotNodeId, + Collection<String> cacheNames) { + return null; + } + + /** + * @param discoEvt Before exchange for the given discovery event. + */ + public void beforeExchange(GridDhtPartitionsExchangeFuture discoEvt) throws IgniteCheckedException { + // No-op. + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void beforeCachesStop() throws IgniteCheckedException { + // No-op. + } + + /** + * @param cctx Stopped cache context. + */ + public void onCacheStop(GridCacheContext cctx) { + // No-op + } + + /** + * @param snapshotMsg Snapshot message. + * @param initiator Initiator node. + * @param msg message to log + * @return Snapshot creation init future or {@code null} if snapshot is not available. + * @throws IgniteCheckedException If failed. + */ + @Nullable public IgniteInternalFuture startLocalSnapshotCreation(StartFullSnapshotAckDiscoveryMessage snapshotMsg, + ClusterNode initiator, String msg) + throws IgniteCheckedException { + return null; + } + + /** + * @return Future that will be completed when indexes for given cache are restored. + */ + @Nullable public IgniteInternalFuture indexRebuildFuture(int cacheId) { + return null; + } + + /** + * @param dbCfg Database configuration. + * @return Page memory instance. + */ + protected PageMemory initMemory(MemoryConfiguration dbCfg) { + String path = dbCfg.getFileCacheAllocationPath(); + + int concLvl = dbCfg.getConcurrencyLevel(); + + if (concLvl < 1) + concLvl = Runtime.getRuntime().availableProcessors(); + + long fragmentSize = dbCfg.getPageCacheSize() / concLvl; + + if (fragmentSize < 1024 * 1024) + fragmentSize = 1024 * 1024; + + String consId = String.valueOf(cctx.discovery().consistentId()); + + consId = consId.replaceAll("[:,\\.]", "_"); + + File allocPath = path == null ? null : buildPath(path, consId); + + long[] sizes = new long[concLvl]; + + for (int i = 0; i < concLvl; i++) + sizes[i] = fragmentSize; + + DirectMemoryProvider memProvider = path == null ? + new UnsafeMemoryProvider(sizes) : + new MappedFileMemoryProvider( + log, + allocPath, + true, + sizes); + + return new PageMemoryNoStoreImpl(log, memProvider, cctx, dbCfg.getPageSize(), false); + } + + /** + * @param path Path to the working directory. + * @param consId Consistent ID of the local node. + * @return DB storage path. + */ + protected File buildPath(String path, String consId) { + String igniteHomeStr = U.getIgniteHome(); + + File igniteHome = igniteHomeStr != null ? new File(igniteHomeStr) : null; + + File workDir = igniteHome == null ? new File(path) : new File(igniteHome, path); + + return new File(workDir, consId); + } + + /** {@inheritDoc} */ + @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException { + + } + + /** {@inheritDoc} */ + @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException { + + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java index 2bfa996,48c01f0..bf0cfdc --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java @@@ -422,13 -400,7 +400,7 @@@ public class GridDistributedLockReques writer.incrementState(); -- case 16: - if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT)) - return false; - - writer.incrementState(); - + case 17: if (!writer.writeBooleanArray("retVals", retVals)) return false; @@@ -544,15 -516,7 +516,7 @@@ reader.incrementState(); -- case 16: - partIds = reader.readCollection("partIds", MessageCollectionItemType.INT); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - + case 17: retVals = reader.readBooleanArray("retVals"); if (!reader.isLastRead()) http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index ef52f7b,6ca15de..a1da88e --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@@ -30,9 -30,9 +30,10 @@@ import java.util.concurrent.locks.Reent import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.managers.discovery.DiscoCache; + import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index b1fe6ec,693a049..e2c05cc --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@@ -49,13 -44,10 +49,13 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.GridCircularBuffer; - import org.apache.ignite.internal.processors.query.GridQueryProcessor; + import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.lang.GridCloseableIterator; +import org.apache.ignite.internal.util.lang.GridIterator; import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; @@@ -681,8 -523,8 +681,8 @@@ public class GridDhtLocalPartition impl int ord = (int)(reservations >> 32); - if (isEmpty() && + if (isEmpty() && !QueryUtils.isEnabled(cctx.config()) && - ord == RENTING.ordinal() && (reservations & 0xFFFF) == 0 && + ord == RENTING.ordinal() && (reservations & 0xFFFF) == 0 && !groupReserved() && casState(reservations, EVICTED)) { if (log.isDebugEnabled()) log.debug("Evicted partition: " + this); http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java index 8eac823,50167d8..5f9a4da --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java @@@ -514,7 -514,7 +514,6 @@@ public class GridDhtLockRequest extend return false; reader.incrementState(); -- } return reader.afterMessageRead(GridDhtLockRequest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index ac3e2c8,605150a..d6486b3 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@@ -24,8 -22,8 +24,9 @@@ import java.util.Set import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.internal.IgniteInterruptedCheckedException; + import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java index 0c273f1,752df54..ee5586e --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java @@@ -142,7 -142,7 +142,6 @@@ public class GridDhtUnlockRequest exten return false; reader.incrementState(); -- } return reader.afterMessageRead(GridDhtUnlockRequest.class);
