http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index a869b21..4a98f6a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -27,6 +27,7 @@ import 
org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.IgniteTree;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.lang.GridIterator;
@@ -82,6 +83,12 @@ public interface IgniteCacheOffheapManager extends 
GridCacheManager {
     public Iterable<CacheDataStore> cacheDataStores();
 
     /**
+     * @param part Partition.
+     * @return Data store.
+     */
+    public CacheDataStore dataStore(GridDhtLocalPartition part);
+
+    /**
      * @param p Partition ID.
      * @param store Data store.
      */
@@ -107,6 +114,15 @@ public interface IgniteCacheOffheapManager extends 
GridCacheManager {
     public long expiredSize() throws IgniteCheckedException;
 
     /**
+     * @param key Key.
+     * @param part Partition.
+     * @param c Tree update closure.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void invoke(KeyCacheObject key, GridDhtLocalPartition part, 
OffheapInvokeClosure c)
+        throws IgniteCheckedException;
+
+    /**
      * @param key  Key.
      * @param val  Value.
      * @param ver  Version.
@@ -253,6 +269,16 @@ public interface IgniteCacheOffheapManager extends 
GridCacheManager {
     /**
      *
      */
+    interface OffheapInvokeClosure extends 
IgniteTree.InvokeClosure<CacheDataRow> {
+        /**
+         * @return Old row.
+         */
+        @Nullable public CacheDataRow oldRow();
+    }
+
+    /**
+     *
+     */
     interface CacheDataStore {
         /**
          * @return Partition ID.
@@ -297,6 +323,21 @@ public interface IgniteCacheOffheapManager extends 
GridCacheManager {
 
         /**
          * @param key Key.
+         * @param val Value.
+         * @param ver Version.
+         * @param expireTime Expire time.
+         * @param oldRow Old row.
+         * @return New row.
+         * @throws IgniteCheckedException If failed.
+         */
+        CacheDataRow createRow(KeyCacheObject key,
+            CacheObject val,
+            GridCacheVersion ver,
+            long expireTime,
+            @Nullable CacheDataRow oldRow) throws IgniteCheckedException;
+
+        /**
+         * @param key Key.
          * @param part Partition.
          * @param val Value.
          * @param ver Version.
@@ -313,6 +354,13 @@ public interface IgniteCacheOffheapManager extends 
GridCacheManager {
 
         /**
          * @param key Key.
+         * @param c Closure.
+         * @throws IgniteCheckedException If failed.
+         */
+        public void invoke(KeyCacheObject key, OffheapInvokeClosure c) throws 
IgniteCheckedException;
+
+        /**
+         * @param key Key.
          * @param partId Partition number.
          * @throws IgniteCheckedException If failed.
          */

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 5df99b6..b863edd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -207,7 +207,7 @@ public class IgniteCacheOffheapManagerImpl extends 
GridCacheManagerAdapter imple
      * @param part Partition.
      * @return Data store for given entry.
      */
-    private CacheDataStore dataStore(GridDhtLocalPartition part) {
+    public CacheDataStore dataStore(GridDhtLocalPartition part) {
         if (cctx.isLocal())
             return locCacheDataStore;
         else {
@@ -329,6 +329,14 @@ public class IgniteCacheOffheapManagerImpl extends 
GridCacheManagerAdapter imple
     }
 
     /** {@inheritDoc} */
+    @Override public void invoke(KeyCacheObject key,
+        GridDhtLocalPartition part,
+        OffheapInvokeClosure c)
+        throws IgniteCheckedException {
+        dataStore(part).invoke(key, c);
+    }
+
+    /** {@inheritDoc} */
     @Override public void update(
         KeyCacheObject key,
         CacheObject val,
@@ -838,6 +846,7 @@ public class IgniteCacheOffheapManagerImpl extends 
GridCacheManagerAdapter imple
         protected Long initCntr = 0L;
 
         /**
+         * @param partId Partition number.
          * @param name Name.
          * @param rowStore Row store.
          * @param dataTree Data tree.
@@ -900,6 +909,9 @@ public class IgniteCacheOffheapManagerImpl extends 
GridCacheManagerAdapter imple
             if (oldRow == null || indexingEnabled)
                 return false;
 
+            if (oldRow.expireTime() != dataRow.expireTime())
+                return false;
+
             CacheObjectContext coCtx = cctx.cacheObjectContext();
 
             int oldLen = oldRow.key().valueBytesLength(coCtx) + 
oldRow.value().valueBytesLength(coCtx);
@@ -913,6 +925,71 @@ public class IgniteCacheOffheapManagerImpl extends 
GridCacheManagerAdapter imple
         }
 
         /** {@inheritDoc} */
+        @Override public void invoke(KeyCacheObject key, OffheapInvokeClosure 
c)
+            throws IgniteCheckedException {
+            if (!busyLock.enterBusy())
+                throw new NodeStoppingException("Operation has been cancelled 
(node is stopping).");
+
+            try {
+                dataTree.invoke(new SearchRow(key), 
CacheDataRowAdapter.RowData.NO_KEY, c);
+
+                switch (c.operationType()) {
+                    case PUT: {
+                        assert c.newRow() != null : c;
+
+                        CacheDataRow oldRow = c.oldRow();
+
+                        finishUpdate(c.newRow(), oldRow);
+
+                        break;
+                    }
+
+                    case REMOVE: {
+                        CacheDataRow oldRow = c.oldRow();
+
+                        finishRemove(key, oldRow);
+
+                        break;
+                    }
+
+                    case NOOP:
+                        break;
+
+                    default:
+                        assert false : c.operationType();
+                }
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public CacheDataRow createRow(KeyCacheObject key,
+            CacheObject val,
+            GridCacheVersion ver,
+            long expireTime,
+            @Nullable CacheDataRow oldRow) throws IgniteCheckedException
+        {
+            DataRow dataRow = new DataRow(key, val, ver, partId, expireTime);
+
+            if (canUpdateOldRow(oldRow, dataRow) && 
rowStore.updateRow(oldRow.link(), dataRow))
+                dataRow.link(oldRow.link());
+            else {
+                CacheObjectContext coCtx = cctx.cacheObjectContext();
+
+                key.valueBytes(coCtx);
+                val.valueBytes(coCtx);
+
+                rowStore.addRow(dataRow);
+            }
+
+            assert dataRow.link() != 0 : dataRow;
+
+            return dataRow;
+        }
+
+        /** {@inheritDoc} */
         @Override public void update(KeyCacheObject key,
             int p,
             CacheObject val,
@@ -935,14 +1012,10 @@ public class IgniteCacheOffheapManagerImpl extends 
GridCacheManagerAdapter imple
 
                 CacheDataRow old;
 
-                boolean rmvOld = true;
-
                 if (canUpdateOldRow(oldRow, dataRow) && 
rowStore.updateRow(oldRow.link(), dataRow)) {
                     old = oldRow;
 
                     dataRow.link(oldRow.link());
-
-                    rmvOld = false;
                 }
                 else {
                     rowStore.addRow(dataRow);
@@ -956,43 +1029,68 @@ public class IgniteCacheOffheapManagerImpl extends 
GridCacheManagerAdapter imple
                     }
                     else
                         old = dataTree.put(dataRow);
-
-                    if (old == null)
-                        storageSize.incrementAndGet();
                 }
 
-                if (indexingEnabled) {
-                    GridCacheQueryManager qryMgr = cctx.queries();
+                finishUpdate(dataRow, old);
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }
 
-                    assert qryMgr.enabled();
+        /**
+         * @param newRow New row.
+         * @param oldRow Old row if available.
+         * @throws IgniteCheckedException If failed.
+         */
+        private void finishUpdate(CacheDataRow newRow, @Nullable CacheDataRow 
oldRow) throws IgniteCheckedException {
+            if (oldRow == null)
+                storageSize.incrementAndGet();
 
-                    if (old != null)
-                        qryMgr.store(key, p, old.value(), old.version(), val, 
ver, expireTime, dataRow.link());
-                    else
-                        qryMgr.store(key, p, null, null, val, ver, expireTime, 
dataRow.link());
-                }
+            KeyCacheObject key = newRow.key();
+
+            long expireTime = newRow.expireTime();
 
-                if (old != null) {
-                    assert old.link() != 0 : old;
+            if (indexingEnabled) {
+                GridCacheQueryManager qryMgr = cctx.queries();
 
-                    if (pendingEntries != null && old.expireTime() != 0)
-                        pendingEntries.removex(new 
PendingRow(old.expireTime(), old.link()));
+                assert qryMgr.enabled();
 
-                    if (rmvOld)
-                        rowStore.removeRow(old.link());
+                if (oldRow != null) {
+                    qryMgr.store(key,
+                        partId,
+                        oldRow.value(), oldRow.version(),
+                        newRow.value(), newRow.version(),
+                        expireTime,
+                        newRow.link());
+                }
+                else {
+                    qryMgr.store(key,
+                        partId,
+                        null, null,
+                        newRow.value(), newRow.version(),
+                        expireTime,
+                        newRow.link());
                 }
+            }
 
-                if (pendingEntries != null && expireTime != 0) {
-                    pendingEntries.putx(new PendingRow(expireTime, 
dataRow.link()));
+            if (oldRow != null) {
+                assert oldRow.link() != 0 : oldRow;
 
-                    hasPendingEntries = true;
-                }
+                if (pendingEntries != null && oldRow.expireTime() != 0)
+                    pendingEntries.removex(new PendingRow(oldRow.expireTime(), 
oldRow.link()));
 
-                updateIgfsMetrics(key, (old != null ? old.value() : null), 
val);
+                if (newRow.link() != oldRow.link())
+                    rowStore.removeRow(oldRow.link());
             }
-            finally {
-                busyLock.leaveBusy();
+
+            if (pendingEntries != null && expireTime != 0) {
+                pendingEntries.putx(new PendingRow(expireTime, newRow.link()));
+
+                hasPendingEntries = true;
             }
+
+            updateIgfsMetrics(key, (oldRow != null ? oldRow.value() : null), 
newRow.value());
         }
 
         /** {@inheritDoc} */
@@ -1001,50 +1099,59 @@ public class IgniteCacheOffheapManagerImpl extends 
GridCacheManagerAdapter imple
                 throw new NodeStoppingException("Operation has been cancelled 
(node is stopping).");
 
             try {
-                CacheDataRow dataRow = dataTree.remove(new SearchRow(key));
-
-                CacheObject val = null;
-                GridCacheVersion ver = null;
+                CacheDataRow oldRow = dataTree.remove(new SearchRow(key));
 
-                if (dataRow != null) {
-                    assert dataRow.link() != 0 : dataRow;
+                finishRemove(key, oldRow);
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }
 
-                    if (pendingEntries != null && dataRow.expireTime() != 0)
-                        pendingEntries.removex(new 
PendingRow(dataRow.expireTime(), dataRow.link()));
+        /**
+         * @param key Key.
+         * @param oldRow Removed row.
+         * @throws IgniteCheckedException If failed.
+         */
+        private void finishRemove(KeyCacheObject key, @Nullable CacheDataRow 
oldRow) throws IgniteCheckedException {
+            CacheObject val = null;
+            GridCacheVersion ver = null;
 
-                    storageSize.decrementAndGet();
+            if (oldRow != null) {
+                assert oldRow.link() != 0 : oldRow;
 
-                    val = dataRow.value();
+                if (pendingEntries != null && oldRow.expireTime() != 0)
+                    pendingEntries.removex(new PendingRow(oldRow.expireTime(), 
oldRow.link()));
 
-                    ver = dataRow.version();
-                }
+                storageSize.decrementAndGet();
 
-                if (indexingEnabled) {
-                    GridCacheQueryManager qryMgr = cctx.queries();
+                val = oldRow.value();
 
-                    assert qryMgr.enabled();
+                ver = oldRow.version();
+            }
 
-                    qryMgr.remove(key, partId, val, ver);
-                }
+            if (indexingEnabled) {
+                GridCacheQueryManager qryMgr = cctx.queries();
 
-                if (dataRow != null)
-                    rowStore.removeRow(dataRow.link());
+                assert qryMgr.enabled();
 
-                updateIgfsMetrics(key, (dataRow != null ? dataRow.value() : 
null), null);
-            }
-            finally {
-                busyLock.leaveBusy();
+                qryMgr.remove(key, partId, val, ver);
             }
+
+            if (oldRow != null)
+                rowStore.removeRow(oldRow.link());
+
+            updateIgfsMetrics(key, (oldRow != null ? oldRow.value() : null), 
null);
         }
 
         /** {@inheritDoc} */
         @Override public CacheDataRow find(KeyCacheObject key) throws 
IgniteCheckedException {
             key.valueBytes(cctx.cacheObjectContext());
 
-            CacheDataRow row = dataTree.findOne(new SearchRow(key), 
dataTree.noKeyC);
+            CacheDataRow row = dataTree.findOne(new SearchRow(key), 
CacheDataRowAdapter.RowData.NO_KEY);
 
             if (row != null)
-                ((CacheDataRowAdapter)row).key(key);
+                row.key(key);
 
             return row;
         }
@@ -1261,17 +1368,6 @@ public class IgniteCacheOffheapManagerImpl extends 
GridCacheManagerAdapter imple
         /** */
         private final GridCacheContext cctx;
 
-        /** */
-        private final RowClosure<CacheSearchRow, CacheDataRow> noKeyC = new 
RowClosure<CacheSearchRow, CacheDataRow>() {
-            @Override public CacheDataRow row(BPlusIO<CacheSearchRow> io, long 
pageAddr, int idx)
-                throws IgniteCheckedException {
-                int hash = ((RowLinkIO)io).getHash(pageAddr, idx);
-                long link = ((RowLinkIO)io).getLink(pageAddr, idx);
-
-                return rowStore.dataRow(hash, link, 
CacheDataRowAdapter.RowData.NO_KEY);
-            }
-        };
-
         /**
          * @param name Tree name.
          * @param reuseList Reuse list.
@@ -1320,12 +1416,16 @@ public class IgniteCacheOffheapManagerImpl extends 
GridCacheManagerAdapter imple
         }
 
         /** {@inheritDoc} */
-        @Override protected CacheDataRow getRow(BPlusIO<CacheSearchRow> io, 
long pageAddr, int idx)
+        @Override protected CacheDataRow getRow(BPlusIO<CacheSearchRow> io, 
long pageAddr, int idx, Object flags)
             throws IgniteCheckedException {
             int hash = ((RowLinkIO)io).getHash(pageAddr, idx);
             long link = ((RowLinkIO)io).getLink(pageAddr, idx);
 
-            return rowStore.dataRow(hash, link, 
CacheDataRowAdapter.RowData.FULL);
+            CacheDataRowAdapter.RowData x = flags != null ?
+                (CacheDataRowAdapter.RowData)flags :
+                CacheDataRowAdapter.RowData.FULL;
+
+            return rowStore.dataRow(hash, link, x);
         }
 
         /**
@@ -1705,7 +1805,7 @@ public class IgniteCacheOffheapManagerImpl extends 
GridCacheManagerAdapter imple
         }
 
         /** {@inheritDoc} */
-        @Override protected PendingRow getRow(BPlusIO<PendingRow> io, long 
pageAddr, int idx)
+        @Override protected PendingRow getRow(BPlusIO<PendingRow> io, long 
pageAddr, int idx, Object ignore)
             throws IgniteCheckedException {
             return io.getLookupRow(this, pageAddr, idx);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java
index 75ab8e4..cc26b21 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.database;
 
 import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 
 /**
@@ -48,4 +49,9 @@ public interface CacheDataRow extends CacheSearchRow {
      * @param link Link for this row.
      */
     public void link(long link);
+
+    /**
+     * @param key Key.
+     */
+    public void key(KeyCacheObject key);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
index 4bfdd99..5a62e75 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
@@ -73,6 +73,19 @@ public class CacheDataRowAdapter implements CacheDataRow {
     }
 
     /**
+     * @param key Key.
+     * @param val Value.
+     * @param expireTime Expire time.
+     * @param ver Version.
+     */
+    public CacheDataRowAdapter(KeyCacheObject key, CacheObject val, 
GridCacheVersion ver, long expireTime) {
+        this.key = key;
+        this.val = val;
+        this.ver = ver;
+        this.expireTime = expireTime;
+    }
+
+    /**
      * Read row from data pages.
      *
      * @param cctx Cache context.

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java
index 47c3254..ca4ad05 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java
@@ -213,7 +213,7 @@ public class MetadataStorage implements MetaStore {
 
         /** {@inheritDoc} */
         @Override protected IndexItem getRow(final BPlusIO<IndexItem> io, 
final long pageAddr,
-            final int idx) throws IgniteCheckedException {
+            final int idx, Object ignore) throws IgniteCheckedException {
             return readRow(pageAddr, ((IndexIO)io).getOffset(pageAddr, idx));
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java
index aa61fbd..8827407 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java
@@ -120,20 +120,6 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
     /** */
     private volatile TreeMetaData treeMeta;
 
-    /**
-     *
-     */
-    public static interface RowClosure<L, R> {
-        /**
-         * @param io IO.
-         * @param pageAddr Page address.
-         * @param idx Index.
-         * @return Result.
-         * @throws IgniteCheckedException If failed.
-         */
-        public R row(BPlusIO<L> io, long pageAddr, int idx) throws 
IgniteCheckedException;
-    }
-
     /** */
     private final GridTreePrinter<Long> treePrinter = new 
GridTreePrinter<Long>() {
         /** */
@@ -224,22 +210,20 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
             long res = doAskNeighbor(io, pageAddr, back);
 
             if (back) {
-                assert g.getClass() == Remove.class;
-
                 if (io.getForward(pageAddr) != g.backId) // See how g.backId 
is setup in removeDown for this check.
                     return RETRY;
 
-                g.backId = res;
+                g.backId(res);
             }
             else {
                 assert isBack == FALSE.ordinal() : isBack;
 
-                g.fwdId = res;
+                g.fwdId(res);
             }
 
             return FOUND;
         }
-    };
+    }
 
     /** */
     private final GetPageHandler<Get> search = new Search();
@@ -257,7 +241,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
 
             boolean needBackIfRouting = g.backId != 0;
 
-            g.backId = 0; // Usually we'll go left down and don't need it.
+            g.backId(0L); // Usually we'll go left down and don't need it.
 
             int cnt = io.getCount(pageAddr);
             int idx = findInsertionPoint(io, pageAddr, 0, cnt, g.row, g.shift);
@@ -282,13 +266,13 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
             assert !io.isLeaf() : io;
 
             // If idx == cnt then we go right down, else left down: 
getLeft(cnt) == getRight(cnt - 1).
-            g.pageId = inner(io).getLeft(pageAddr, idx);
+            g.pageId(inner(io).getLeft(pageAddr, idx));
 
             // If we see the tree in consistent state, then our right down 
page must be forward for our left down page,
             // we need to setup fwdId and/or backId to be able to check this 
invariant on lower level.
             if (idx < cnt) {
                 // Go left down here.
-                g.fwdId = inner(io).getRight(pageAddr, idx);
+                g.fwdId(inner(io).getRight(pageAddr, idx));
             }
             else {
                 // Go right down here or it is an empty branch.
@@ -301,7 +285,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
 
                 // Setup fwdId.
                 if (fwdId == 0)
-                    g.fwdId = 0;
+                    g.fwdId(0L);
                 else {
                     // We can do askNeighbor on forward page here because we 
always take locks in forward direction.
                     Result res = askNeighbor(fwdId, g, false);
@@ -312,7 +296,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
 
                 // Setup backId.
                 if (cnt != 0) // It is not a routing page and we are going to 
the right, can get backId here.
-                    g.backId = inner(io).getLeft(pageAddr, cnt - 1);
+                    g.backId(inner(io).getLeft(pageAddr, cnt - 1));
                 else if (needBackIfRouting) {
                     // Can't get backId here because of possible deadlock and 
it is only needed for remove operation.
                     return GO_DOWN_X;
@@ -321,7 +305,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
 
             return GO_DOWN;
         }
-    };
+    }
 
     /** */
     private final GetPageHandler<Put> replace = new Replace();
@@ -331,6 +315,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
      */
     private class Replace extends GetPageHandler<Put> {
         /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
         @Override public Result run0(Page page, long pageAddr, BPlusIO<L> io, 
Put p, int lvl)
             throws IgniteCheckedException  {
             // Check the triangle invariant.
@@ -351,15 +336,26 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
 
             // Detach the old row if we are on a leaf page.
             if (lvl == 0) {
-                assert p.oldRow == null;
+                assert p.oldRow == null; // The old row must be set only once.
+
+                // Inner replace state must be consistent by the end of the 
operation.
+                assert p.needReplaceInner == FALSE || p.needReplaceInner == 
DONE : p.needReplaceInner;
+
+                // Need to replace inner key if now we are replacing the 
rightmost row and have a forward page.
+                if (canGetRowFromInner && idx + 1 == cnt && p.fwdId != 0L && 
p.needReplaceInner == FALSE) {
+                    // Can happen only for invoke, otherwise inner key must be 
replaced on the way down.
+                    assert p.invoke != null;
+
+                    // We need to restart the operation from root to perform 
inner replace.
+                    // On the second pass we will not get here (will avoid 
infinite loop) because
+                    // needReplaceInner will be DONE or our key will not be 
the rightmost anymore.
+                    return RETRY_ROOT;
+                }
 
                 // Get old row in leaf page to reduce contention at upper 
level.
                 p.oldRow = p.needOld ? getRow(io, pageAddr, idx) : 
(T)Boolean.TRUE;
 
                 p.finish();
-
-                // Inner replace state must be consistent by the end of the 
operation.
-                assert p.needReplaceInner == FALSE || p.needReplaceInner == 
DONE : p.needReplaceInner;
             }
 
             boolean needWal = needWalDeltaRecord(page);
@@ -371,7 +367,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
 
             return FOUND;
         }
-    };
+    }
 
     /** */
     private final GetPageHandler<Put> insert = new Insert();
@@ -405,6 +401,9 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
                 p.btmLvl++; // Get high.
                 p.row = moveUpRow;
 
+                if (p.invoke != null)
+                    p.invoke.row = moveUpRow;
+
                 // Here forward page can't be concurrently removed because we 
keep write lock on tail which is the only
                 // page who knows about the forward page, because it was just 
produced by split.
                 p.rightId = io.getForward(pageAddr);
@@ -417,7 +416,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
 
             return FOUND;
         }
-    };
+    }
 
     /** */
     private final GetPageHandler<Remove> rmvFromLeaf = new RemoveFromLeaf();
@@ -437,15 +436,8 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
 
             int idx = findInsertionPoint(io, pageAddr, 0, cnt, r.row, 0);
 
-            if (idx < 0) {
-                if (!r.ceil) // We've found exact match on search but now it's 
gone.
-                    return RETRY;
-
-                idx = fix(idx);
-
-                if (idx == cnt) // We can not remove ceiling row here. Bad 
luck.
-                    return NOT_FOUND;
-            }
+            if (idx < 0)
+                return RETRY; // We've found exact match on search but now 
it's gone.
 
             assert idx >= 0 && idx < cnt: idx;
 
@@ -495,7 +487,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
 
             return FOUND;
         }
-    };
+    }
 
     /** */
     private final GetPageHandler<Remove> lockBackAndRmvFromLeaf = new 
LockBackAndRmvFromLeaf();
@@ -520,7 +512,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
 
             return res;
         }
-    };
+    }
 
     /** */
     private final GetPageHandler<Remove> lockBackAndTail = new 
LockBackAndTail();
@@ -544,7 +536,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
 
             return res;
         }
-    };
+    }
 
     /** */
     private final GetPageHandler<Remove> lockTailForward = new 
LockTailForward();
@@ -560,7 +552,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
 
             return FOUND;
         }
-    };
+    }
 
     /** */
     private final GetPageHandler<Remove> lockTail = new LockTail();
@@ -590,7 +582,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
 
             return FOUND;
         }
-    };
+    }
 
     /** */
     private final PageHandler<Void, Bool> cutRoot = new CutRoot();
@@ -620,7 +612,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
 
             return TRUE;
         }
-    };
+    }
 
     /** */
     private final PageHandler<Long, Bool> addRoot = new AddRoot();
@@ -651,7 +643,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
 
             return TRUE;
         }
-    };
+    }
 
     /** */
     private final PageHandler<Long, Bool> initRoot = new InitRoot();
@@ -681,7 +673,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
 
             return TRUE;
         }
-    };
+    }
 
     /**
      * @param name Tree name.
@@ -947,15 +939,16 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
 
     /**
      * @param row Lookup row for exact match.
-     * @param c Found row closure.
-     * @return Found result.
+     * @param x Implementation specific argument, {@code null} always means 
that we need to return full detached data row.
+     * @return Found result or {@code null}.
      * @throws IgniteCheckedException If failed.
      */
-    public final <R> R findOne(L row, RowClosure<L, R> c) throws 
IgniteCheckedException {
+    @SuppressWarnings("unchecked")
+    public final <R> R findOne(L row, Object x) throws IgniteCheckedException {
         checkDestroyed();
 
         try {
-            GetOne g = new GetOne(row, c);
+            GetOne g = new GetOne(row, x);
 
             doFind(g);
 
@@ -1036,7 +1029,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
                             case RETRY:
                                 checkInterrupted();
 
-                                continue; // The child page got splitted, need 
to reread our page.
+                                continue; // The child page got split, need to 
reread our page.
 
                             default:
                                 return res;
@@ -1437,48 +1430,183 @@ public abstract class BPlusTree<L, T extends L> 
extends DataStructure implements
      * @return Removed row.
      * @throws IgniteCheckedException If failed.
      */
-    @SuppressWarnings("unused")
-    public final T removeCeil(L row) throws IgniteCheckedException {
-        return doRemove(row, true, true);
+    @Override public final T remove(L row) throws IgniteCheckedException {
+        return doRemove(row, true);
     }
 
     /**
      * @param row Lookup row.
-     * @return Removed row.
      * @throws IgniteCheckedException If failed.
+     * @return {@code True} if removed row.
      */
-    @Override public final T remove(L row) throws IgniteCheckedException {
-        return doRemove(row, false, true);
+    public final boolean removex(L row) throws IgniteCheckedException {
+        Boolean res = (Boolean)doRemove(row, false);
+
+        return res != null ? res : false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void invoke(L row, Object z, InvokeClosure<T> c) throws 
IgniteCheckedException {
+        checkDestroyed();
+
+        Invoke x = new Invoke(row, z, c);
+
+        try {
+            for (;;) {
+                x.init();
+
+                Result res = invokeDown(x, x.rootId, 0L, 0L, x.rootLvl);
+
+                switch (res) {
+                    case RETRY:
+                    case RETRY_ROOT:
+                        checkInterrupted();
+
+                        continue;
+
+                    default:
+                        if (!x.isFinished()) {
+                            res = x.tryFinish();
+
+                            if (res == RETRY || res == RETRY_ROOT) {
+                                checkInterrupted();
+
+                                continue;
+                            }
+
+                            assert x.isFinished(): res;
+                        }
+
+                        return;
+                }
+            }
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteCheckedException("Runtime failure on search row: " 
+ row, e);
+        }
+        catch (RuntimeException e) {
+            throw new IgniteException("Runtime failure on search row: " + row, 
e);
+        }
+        catch (AssertionError e) {
+            throw new AssertionError("Assertion error on search row: " + row, 
e);
+        }
+        finally {
+            x.releaseAll();
+        }
     }
 
     /**
-     * @param row Lookup row.
+     * @param x Invoke operation.
+     * @param pageId Page ID.
+     * @param backId Expected backward page ID if we are going to the right.
+     * @param fwdId Expected forward page ID.
+     * @param lvl Level.
+     * @return Result code.
      * @throws IgniteCheckedException If failed.
-     * @return {@code True} if removed row.
      */
-    public final boolean removex(L row) throws IgniteCheckedException {
-        Boolean res = (Boolean)doRemove(row, false, false);
+    private Result invokeDown(final Invoke x, final long pageId, final long 
backId, final long fwdId, final int lvl)
+        throws IgniteCheckedException {
+        assert lvl >= 0 : lvl;
 
-        return res != null ? res : false;
+        if (x.isTail(pageId, lvl))
+            return FOUND; // We've already locked this page, so return that we 
are ok.
+
+        final Page page = page(pageId);
+
+        try {
+            for (;;) {
+                // Init args.
+                x.pageId(pageId);
+                x.fwdId(fwdId);
+                x.backId(backId);
+
+                Result res = readPage(page, this, search, x, lvl, RETRY);
+
+                switch (res) {
+                    case GO_DOWN_X:
+                        assert backId != 0;
+                        assert x.backId == 0; // We did not setup it yet.
+
+                        x.backId(pageId); // Dirty hack to setup a check 
inside of askNeighbor.
+
+                        // We need to get backId here for our child page, it 
must be the last child of our back.
+                        res = askNeighbor(backId, x, true);
+
+                        if (res != FOUND)
+                            return res; // Retry.
+
+                        assert x.backId != pageId; // It must be updated in 
askNeighbor.
+
+                        // Intentional fallthrough.
+                    case GO_DOWN:
+                        res = x.tryReplaceInner(page, pageId, fwdId, lvl);
+
+                        if (res != RETRY)
+                            res = invokeDown(x, x.pageId, x.backId, x.fwdId, 
lvl - 1);
+
+                        if (res == RETRY_ROOT || x.isFinished())
+                            return res;
+
+                        if (res == RETRY) {
+                            checkInterrupted();
+
+                            continue;
+                        }
+
+                        // Unfinished Put does insertion on the same level.
+                        if (x.isPut())
+                            continue;
+
+                        assert x.isRemove(); // Guarded by isFinished.
+
+                        res = x.finishOrLockTail(page, pageId, backId, fwdId, 
lvl);
+
+                        return res;
+
+                    case NOT_FOUND:
+                        if (lvl == 0)
+                            x.invokeClosure();
+
+                        return x.onNotFound(page, pageId, fwdId, lvl);
+
+                    case FOUND:
+                        if (lvl == 0)
+                            x.invokeClosure();
+
+                        return x.onFound(page, pageId, backId, fwdId, lvl);
+
+                    default:
+                        return res;
+                }
+            }
+        }
+        finally {
+            x.levelExit();
+
+            if (x.canRelease(page, lvl))
+                page.close();
+        }
     }
 
+
     /**
      * @param row Lookup row.
-     * @param ceil If we can remove ceil row when we can not find exact.
      * @param needOld {@code True} if need return removed row.
      * @return Removed row.
      * @throws IgniteCheckedException If failed.
      */
-    private T doRemove(L row, boolean ceil, boolean needOld) throws 
IgniteCheckedException {
+    private T doRemove(L row, boolean needOld) throws IgniteCheckedException {
         checkDestroyed();
 
-        Remove r = new Remove(row, ceil, needOld);
+        Remove r = new Remove(row, needOld);
 
         try {
             for (;;) {
                 r.init();
 
-                switch (removeDown(r, r.rootId, 0L, 0L, r.rootLvl)) {
+                Result res = removeDown(r, r.rootId, 0L, 0L, r.rootLvl);
+
+                switch (res) {
                     case RETRY:
                     case RETRY_ROOT:
                         checkInterrupted();
@@ -1487,15 +1615,11 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
 
                     default:
                         if (!r.isFinished()) {
-                            Result res = r.finishTail();
+                            res = r.finishTail();
 
                             // If not found, then the tree grew beyond our 
call stack -> retry from the actual root.
                             if (res == RETRY || res == NOT_FOUND) {
-                                int root = getRootLevel();
-
-                                boolean checkRes = r.checkTailLevel(root);
-
-                                assert checkRes : "tail=" + r.tail + ", root=" 
+ root + ", res=" + res;
+                                assert r.checkTailLevel(getRootLevel()) : 
"tail=" + r.tail + ", res=" + res;
 
                                 checkInterrupted();
 
@@ -1521,9 +1645,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
             throw new AssertionError("Assertion error on search row: " + row, 
e);
         }
         finally {
-            r.releaseTail();
-
-            r.reuseFreePages();
+            r.releaseAll();
         }
     }
 
@@ -1579,12 +1701,10 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
                             continue;
                         }
 
-                        if (res != RETRY_ROOT && !r.isFinished()) {
-                            res = r.finishTail();
+                        if (res == RETRY_ROOT || r.isFinished())
+                            return res;
 
-                            if (res == NOT_FOUND)
-                                res = r.lockTail(pageId, page, backId, fwdId, 
lvl);
-                        }
+                        res = r.finishOrLockTail(page, pageId, backId, fwdId, 
lvl);
 
                         return res;
 
@@ -1592,31 +1712,12 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
                         // We are at the bottom.
                         assert lvl == 0 : lvl;
 
-                        if (!r.ceil) {
-                            r.finish();
-
-                            return res;
-                        }
+                        r.finish();
 
-                        // Intentional fallthrough for ceiling remove.
+                        return res;
 
                     case FOUND:
-                        // We must be at the bottom here, just need to remove 
row from the current page.
-                        assert lvl == 0 : lvl;
-
-                        res = r.removeFromLeaf(pageId, page, backId, fwdId);
-
-                        if (res == NOT_FOUND) {
-                            assert r.ceil : "must be a retry if not a ceiling 
remove";
-
-                            r.finish();
-                        }
-                        else if (res == FOUND && r.tail == null) {
-                            // Finish if we don't need to do any merges.
-                            r.finish();
-                        }
-
-                        return res;
+                        return r.tryRemoveFromLeaf(page, pageId, backId, 
fwdId, lvl);
 
                     default:
                         return res;
@@ -1716,7 +1817,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
      * {@inheritDoc}
      */
     @Override public final T put(T row) throws IgniteCheckedException {
-        return put(row, true);
+        return doPut(row, true);
     }
 
     /**
@@ -1725,7 +1826,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
      * @return {@code True} if replaced existing row.
      */
     public boolean putx(T row) throws IgniteCheckedException {
-        Boolean res = (Boolean)put(row, false);
+        Boolean res = (Boolean)doPut(row, false);
 
         return res != null ? res : false;
     }
@@ -1736,7 +1837,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
      * @return Old row.
      * @throws IgniteCheckedException If failed.
      */
-    private T put(T row, boolean needOld) throws IgniteCheckedException {
+    private T doPut(T row, boolean needOld) throws IgniteCheckedException {
         checkDestroyed();
 
         Put p = new Put(row, needOld);
@@ -1858,7 +1959,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
     /**
      * @return {@code True} if state was changed.
      */
-    private final boolean markDestroyed() {
+    private boolean markDestroyed() {
         return destroyed.compareAndSet(false, true);
     }
 
@@ -1997,31 +2098,10 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
                         assert p.pageId != pageId;
                         assert p.fwdId != fwdId || fwdId == 0;
 
-                        // Need to replace key in inner page. There is no race 
because we keep tail lock after split.
-                        if (p.needReplaceInner == TRUE) {
-                            p.needReplaceInner = FALSE; // Protect from 
retries.
-
-                            long oldFwdId = p.fwdId;
-                            long oldPageId = p.pageId;
-
-                            // Set old args.
-                            p.fwdId = fwdId;
-                            p.pageId = pageId;
-
-                            res = writePage(pageMem, page, this, replace, p, 
lvl, RETRY);
+                        res = p.tryReplaceInner(page, pageId, fwdId, lvl);
 
-                            // Restore args.
-                            p.pageId = oldPageId;
-                            p.fwdId = oldFwdId;
-
-                            if (res != FOUND)
-                                return res; // Need to retry.
-
-                            p.needReplaceInner = DONE; // We can have only 
single matching inner key.
-                        }
-
-                        // Go down recursively.
-                        res = putDown(p, p.pageId, p.fwdId, lvl - 1);
+                        if (res != RETRY) // Go down recursively.
+                            res = putDown(p, p.pageId, p.fwdId, lvl - 1);
 
                         if (res == RETRY_ROOT || p.isFinished())
                             return res;
@@ -2034,21 +2114,13 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
                     case FOUND: // Do replace.
                         assert lvl == 0 : "This replace can happen only at the 
bottom level.";
 
-                        // Init args.
-                        p.pageId = pageId;
-                        p.fwdId = fwdId;
-
-                        return writePage(pageMem, page, this, replace, p, lvl, 
RETRY);
+                        return p.tryReplace(page, pageId, fwdId, lvl);
 
                     case NOT_FOUND: // Do insert.
                         assert lvl == p.btmLvl : "must insert at the bottom 
level";
                         assert p.needReplaceInner == FALSE : 
p.needReplaceInner + " " + lvl;
 
-                        // Init args.
-                        p.pageId = pageId;
-                        p.fwdId = fwdId;
-
-                        return writePage(pageMem, page, this, insert, p, lvl, 
RETRY);
+                        return p.tryInsert(page, pageId, fwdId, lvl);
 
                     default:
                         return res;
@@ -2096,29 +2168,32 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
      */
     private abstract class Get {
         /** */
-        protected long rmvId;
+        long rmvId;
 
         /** Starting point root level. May be outdated. Must be modified only 
in {@link Get#init()}. */
-        protected int rootLvl;
+        int rootLvl;
 
         /** Starting point root ID. May be outdated. Must be modified only in 
{@link Get#init()}. */
-        protected long rootId;
+        long rootId;
 
         /** */
-        protected L row;
+        L row;
 
         /** In/Out parameter: Page ID. */
-        protected long pageId;
+        long pageId;
 
         /** In/Out parameter: expected forward page ID. */
-        protected long fwdId;
+        long fwdId;
 
         /** In/Out parameter: in case of right turn this field will contain 
backward page ID for the child. */
-        protected long backId;
+        long backId;
 
         /** */
         int shift;
 
+        /** If this operation is a part of invoke. */
+        Invoke invoke;
+
         /**
          * @param row Row.
          */
@@ -2129,6 +2204,21 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
         }
 
         /**
+         * @param g Other operation to copy from.
+         * @return {@code this}.
+         */
+        final Get copyFrom(Get g) {
+            rmvId = g.rmvId;
+            rootLvl = g.rootLvl;
+            pageId = g.pageId;
+            fwdId = g.fwdId;
+            backId = g.backId;
+            shift = g.shift;
+
+            return this;
+        }
+
+        /**
          * Initialize operation.
          *
          * @throws IgniteCheckedException If failed.
@@ -2146,7 +2236,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
          * @param rootLvl Root level.
          * @param rmvId Remove ID to be afraid of.
          */
-        final void restartFromRoot(long rootId, int rootLvl, long rmvId) {
+        void restartFromRoot(long rootId, int rootLvl, long rmvId) {
             this.rootId = rootId;
             this.rootLvl = rootLvl;
             this.rmvId = rmvId;
@@ -2188,6 +2278,34 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
         boolean canRelease(Page page, int lvl) {
             return page != null;
         }
+
+        /**
+         * @param backId Back page ID.
+         */
+        void backId(long backId) {
+            this.backId = backId;
+        }
+
+        /**
+         * @param pageId Page ID.
+         */
+        void pageId(long pageId) {
+            this.pageId = pageId;
+        }
+
+        /**
+         * @param fwdId Forward page ID.
+         */
+        void fwdId(long fwdId) {
+            this.fwdId = fwdId;
+        }
+
+        /**
+         * @return {@code true} If the operation is finished.
+         */
+        boolean isFinished() {
+            throw new IllegalStateException();
+        }
     }
 
     /**
@@ -2195,25 +2313,26 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
      */
     private final class GetOne extends Get {
         /** */
-        private final RowClosure<L, ?> c;
+        Object x;
 
         /**
          * @param row Row.
-         * @param c Row closure.
+         * @param x Implementation specific argument.
          */
-        private GetOne(L row, RowClosure<L, ?> c) {
+        private GetOne(L row, Object x) {
             super(row);
 
-            this.c = c;
+            this.x = x;
         }
 
         /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
         @Override boolean found(BPlusIO<L> io, long pageAddr, int idx, int 
lvl) throws IgniteCheckedException {
             // Check if we are on an inner page and can't get row from it.
             if (lvl != 0 && !canGetRowFromInner)
                 return false;
 
-            row = c != null ? (L)c.row(io, pageAddr, idx) : getRow(io, 
pageAddr, idx);
+            row = getRow(io, pageAddr, idx, x);
 
             return true;
         }
@@ -2224,7 +2343,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
      */
     private final class GetCursor extends Get {
         /** */
-        private ForwardCursor cursor;
+        ForwardCursor cursor;
 
         /**
          * @param lower Lower bound.
@@ -2261,31 +2380,31 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
      */
     private final class Put extends Get {
         /** Right child page ID for split row. */
-        private long rightId;
+        long rightId;
 
         /** Replaced row if any. */
-        private T oldRow;
+        T oldRow;
 
         /**
          * This page is kept locked after split until insert to the upper 
level will not be finished.
          * It is needed because split row will be "in flight" and if we'll 
release tail, remove on
          * split row may fail.
          */
-        private Page tail;
+        Page tail;
 
         /** */
-        private long tailPageAddr;
+        long tailPageAddr;
 
         /**
          * Bottom level for insertion (insert can't go deeper). Will be 
incremented on split on each level.
          */
-        private short btmLvl;
+        short btmLvl;
 
         /** */
         Bool needReplaceInner = FALSE;
 
         /** */
-        private final boolean needOld;
+        final boolean needOld;
 
         /**
          * @param row Row.
@@ -2302,6 +2421,8 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
             if (lvl == 0) // Leaf: need to stop.
                 return true;
 
+            assert btmLvl == 0; // It can not be insert.
+
             // If we can get full row from the inner page, we have to replace 
it with the new one. On the way down
             // we can not miss inner key even in presence of concurrent 
operations because of `triangle` invariant +
             // concurrent inner replace handling by retrying from root.
@@ -2348,10 +2469,8 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
             tail(null, 0L);
         }
 
-        /**
-         * @return {@code true} If finished.
-         */
-        private boolean isFinished() {
+        /** {@inheritDoc} */
+        @Override boolean isFinished() {
             return row == null;
         }
 
@@ -2505,45 +2624,404 @@ public abstract class BPlusTree<L, T extends L> 
extends DataStructure implements
                 }
             }
         }
-    }
 
-    /**
-     * Remove operation.
-     */
-    private final class Remove extends Get implements ReuseBag {
-        /** */
-        private boolean ceil;
+        /**
+         * @param page Page.
+         * @param pageId Page ID.
+         * @param fwdId Forward ID.
+         * @param lvl Level.
+         * @return Result.
+         * @throws IgniteCheckedException If failed.
+         */
+        private Result tryReplaceInner(Page page, long pageId, long fwdId, int 
lvl)
+            throws IgniteCheckedException {
+            // Need to replace key in inner page. There is no race because we 
keep tail lock after split.
+            if (needReplaceInner == TRUE) {
+                needReplaceInner = FALSE; // Protect from retries.
 
-        /** We may need to lock part of the tree branch from the bottom to up 
for multiple levels. */
-        private Tail<L> tail;
+                long oldFwdId = this.fwdId;
+                long oldPageId = this.pageId;
 
-        /** */
-        Bool needReplaceInner = FALSE;
+                // Set old args.
+                this.fwdId = fwdId;
+                this.pageId = pageId;
 
-        /** */
-        Bool needMergeEmptyBranch = FALSE;
+                Result res = writePage(pageMem, page, BPlusTree.this, replace, 
this, lvl, RETRY);
 
-        /** Removed row. */
-        private T rmvd;
+                // Restore args.
+                this.pageId = oldPageId;
+                this.fwdId = oldFwdId;
 
-        /** Current page. */
-        private Page page;
+                if (res == RETRY)
+                    return RETRY;
+
+                needReplaceInner = DONE; // We can have only a single matching 
inner key.
+
+                return FOUND;
+            }
+
+            return NOT_FOUND;
+        }
+
+        /**
+         * @param page Page.
+         * @param pageId Page ID.
+         * @param fwdId Forward ID.
+         * @param lvl Level.
+         * @return Result.
+         * @throws IgniteCheckedException If failed.
+         */
+        private Result tryInsert(Page page, long pageId, long fwdId, int lvl) 
throws IgniteCheckedException {
+            // Init args.
+            this.pageId = pageId;
+            this.fwdId = fwdId;
+
+            return writePage(pageMem, page, BPlusTree.this, insert, this, lvl, 
RETRY);
+        }
+
+        /**
+         * @param page Page.
+         * @param pageId Page ID.
+         * @param fwdId Forward ID.
+         * @param lvl Level.
+         * @return Result.
+         * @throws IgniteCheckedException If failed.
+         */
+        public Result tryReplace(Page page, long pageId, long fwdId, int lvl) 
throws IgniteCheckedException {
+            // Init args.
+            this.pageId = pageId;
+            this.fwdId = fwdId;
+
+            return writePage(pageMem, page, BPlusTree.this, replace, this, 
lvl, RETRY);
+        }
+    }
+
+    /**
+     * Invoke operation.
+     */
+    private final class Invoke extends Get {
+        /** */
+        Object x;
+
+        /** */
+        InvokeClosure<T> clo;
+
+        /** */
+        Bool closureInvoked = FALSE;
+
+        /** */
+        T foundRow;
+
+        /** */
+        Get op;
+
+        /**
+         * @param row Row.
+         * @param x Implementation specific argument.
+         * @param clo Closure.
+         */
+        private Invoke(L row, Object x, final InvokeClosure<T> clo) {
+            super(row);
+
+            assert clo != null;
+
+            this.clo = clo;
+            this.x = x;
+        }
+
+        /** {@inheritDoc} */
+        @Override void pageId(long pageId) {
+            this.pageId = pageId;
+
+            if (op != null)
+                op.pageId = pageId;
+        }
+
+        /** {@inheritDoc} */
+        @Override void fwdId(long fwdId) {
+            this.fwdId = fwdId;
+
+            if (op != null)
+                op.fwdId = fwdId;
+        }
+
+        /** {@inheritDoc} */
+        @Override void backId(long backId) {
+            this.backId = backId;
+
+            if (op != null)
+                op.backId = backId;
+        }
+
+        /** {@inheritDoc} */
+        @Override void restartFromRoot(long rootId, int rootLvl, long rmvId) {
+            super.restartFromRoot(rootId, rootLvl, rmvId);
+
+            if (op != null)
+                op.restartFromRoot(rootId, rootLvl, rmvId);
+        }
+
+        /** {@inheritDoc} */
+        @Override boolean found(BPlusIO<L> io, long pageAddr, int idx, int 
lvl) throws IgniteCheckedException {
+            // If the operation is initialized, then the closure has been 
called already.
+            if (op != null)
+                return op.found(io, pageAddr, idx, lvl);
+
+            if (lvl == 0) {
+                if (closureInvoked == FALSE) {
+                    closureInvoked = READY;
+
+                    foundRow = getRow(io, pageAddr, idx, x);
+                }
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override boolean notFound(BPlusIO<L> io, long pageAddr, int idx, int 
lvl) throws IgniteCheckedException {
+            // If the operation is initialized, then the closure has been 
called already.
+            if (op != null)
+                return op.notFound(io, pageAddr, idx, lvl);
+
+            if (lvl == 0) {
+                if (closureInvoked == FALSE)
+                    closureInvoked = READY;
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /**
+         * @throws IgniteCheckedException If failed.
+         */
+        private void invokeClosure() throws IgniteCheckedException {
+            if (closureInvoked != READY)
+                return;
+
+            closureInvoked = DONE;
+
+            clo.call(foundRow);
+
+            switch (clo.operationType()) {
+                case PUT:
+                    T newRow = clo.newRow();
+
+                    assert newRow != null;
+
+                    op = new Put(newRow, false);
+
+                    break;
+
+                case REMOVE:
+                    assert foundRow != null;
+
+                    op = new Remove(row, false);
+
+                    break;
+
+                case NOOP:
+                    return;
+
+                default:
+                    throw new IllegalStateException();
+            }
+
+            op.copyFrom(this);
+
+            op.invoke = this;
+        }
+
+        /** {@inheritDoc} */
+        @Override boolean canRelease(Page page, int lvl) {
+            if (page == null)
+                return false;
+
+            if (op == null)
+                return true;
+
+            return op.canRelease(page, lvl);
+        }
+
+        /**
+         * @return {@code true} If it is a {@link Put} operation internally.
+         */
+        private boolean isPut() {
+            return op != null && op.getClass() == Put.class;
+        }
+
+        /**
+         * @return {@code true} If it is a {@link Remove} operation internally.
+         */
+        private boolean isRemove() {
+            return op != null && op.getClass() == Remove.class;
+        }
+
+        /**
+         * @param pageId Page ID.
+         * @param lvl Level.
+         * @return {@code true} If it is a {@link Remove} and the page is in 
tail.
+         */
+        private boolean isTail(long pageId, int lvl) {
+            return isRemove() && ((Remove)op).isTail(pageId, lvl);
+        }
+
+        /**
+         */
+        private void levelExit() {
+            if (isRemove())
+                ((Remove)op).page = null;
+        }
+
+        /**
+         * Release all the resources by the end of operation.
+         */
+        private void releaseAll() throws IgniteCheckedException {
+            if (isRemove())
+                ((Remove)op).releaseAll();
+        }
+
+        /**
+         * @param page Page.
+         * @param pageId Page ID.
+         * @param fwdId Forward ID.
+         * @param lvl Level.
+         * @return Result.
+         * @throws IgniteCheckedException If failed.
+         */
+        private Result onNotFound(Page page, long pageId, long fwdId, int lvl)
+            throws IgniteCheckedException {
+            if (op == null)
+                return NOT_FOUND;
+
+            if (isRemove()) {
+                assert lvl == 0;
+
+                ((Remove)op).finish();
+
+                return NOT_FOUND;
+            }
+
+            return ((Put)op).tryInsert(page, pageId, fwdId, lvl);
+        }
+
+        /**
+         * @param page Page.
+         * @param pageId Page ID.
+         * @param backId Back page ID.
+         * @param fwdId Forward ID.
+         * @param lvl Level.
+         * @return Result.
+         * @throws IgniteCheckedException If failed.
+         */
+        private Result onFound(Page page, long pageId, long backId, long 
fwdId, int lvl)
+            throws IgniteCheckedException {
+            if (op == null)
+                return FOUND;
+
+            if (isRemove())
+                return ((Remove)op).tryRemoveFromLeaf(page, pageId, backId, 
fwdId, lvl);
+
+            return  ((Put)op).tryReplace(page, pageId, fwdId, lvl);
+        }
+
+        /**
+         * @return Result.
+         * @throws IgniteCheckedException If failed.
+         */
+        private Result tryFinish() throws IgniteCheckedException {
+            assert op != null; // Must be guarded by isFinished.
+
+            if (isPut())
+                return RETRY;
+
+            Result res = ((Remove)op).finishTail();
+
+            if (res == NOT_FOUND)
+                res = RETRY;
+
+            assert res == FOUND || res == RETRY: res;
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override boolean isFinished() {
+            if (closureInvoked != DONE)
+                return false;
+
+            if (op == null)
+                return true;
+
+            return op.isFinished();
+        }
+
+        /**
+         * @param page Page.
+         * @param pageId Page ID.
+         * @param fwdId Forward ID.
+         * @param lvl Level.
+         * @return Result.
+         * @throws IgniteCheckedException If failed.
+         */
+        Result tryReplaceInner(Page page, long pageId, long fwdId, int lvl) 
throws IgniteCheckedException {
+            if (!isPut())
+                return NOT_FOUND;
+
+            return ((Put)op).tryReplaceInner(page, pageId, fwdId, lvl);
+        }
+
+        /**
+         * @param page Page.
+         * @param pageId Page ID.
+         * @param backId Back page ID.
+         * @param fwdId Forward ID.
+         * @param lvl Level.
+         * @return Result.
+         * @throws IgniteCheckedException If failed.
+         */
+        public Result finishOrLockTail(Page page, long pageId, long backId, 
long fwdId, int lvl)
+            throws IgniteCheckedException {
+            return ((Remove)op).finishOrLockTail(page, pageId, backId, fwdId, 
lvl);
+        }
+    }
+
+    /**
+     * Remove operation.
+     */
+    private final class Remove extends Get implements ReuseBag {
+        /** We may need to lock part of the tree branch from the bottom to up 
for multiple levels. */
+        Tail<L> tail;
+
+        /** */
+        Bool needReplaceInner = FALSE;
+
+        /** */
+        Bool needMergeEmptyBranch = FALSE;
+
+        /** Removed row. */
+        T rmvd;
+
+        /** Current page. */
+        Page page;
 
         /** */
-        private Object freePages;
+        Object freePages;
 
         /** */
-        private final boolean needOld;
+        final boolean needOld;
 
         /**
          * @param row Row.
-         * @param ceil If we can remove ceil row when we can not find exact.
          * @param needOld {@code True} If need return old value.
          */
-        private Remove(L row, boolean ceil, boolean needOld) {
+        private Remove(L row, boolean needOld) {
             super(row);
 
-            this.ceil = ceil;
             this.needOld = needOld;
         }
 
@@ -2551,12 +3029,12 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
         @SuppressWarnings("unchecked")
         @Override public long pollFreePage() {
             if (freePages == null)
-                return 0;
+                return 0L;
 
             if (freePages.getClass() == GridLongList.class) {
                 GridLongList list = ((GridLongList)freePages);
 
-                return list.isEmpty() ? 0 : list.remove();
+                return list.isEmpty() ? 0L : list.remove();
             }
 
             long res = (long)freePages;
@@ -2569,7 +3047,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
         /** {@inheritDoc} */
         @SuppressWarnings("unchecked")
         @Override public void addFreePage(long pageId) {
-            assert pageId != 0;
+            assert pageId != 0L;
 
             if (freePages == null)
                 freePages = pageId;
@@ -2957,6 +3435,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
          * @param idx Index to remove.
          * @throws IgniteCheckedException If failed.
          */
+        @SuppressWarnings("unchecked")
         private void removeDataRowFromLeaf(Page page, BPlusIO<L> io, long 
pageAddr, int cnt, int idx)
             throws IgniteCheckedException {
             assert idx >= 0 && idx < cnt: idx;
@@ -3280,10 +3759,8 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
             return true;
         }
 
-        /**
-         * @return {@code true} If finished.
-         */
-        private boolean isFinished() {
+        /** {@inheritDoc} */
+        @Override boolean isFinished() {
             return row == null;
         }
 
@@ -3438,9 +3915,58 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
          * @param rootLvl Actual root level.
          * @return {@code true} If tail level is correct.
          */
-        public boolean checkTailLevel(int rootLvl) {
+        private boolean checkTailLevel(int rootLvl) {
             return tail == null || tail.lvl < rootLvl;
         }
+
+        /**
+         * @throws IgniteCheckedException If failed.
+         */
+        private void releaseAll() throws IgniteCheckedException {
+            releaseTail();
+            reuseFreePages();
+        }
+
+        /**
+         * @param page Page.
+         * @param pageId Page ID.
+         * @param backId Back page ID.
+         * @param fwdId Forward ID.
+         * @param lvl Level.
+         * @return Result.
+         * @throws IgniteCheckedException If failed.
+         */
+        private Result finishOrLockTail(Page page, long pageId, long backId, 
long fwdId, int lvl)
+            throws IgniteCheckedException {
+            Result res = finishTail();
+
+            if (res == NOT_FOUND)
+                res = lockTail(pageId, page, backId, fwdId, lvl);
+
+            return res;
+        }
+
+        /**
+         * @param page Page.
+         * @param pageId Page ID.
+         * @param backId Back page ID.
+         * @param fwdId Forward ID.
+         * @param lvl Level.
+         * @return Result.
+         * @throws IgniteCheckedException If failed.
+         */
+        private Result tryRemoveFromLeaf(Page page, long pageId, long backId, 
long fwdId, int lvl)
+            throws IgniteCheckedException {
+            // We must be at the bottom here, just need to remove row from the 
current page.
+            assert lvl == 0 : lvl;
+
+            Result res = removeFromLeaf(pageId, page, backId, fwdId);
+
+            if (res == FOUND && tail == null) // Finish if we don't need to do 
any merges.
+                finish();
+
+            return res;
+        }
     }
 
     /**
@@ -3619,7 +4145,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
     protected abstract int compare(BPlusIO<L> io, long pageAddr, int idx, L 
row) throws IgniteCheckedException;
 
     /**
-     * Get the full detached row. Can be called on inner page only if {@link 
#canGetRowFromInner} is {@code true}.
+     * Get a full detached data row.
      *
      * @param io IO.
      * @param pageAddr Page address.
@@ -3627,7 +4153,21 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
      * @return Full detached data row.
      * @throws IgniteCheckedException If failed.
      */
-    protected abstract T getRow(BPlusIO<L> io, long pageAddr, int idx) throws 
IgniteCheckedException;
+    protected final T getRow(BPlusIO<L> io, long pageAddr, int idx) throws 
IgniteCheckedException {
+        return getRow(io, pageAddr, idx, null);
+    }
+
+    /**
+     * Get data row. Can be called on inner page only if {@link 
#canGetRowFromInner} is {@code true}.
+     *
+     * @param io IO.
+     * @param pageAddr Page address.
+     * @param idx Index.
+     * @param x Implementation specific argument, {@code null} always means 
that we need to return full detached data row.
+     * @return Data row.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected abstract T getRow(BPlusIO<L> io, long pageAddr, int idx, Object 
x) throws IgniteCheckedException;
 
     /**
      * Forward cursor.

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index fc78f69..0c71731 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -97,6 +97,11 @@ public class GridDhtCacheEntry extends 
GridDistributedCacheEntry {
     }
 
     /** {@inheritDoc} */
+    @Override protected long nextPartCounter() {
+        return locPart.nextUpdateCounter();
+    }
+
+    /** {@inheritDoc} */
     @Override public int memorySize() throws IgniteCheckedException {
         int rdrsOverhead;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTree.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTree.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTree.java
index 8dcd205..7eae0d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTree.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.util;
 
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.util.lang.GridCursor;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Interface for ignite internal tree.
@@ -34,6 +35,14 @@ public interface IgniteTree<L, T> {
     public T put(T val) throws IgniteCheckedException;
 
     /**
+     * @param key Key.
+     * @param x Implementation specific argument, {@code null} always means 
that we need a full detached data row.
+     * @param c Closure.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void invoke(L key, Object x, InvokeClosure<T> c) throws 
IgniteCheckedException;
+
+    /**
      * Returns the value to which the specified key is mapped, or {@code null} 
if this tree contains no mapping for the
      * key.
      *
@@ -70,4 +79,42 @@ public interface IgniteTree<L, T> {
      * @throws IgniteCheckedException If failed.
      */
     public long size() throws IgniteCheckedException;
+
+    /**
+     *
+     */
+    interface InvokeClosure<T> {
+        /**
+         *
+         * @param row Old row or {@code null} if old row not found.
+         * @throws IgniteCheckedException If failed.
+         */
+        void call(@Nullable T row) throws IgniteCheckedException;
+
+        /**
+         * @return New row for {@link OperationType#PUT} operation.
+         */
+        T newRow();
+
+        /**
+         * @return Operation type for this closure or {@code null} if it is 
unknown.
+         *      After method {@link #call(Object)} has been called, operation 
type must
+         *      be know and this method can not return {@code null}.
+         */
+        OperationType operationType();
+    }
+
+    /**
+     *
+     */
+    enum OperationType {
+        /** */
+        NOOP,
+
+        /** */
+        REMOVE,
+
+        /** */
+        PUT
+    }
 }

Reply via email to