This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 6774c9fa90 IGNITE-18717 Fix parallel read and delete RowVersion for 
PageMemoryMvPartitionStorage (#1652)
6774c9fa90 is described below

commit 6774c9fa90afbcceb8d03c1f200138afaba0e5da
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Tue Feb 14 17:53:36 2023 +0300

    IGNITE-18717 Fix parallel read and delete RowVersion for 
PageMemoryMvPartitionStorage (#1652)
---
 .../tree/AbstractBplusTreePageMemoryTest.java      | 148 +++++++++-
 .../ignite/internal/pagememory/tree/BplusTree.java | 283 +++++++++++++------
 .../internal/pagememory/tree/IgniteTree.java       |  11 +
 .../ignite/internal/storage/StorageException.java  |  11 +
 .../ignite/internal/storage/util/StorageUtils.java |  10 +
 .../AbstractMvPartitionStorageConcurrencyTest.java |  36 ++-
 .../storage/AbstractMvPartitionStorageTest.java    |   3 +-
 .../storage/BaseMvPartitionStorageTest.java        |   8 +
 .../pagememory/mv/AbortWriteInvokeClosure.java     | 122 ++++++++
 .../mv/AbstractPageMemoryMvPartitionStorage.java   | 309 +++++++++------------
 .../mv/AbstractPartitionTimestampCursor.java       |  79 ++----
 .../mv/AddWriteCommittedInvokeClosure.java         | 103 +++++++
 .../pagememory/mv/AddWriteInvokeClosure.java       | 156 +++++++++++
 .../pagememory/mv/CommitWriteInvokeClosure.java    | 101 +++++++
 .../internal/storage/pagememory/mv/LockHolder.java |  71 +++++
 .../storage/pagememory/mv/ScanVersionsCursor.java  |  17 +-
 .../storage/pagememory/mv/LockHolderTest.java      |  83 ++++++
 ...ageMemoryMvPartitionStorageConcurrencyTest.java |  37 ++-
 ...ageMemoryMvPartitionStorageConcurrencyTest.java |  37 ++-
 19 files changed, 1284 insertions(+), 341 deletions(-)

diff --git 
a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/AbstractBplusTreePageMemoryTest.java
 
b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/AbstractBplusTreePageMemoryTest.java
index 789fd0d2ea..20ec72f1db 100644
--- 
a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/AbstractBplusTreePageMemoryTest.java
+++ 
b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/AbstractBplusTreePageMemoryTest.java
@@ -34,13 +34,16 @@ import static 
org.apache.ignite.internal.pagememory.util.PageUtils.getLong;
 import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runMultiThreaded;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runMultiThreadedAsync;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.apache.ignite.internal.util.Constants.GiB;
 import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
@@ -51,6 +54,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Queue;
 import java.util.Random;
 import java.util.Set;
@@ -80,6 +84,7 @@ import 
org.apache.ignite.internal.pagememory.datastructure.DataStructure;
 import org.apache.ignite.internal.pagememory.io.IoVersions;
 import org.apache.ignite.internal.pagememory.reuse.ReuseList;
 import org.apache.ignite.internal.pagememory.tree.BplusTree.TreeRowClosure;
+import org.apache.ignite.internal.pagememory.tree.BplusTree.TreeRowMapClosure;
 import org.apache.ignite.internal.pagememory.tree.IgniteTree.InvokeClosure;
 import org.apache.ignite.internal.pagememory.tree.IgniteTree.OperationType;
 import org.apache.ignite.internal.pagememory.tree.io.BplusInnerIo;
@@ -2367,6 +2372,147 @@ public abstract class AbstractBplusTreePageMemoryTest 
extends BaseIgniteAbstract
         assertEquals(0L, tree.findNext(-1L, true));
     }
 
+    @Test
+    void testFindOneWithMapper() throws Exception {
+        TestTree tree = createTestTree(true);
+
+        tree.put(0L);
+
+        TreeRowMapClosure<Long, Long, String> treeRowClosure = new 
TreeRowMapClosure<>() {
+            @Override
+            public String map(Long treeRow) {
+                return "row" + treeRow;
+            }
+        };
+
+        assertEquals("row0", tree.findOne(0L, treeRowClosure, null));
+        assertEquals("rownull", tree.findOne(1L, treeRowClosure, null));
+    }
+
+    @Test
+    void testFindWithMapper() throws Exception {
+        TestTree tree = createTestTree(true);
+
+        tree.put(0L);
+        tree.put(1L);
+
+        TreeRowMapClosure<Long, Long, String> treeRowClosure = new 
TreeRowMapClosure<>() {
+            @Override
+            public String map(Long treeRow) {
+                return "row" + treeRow;
+            }
+        };
+
+        Cursor<String> cursor = tree.find(null, null, treeRowClosure, null);
+
+        assertTrue(cursor.hasNext());
+        assertEquals("row0", cursor.next());
+
+        assertTrue(cursor.hasNext());
+        assertEquals("row1", cursor.next());
+
+        assertFalse(cursor.hasNext());
+        assertThrows(NoSuchElementException.class, cursor::next);
+    }
+
+    @Test
+    void testInvokeClosureWithOnUpdateCallbackForPut() throws Exception {
+        TestTree tree = createTestTree(true);
+
+        // Checks insert.
+        CompletableFuture<Void> future0 = new CompletableFuture<>();
+
+        tree.invoke(0L, null, new InvokeClosure<>() {
+            @Override
+            public void call(@Nullable Long oldRow) {
+                assertNull(oldRow);
+            }
+
+            @Override
+            public @Nullable Long newRow() {
+                return 0L;
+            }
+
+            @Override
+            public OperationType operationType() {
+                return PUT;
+            }
+
+            @Override
+            public void onUpdate() {
+                future0.complete(null);
+            }
+        });
+
+        assertThat(future0, willCompleteSuccessfully());
+
+        assertEquals(0L, tree.findOne(0L));
+
+        // Checks replace.
+        CompletableFuture<Void> future1 = new CompletableFuture<>();
+
+        tree.invoke(0L, null, new InvokeClosure<>() {
+            @Override
+            public void call(@Nullable Long oldRow) {
+                assertEquals(0L, oldRow);
+            }
+
+            @Override
+            public @Nullable Long newRow() {
+                return 0L;
+            }
+
+            @Override
+            public OperationType operationType() {
+                return PUT;
+            }
+
+            @Override
+            public void onUpdate() {
+                future1.complete(null);
+            }
+        });
+
+        assertThat(future1, willCompleteSuccessfully());
+
+        assertEquals(0L, tree.findOne(0L));
+    }
+
+    @Test
+    void testInvokeClosureWithOnUpdateCallbackForRemove() throws Exception {
+        TestTree tree = createTestTree(true);
+
+        tree.put(0L);
+
+        CompletableFuture<Void> future = new CompletableFuture<>();
+
+        tree.invoke(0L, null, new InvokeClosure<>() {
+            @Override
+            public void call(@Nullable Long oldRow) {
+                assertEquals(0L, oldRow);
+            }
+
+            @Override
+            public @Nullable Long newRow() {
+                return null;
+            }
+
+            @Override
+            public OperationType operationType() {
+                return REMOVE;
+            }
+
+            @Override
+            public void onUpdate() {
+                future.complete(null);
+            }
+        });
+
+        assertThat(future, willCompleteSuccessfully());
+
+        assertNull(tree.findOne(0L));
+    }
+
     private void doTestRandomPutRemoveMultithreaded(boolean canGetRow) throws 
Exception {
         final TestTree tree = createTestTree(canGetRow);
 
@@ -2850,7 +2996,7 @@ public abstract class AbstractBplusTreePageMemoryTest 
extends BaseIgniteAbstract
     /**
      * {@link TreeRowClosure} implementation for the test.
      */
-    static class TestTreeFindFilteredClosure implements TreeRowClosure<Long, 
Long> {
+    static class TestTreeFindFilteredClosure implements 
TreeRowMapClosure<Long, Long, Long> {
         private final Set<Long> vals;
 
         /**
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
index f0a915f41e..2be9922618 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
@@ -1178,18 +1178,18 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
      *
      * @param upper Upper bound.
      * @param upIncl {@code true} if upper bound is inclusive.
-     * @param c Filter closure.
+     * @param c Tree row closure.
      * @param x Implementation specific argument, {@code null} always means 
that we need to return full detached data row.
      * @return Cursor.
      * @throws IgniteInternalCheckedException If failed.
      */
-    private Cursor<T> findLowerUnbounded(
+    private <R> Cursor<R> findLowerUnbounded(
             L upper,
             boolean upIncl,
-            TreeRowClosure<L, T> c,
+            TreeRowMapClosure<L, T, R> c,
             @Nullable Object x
     ) throws IgniteInternalCheckedException {
-        ForwardCursor cursor = new ForwardCursor(upper, upIncl, c, x);
+        ForwardCursor<R> cursor = new ForwardCursor<>(upper, upIncl, c, x);
 
         long firstPageId;
 
@@ -1249,15 +1249,15 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
      *
      * @param lower Lower bound inclusive or {@code null} if unbounded.
      * @param upper Upper bound inclusive or {@code null} if unbounded.
-     * @param c Filter closure.
+     * @param c Tree row closure.
      * @param x Implementation specific argument, {@code null} always means 
that we need to return full detached data row.
      * @return Cursor.
      * @throws IgniteInternalCheckedException If failed.
      */
-    public Cursor<T> find(
+    public <R> Cursor<R> find(
             @Nullable L lower,
             @Nullable L upper,
-            TreeRowClosure<L, T> c,
+            TreeRowMapClosure<L, T, R> c,
             @Nullable Object x
     ) throws IgniteInternalCheckedException {
         return find(lower, upper, true, true, c, x);
@@ -1270,22 +1270,24 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
      * @param upper Upper bound or {@code null} if unbounded.
      * @param lowIncl {@code true} if lower bound is inclusive.
      * @param upIncl {@code true} if upper bound is inclusive.
-     * @param c Filter closure.
+     * @param c Tree row closure.
      * @param x Implementation specific argument, {@code null} always means 
that we need to return full detached data row.
      * @return Cursor.
-     * @throws IgniteInternalCheckedException If failed.
+     * @throws CorruptedDataStructureException If the data structure is broken.
+     * @throws CorruptedTreeException If there were {@link RuntimeException} 
or {@link AssertionError}.
+     * @throws IgniteInternalCheckedException If other errors occurred.
      */
-    public Cursor<T> find(
+    public <R> Cursor<R> find(
             @Nullable L lower,
             @Nullable L upper,
             boolean lowIncl,
             boolean upIncl,
-            @Nullable TreeRowClosure<L, T> c,
+            @Nullable TreeRowMapClosure<L, T, R> c,
             @Nullable Object x
     ) throws IgniteInternalCheckedException {
         checkDestroyed();
 
-        ForwardCursor cursor = new ForwardCursor(lower, upper, lowIncl, 
upIncl, c, x);
+        ForwardCursor<R> cursor = new ForwardCursor<>(lower, upper, lowIncl, 
upIncl, c, x);
 
         try {
             if (lower == null) {
@@ -1301,7 +1303,7 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
             throw new IgniteInternalCheckedException("Runtime failure on 
bounds: [lower=" + lower + ", upper=" + upper + "]", e);
         } catch (RuntimeException | AssertionError e) {
             long[] pageIds = pages(
-                    lower == null || cursor == null || cursor.getCursor == 
null,
+                    lower == null || cursor.getCursor == null,
                     () -> new long[]{cursor.getCursor.pageId}
             );
 
@@ -1507,11 +1509,13 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
 
         try {
             if (c == null) {
-                g = new GetOne(null, null, null, true);
+                GetOne<T> getOne = new GetOne<>(null, null, null, true);
+
+                g = getOne;
 
                 doFind(g);
 
-                return (T) g.row;
+                return getOne.res;
             } else {
                 GetLast getLast = new GetLast(c);
 
@@ -1549,18 +1553,25 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
      * Returns found result or {@code null}.
      *
      * @param row Lookup row for exact match.
+     * @param c Tree row closure, if the tree row is not found, then {@code 
null} will be passed to the {@link TreeRowMapClosure#map}.
      * @param x Implementation specific argument, {@code null} always means 
that we need to return full detached data row.
-     * @throws IgniteInternalCheckedException If failed.
+     * @throws CorruptedDataStructureException If the data structure is broken.
+     * @throws CorruptedTreeException If there were {@link RuntimeException} 
or {@link AssertionError}.
+     * @throws IgniteInternalCheckedException If other errors occurred.
      */
-    public final <R> @Nullable R findOne(L row, @Nullable TreeRowClosure<L, T> 
c, Object x) throws IgniteInternalCheckedException {
+    public final <R> @Nullable R findOne(
+            L row,
+            @Nullable TreeRowMapClosure<L, T, R> c,
+            @Nullable Object x
+    ) throws IgniteInternalCheckedException {
         checkDestroyed();
 
-        GetOne g = new GetOne(row, c, x, false);
+        GetOne<R> g = new GetOne<>(row, c, x, false);
 
         try {
             doFind(g);
 
-            return (R) g.row;
+            return g.res;
         } catch (CorruptedDataStructureException e) {
             throw e;
         } catch (IgniteInternalCheckedException e) {
@@ -3353,27 +3364,34 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
     /**
      * Get a single entry.
      */
-    private final class GetOne extends Get {
-        Object arg;
+    private final class GetOne<R> extends Get {
+        private final @Nullable Object arg;
 
-        @Nullable TreeRowClosure<L, T> filter;
+        private final @Nullable TreeRowMapClosure<L, T, R> treeRowClosure;
+
+        private @Nullable R res;
 
         /**
          * Constructor.
          *
          * @param row Row.
-         * @param filter Closure filter.
+         * @param treeRowClosure Tree row closure, if the tree row is not 
found, then {@code null} will be passed to the
+         *      {@link TreeRowMapClosure#map}.
          * @param arg Implementation specific argument.
          * @param findLast Ignore row passed, find last row
          */
-        private GetOne(L row, @Nullable TreeRowClosure<L, T> filter, Object 
arg, boolean findLast) {
+        private GetOne(
+                @Nullable L row,
+                @Nullable TreeRowMapClosure<L, T, R> treeRowClosure,
+                @Nullable Object arg,
+                boolean findLast
+        ) {
             super(row, findLast);
 
+            this.treeRowClosure = treeRowClosure;
             this.arg = arg;
-            this.filter = filter;
         }
 
-        /** {@inheritDoc} */
         @Override
         boolean found(BplusIo<L> io, long pageAddr, int idx, int lvl) throws 
IgniteInternalCheckedException {
             // Check if we are on an inner page and can't get row from it.
@@ -3381,10 +3399,27 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
                 return false;
             }
 
-            row = filter == null || filter.apply(BplusTree.this, io, pageAddr, 
idx) ? getRow(io, pageAddr, idx, arg) : null;
+            if (treeRowClosure == null || treeRowClosure.apply(BplusTree.this, 
io, pageAddr, idx)) {
+                T treeRow = getRow(io, pageAddr, idx, arg);
+
+                res = treeRowClosure != null ? treeRowClosure.map(treeRow) : 
(R) treeRow;
+            }
 
             return true;
         }
+
+        @Override
+        boolean notFound(BplusIo<L> io, long pageAddr, int idx, int lvl) {
+            assert lvl >= 0 : lvl;
+
+            if (lvl == 0) {
+                res = treeRowClosure == null ? null : treeRowClosure.map(null);
+
+                return true;
+            }
+
+            return false;
+        }
     }
 
     /**
@@ -3812,12 +3847,22 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
          * @param needOld {@code True} If need return old value.
          */
         private Put(T row, boolean needOld) {
-            super(row);
+            this(row, needOld, null);
+        }
+
+        /**
+         * Constructor.
+         *
+         * @param row Row.
+         * @param needOld {@code True} If need return old value.
+         * @param onUpdateCallback Callback after performing an update of tree 
row while on a page with that tree row under its write lock.
+         */
+        private Put(T row, boolean needOld, @Nullable Runnable 
onUpdateCallback) {
+            super(row, onUpdateCallback);
 
             this.needOld = needOld;
         }
 
-        /** {@inheritDoc} */
         @Override
         boolean notFound(BplusIo<L> io, long pageAddr, int idx, int lvl) {
             assert btmLvl >= 0 : btmLvl;
@@ -3826,7 +3871,6 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
             return lvl == btmLvl;
         }
 
-        /** {@inheritDoc} */
         @Override
         protected Result finishOrLockTail(long pageId, long page, long backId, 
long fwdId, int lvl)
                 throws IgniteInternalCheckedException {
@@ -3854,7 +3898,6 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
             return res;
         }
 
-        /** {@inheritDoc} */
         @Override
         protected Result finishTail() throws IgniteInternalCheckedException {
             // An inner node is required for replacement.
@@ -3932,7 +3975,6 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
             releaseTail();
         }
 
-        /** {@inheritDoc} */
         @Override
         boolean isFinished() {
             return row == null;
@@ -3965,6 +4007,10 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
 
         private void insertSimple(long pageAddr, BplusIo<L> io, int idx) 
throws IgniteInternalCheckedException {
             io.insert(pageAddr, idx, row, null, rightId, false);
+
+            if (onUpdateCallback != null) {
+                onUpdateCallback.run();
+            }
         }
 
         /**
@@ -4100,7 +4146,7 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
          * @return Result.
          * @throws IgniteInternalCheckedException If failed.
          */
-        public Result tryReplace(long pageId, long page, long fwdId, int lvl) 
throws IgniteInternalCheckedException {
+        private Result tryReplace(long pageId, long page, long fwdId, int lvl) 
throws IgniteInternalCheckedException {
             // Init args.
             this.pageId = pageId;
             this.fwdId = fwdId;
@@ -4115,11 +4161,14 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
          * @param pageAddr Page address.
          * @param idx Replacement index.
          */
-        public void replaceRowInPage(BplusIo<L> io, long pageAddr, int idx) 
throws IgniteInternalCheckedException {
+        private void replaceRowInPage(BplusIo<L> io, long pageAddr, int idx) 
throws IgniteInternalCheckedException {
             io.store(pageAddr, idx, row, null, false);
+
+            if (onUpdateCallback != null) {
+                onUpdateCallback.run();
+            }
         }
 
-        /** {@inheritDoc} */
         @Override
         void checkLockRetry() throws IgniteInternalCheckedException {
             // Non-null tail means that lock on the tail page is still being 
held, and we can't fail with exception.
@@ -4254,14 +4303,14 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
 
                     assert newRow != null;
 
-                    op = new Put(newRow, false);
+                    op = new Put(newRow, false, clo::onUpdate);
 
                     break;
 
                 case REMOVE:
                     assert foundRow != null;
 
-                    op = new Remove(row, false);
+                    op = new Remove(row, false, clo::onUpdate);
 
                     break;
 
@@ -4423,13 +4472,32 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
         /** We may need to lock part of the tree branch from the bottom to up 
for multiple levels. */
         Tail<L> tail;
 
+        /**
+         * Callback after performing an {@link Put put} or {@link Remove 
remove} of a tree row while on a page with that tree row under its
+         * write lock.
+         */
+        final @Nullable Runnable onUpdateCallback;
+
         /**
          * Constructor.
          *
          * @param row Row.
          */
         private Update(L row) {
+            this(row, null);
+        }
+
+        /**
+         * Constructor.
+         *
+         * @param row Row.
+         * @param onUpdateCallback Callback after performing an {@link Put 
put} or {@link Remove remove} of a tree row while on a page with
+         *      that tree row under its write lock.
+         */
+        private Update(L row, @Nullable Runnable onUpdateCallback) {
             super(row, false);
+
+            this.onUpdateCallback = onUpdateCallback;
         }
 
         /**
@@ -4504,7 +4572,6 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
             }
         }
 
-        /** {@inheritDoc} */
         @Override
         public final boolean canRelease(long pageId, int lvl) {
             return pageId != 0L && !isTail(pageId, lvl);
@@ -4681,12 +4748,22 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
          * @param needOld {@code True} If need return old value.
          */
         private Remove(L row, boolean needOld) {
-            super(row);
+            this(row, needOld, null);
+        }
+
+        /**
+         * Constructor.
+         *
+         * @param row Row.
+         * @param needOld {@code True} If need return old value.
+         * @param onRemoveCallback Callback after performing an remove of tree 
row while on a page with that tree row under its write lock.
+         */
+        private Remove(L row, boolean needOld, @Nullable Runnable 
onRemoveCallback) {
+            super(row, onRemoveCallback);
 
             this.needOld = needOld;
         }
 
-        /** {@inheritDoc} */
         @Override
         public long pollFreePage() {
             if (freePages == null) {
@@ -4704,7 +4781,6 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
             return res;
         }
 
-        /** {@inheritDoc} */
         @Override
         public void addFreePage(long pageId) {
             assert pageId != 0L;
@@ -4728,7 +4804,6 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
             }
         }
 
-        /** {@inheritDoc} */
         @Override
         public boolean isEmpty() {
             if (freePages == null) {
@@ -4740,7 +4815,6 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
             return false;
         }
 
-        /** {@inheritDoc} */
         @Override
         boolean notFound(BplusIo<L> io, long pageAddr, int idx, int lvl) {
             if (lvl == 0) {
@@ -4893,7 +4967,6 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
             return false;
         }
 
-        /** {@inheritDoc} */
         @Override
         protected Result finishTail() throws IgniteInternalCheckedException {
             assert !isFinished();
@@ -5148,6 +5221,10 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
             doRemove(pageAddr, io, cnt, idx);
 
             assert isRemoved();
+
+            if (onUpdateCallback != null) {
+                onUpdateCallback.run();
+            }
         }
 
         /**
@@ -5745,7 +5822,7 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
      * @return Data row.
      * @throws IgniteInternalCheckedException If failed.
      */
-    public abstract T getRow(BplusIo<L> io, long pageAddr, int idx, Object x) 
throws IgniteInternalCheckedException;
+    public abstract T getRow(BplusIo<L> io, long pageAddr, int idx, @Nullable 
Object x) throws IgniteInternalCheckedException;
 
     /**
      * Abstract forward cursor.
@@ -6117,35 +6194,38 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
     /**
      * Forward cursor.
      */
-    private final class ForwardCursor extends AbstractForwardCursor implements 
Cursor<T> {
+    private final class ForwardCursor<R> extends AbstractForwardCursor 
implements Cursor<R> {
         /** Implementation specific argument. */
-        @Nullable
-        final Object arg;
+        private final @Nullable Object arg;
 
-        /** Rows. */
-        @Nullable
-        private T[] rows = (T[]) OBJECT_EMPTY_ARRAY;
+        /** {@code null} array means the end of iteration over the cursor. */
+        private @Nullable R @Nullable [] results = (R[]) OBJECT_EMPTY_ARRAY;
+
+        private @Nullable T lastRow;
 
         /** Row index. */
         private int row = -1;
 
         /** Filter closure. */
-        @Nullable
-        private final TreeRowClosure<L, T> filter;
+        private final @Nullable TreeRowMapClosure<L, T, R> treeRowClosure;
 
-        @Nullable
-        private Boolean hasNext = null;
+        private @Nullable Boolean hasNext = null;
 
         /**
          * Lower unbound cursor.
          *
          * @param upperBound Upper bound.
          * @param upIncl {@code true} if upper bound is inclusive.
-         * @param filter Filter closure.
+         * @param treeRowClosure Tree row closure.
          * @param arg Implementation specific argument, {@code null} always 
means that we need to return full detached data row.
          */
-        ForwardCursor(@Nullable L upperBound, boolean upIncl, @Nullable 
TreeRowClosure<L, T> filter, @Nullable Object arg) {
-            this(null, upperBound, true, upIncl, filter, arg);
+        ForwardCursor(
+                @Nullable L upperBound,
+                boolean upIncl,
+                @Nullable TreeRowMapClosure<L, T, R> treeRowClosure,
+                @Nullable Object arg
+        ) {
+            this(null, upperBound, true, upIncl, treeRowClosure, arg);
         }
 
         /**
@@ -6155,7 +6235,7 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
          * @param upperBound Upper bound.
          * @param lowIncl {@code true} if lower bound is inclusive.
          * @param upIncl {@code true} if upper bound is inclusive.
-         * @param filter Filter closure.
+         * @param treeRowClosure Tree row closure.
          * @param arg Implementation specific argument, {@code null} always 
means that we need to return full detached data row.
          */
         ForwardCursor(
@@ -6163,16 +6243,15 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
                 @Nullable L upperBound,
                 boolean lowIncl,
                 boolean upIncl,
-                @Nullable TreeRowClosure<L, T> filter,
+                @Nullable TreeRowMapClosure<L, T, R> treeRowClosure,
                 @Nullable Object arg
         ) {
             super(lowerBound, upperBound, lowIncl, upIncl);
 
-            this.filter = filter;
+            this.treeRowClosure = treeRowClosure;
             this.arg = arg;
         }
 
-        /** {@inheritDoc} */
         @Override
         boolean fillFromBuffer0(long pageAddr, BplusIo<L> io, int startIdx, 
int cnt) throws IgniteInternalCheckedException {
             if (startIdx == -1) {
@@ -6193,30 +6272,35 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
                 return false;
             }
 
-            if (rows == OBJECT_EMPTY_ARRAY) {
-                rows = (T[]) new Object[cnt0];
+            if (results == OBJECT_EMPTY_ARRAY) {
+                results = (R[]) new Object[cnt0];
             }
 
             int resCnt = 0;
 
             for (int idx = startIdx; idx < cnt; idx++) {
-                if (filter == null || filter.apply(BplusTree.this, io, 
pageAddr, idx)) {
-                    rows = set(rows, resCnt++, getRow(io, pageAddr, idx, arg));
+                if (treeRowClosure == null || 
treeRowClosure.apply(BplusTree.this, io, pageAddr, idx)) {
+                    T treeRow = getRow(io, pageAddr, idx, arg);
+
+                    R result = treeRowClosure != null ? 
treeRowClosure.map(treeRow) : (R) treeRow;
+
+                    results = set(results, resCnt++, result);
+
+                    lastRow = treeRow;
                 }
             }
 
             if (resCnt == 0) {
-                rows = (T[]) OBJECT_EMPTY_ARRAY;
+                results = (R[]) OBJECT_EMPTY_ARRAY;
 
                 return false;
             }
 
-            clearTail(rows, resCnt);
+            clearTail(results, resCnt);
 
             return true;
         }
 
-        /** {@inheritDoc} */
         @Override
         boolean reinitialize0() {
             hasNext = null;
@@ -6224,31 +6308,28 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
             return hasNext();
         }
 
-        /** {@inheritDoc} */
         @Override
         void onNotFound(boolean readDone) {
             if (readDone) {
-                rows = null;
+                results = null;
             } else {
-                if (rows != OBJECT_EMPTY_ARRAY) {
-                    assert rows.length > 0; // Otherwise it makes no sense to 
create an array.
+                if (results != OBJECT_EMPTY_ARRAY) {
+                    assert results.length > 0; // Otherwise it makes no sense 
to create an array.
 
                     // Fake clear.
-                    rows[0] = null;
+                    results[0] = null;
                 }
             }
         }
 
-        /** {@inheritDoc} */
         @Override
         void init0() {
             row = -1;
         }
 
-        /** {@inheritDoc} */
         @Override
         public boolean hasNext() {
-            if (rows == null) {
+            if (results == null) {
                 return false;
             }
 
@@ -6262,30 +6343,25 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
         /**
          * Returns cleared last row.
          */
-        private @Nullable T clearLastRow() {
+        private void clearLastResult() {
             if (row == 0) {
-                return null;
+                return;
             }
 
             int last = row - 1;
 
-            T r = rows[last];
-
-            assert r != null;
-
-            rows[last] = null;
+            assert results[last] != null;
 
-            return r;
+            results[last] = null;
         }
 
-        /** {@inheritDoc} */
         @Override
-        public T next() {
+        public R next() {
             if (!hasNext()) {
                 throw new NoSuchElementException();
             }
 
-            T r = rows[row];
+            R r = results[row];
 
             assert r != null;
 
@@ -6295,16 +6371,20 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
         }
 
         private boolean advance() {
-            if (++row < rows.length && rows[row] != null) {
-                clearLastRow(); // Allow to GC the last returned row.
+            if (++row < results.length && results[row] != null) {
+                clearLastResult(); // Allow to GC the last returned row.
 
                 return true;
             }
 
-            T lastRow = clearLastRow();
+            clearLastResult();
 
             row = 0;
 
+            T lastRow = this.lastRow;
+
+            this.lastRow = null;
+
             try {
                 return nextPage(lastRow);
             } catch (IgniteInternalCheckedException e) {
@@ -6499,6 +6579,27 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
         boolean apply(BplusTree<L, T> tree, BplusIo<L> io, long pageAddr, int 
idx) throws IgniteInternalCheckedException;
     }
 
+    /**
+     * Extension of the {@link TreeRowClosure} with the ability to {@link 
#map(Object) convert} tree row to some object.
+     */
+    public interface TreeRowMapClosure<L, T extends L, R> extends 
TreeRowClosure<L, T> {
+        @Override
+        default boolean apply(BplusTree<L, T> tree, BplusIo<L> io, long 
pageAddr, int idx) throws IgniteInternalCheckedException {
+            return true;
+        }
+
+        /**
+         * Converts a tree row to some object.
+         *
+         * <p>Executed after {@link #apply} has returned {@code true}, and 
also under read lock of page on which the tree row is located.
+         *
+         * @param treeRow Tree row.
+         */
+        default R map(T treeRow) {
+            return (R) treeRow;
+        }
+    }
+
     /**
      * A generic visitor-style interface for performing 
inspection/modification operations on the tree.
      */
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/IgniteTree.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/IgniteTree.java
index b182029ad8..0bcc8b04b6 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/IgniteTree.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/IgniteTree.java
@@ -126,6 +126,17 @@ public interface IgniteTree<L, T> {
          * Returns operation type for this closure.
          */
         OperationType operationType();
+
+        /**
+         * Callback after inserting/replacing/deleting a tree row.
+         *
+         * <p>It is performed under the same write lock of page on which the 
tree row is located.
+         *
+         * <p>What can allow us to ensure the atomicity of changes in the tree 
row and the data associated with it.
+         */
+        default void onUpdate() {
+            // No-op.
+        }
     }
 
     /**
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
index 0b2e6d2d3c..58ddbcecfe 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
@@ -88,4 +88,15 @@ public class StorageException extends 
IgniteInternalException {
     public StorageException(String messagePattern, Throwable cause, Object... 
params) {
         this(IgniteStringFormatter.format(messagePattern, params), cause);
     }
+
+    /**
+     * Constructor.
+     *
+     * @param messagePattern Error message pattern.
+     * @param params Error message params.
+     * @see IgniteStringFormatter#format(String, Object...)
+     */
+    public StorageException(String messagePattern, Object... params) {
+        this(IgniteStringFormatter.format(messagePattern, params));
+    }
 }
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageUtils.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageUtils.java
index cf67fbf12a..e1aa4be770 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageUtils.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageUtils.java
@@ -21,6 +21,7 @@ import java.util.function.Supplier;
 import org.apache.ignite.internal.storage.StorageClosedException;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.StorageRebalanceException;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
 import org.apache.ignite.lang.IgniteStringFormatter;
 
 /**
@@ -145,6 +146,15 @@ public class StorageUtils {
         }
     }
 
+    /**
+     * Throws a {@link StorageException} if it is the cause.
+     */
+    public static void 
throwStorageExceptionIfItCause(IgniteInternalCheckedException e) {
+        if (e.getCause() instanceof StorageException) {
+            throw ((StorageException) e.getCause());
+        }
+    }
+
     private static String createStorageInProcessOfRebalanceErrorMessage(String 
storageInfo) {
         return IgniteStringFormatter.format("Storage in the process of 
rebalancing: [{}]", storageInfo);
     }
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
index 67692d7c08..f96e3d42ef 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
@@ -43,7 +43,9 @@ public abstract class 
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
             runRace(
                     () -> abortWrite(ROW_ID),
                     () -> read(ROW_ID, clock.now()),
-                    () -> scanFirstEntry(clock.now())
+                    () -> scanFirstEntry(clock.now()),
+                    () -> scanFirstEntry(HybridTimestamp.MAX_VALUE),
+                    () -> scanFirstVersion(ROW_ID)
             );
 
             assertNull(read(ROW_ID, clock.now()));
@@ -58,7 +60,9 @@ public abstract class 
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
             runRace(
                     () -> commitWrite(ROW_ID, clock.now()),
                     () -> read(ROW_ID, clock.now()),
-                    () -> scanFirstEntry(clock.now())
+                    () -> scanFirstEntry(clock.now()),
+                    () -> scanFirstEntry(HybridTimestamp.MAX_VALUE),
+                    () -> scanFirstVersion(ROW_ID)
             );
 
             assertRowMatches(read(ROW_ID, clock.now()), TABLE_ROW);
@@ -73,7 +77,9 @@ public abstract class 
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
             runRace(
                     () -> addWrite(ROW_ID, TABLE_ROW2, TX_ID),
                     () -> read(ROW_ID, clock.now()),
-                    () -> scanFirstEntry(clock.now())
+                    () -> scanFirstEntry(clock.now()),
+                    () -> scanFirstEntry(HybridTimestamp.MAX_VALUE),
+                    () -> scanFirstVersion(ROW_ID)
             );
 
             assertRowMatches(read(ROW_ID, clock.now()), TABLE_ROW2);
@@ -82,7 +88,7 @@ public abstract class 
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
 
     @ParameterizedTest
     @EnumSource(AddAndCommit.class)
-    void testRegularGcAndRead(AddAndCommit addAndCommit) {
+    public void testRegularGcAndRead(AddAndCommit addAndCommit) {
         for (int i = 0; i < REPEATS; i++) {
             HybridTimestamp firstCommitTs = addAndCommit(TABLE_ROW);
 
@@ -102,7 +108,7 @@ public abstract class 
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
 
     @ParameterizedTest
     @EnumSource(AddAndCommit.class)
-    void testTombstoneGcAndRead(AddAndCommit addAndCommit) {
+    public void testTombstoneGcAndRead(AddAndCommit addAndCommit) {
         for (int i = 0; i < REPEATS; i++) {
             HybridTimestamp firstCommitTs = addAndCommit.perform(this, 
TABLE_ROW);
 
@@ -120,7 +126,7 @@ public abstract class 
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
 
     @ParameterizedTest
     @EnumSource(AddAndCommit.class)
-    void testTombstoneGcAndAddWrite(AddAndCommit addAndCommit) {
+    public void testTombstoneGcAndAddWrite(AddAndCommit addAndCommit) {
         for (int i = 0; i < REPEATS; i++) {
             addAndCommit.perform(this, TABLE_ROW);
 
@@ -141,7 +147,7 @@ public abstract class 
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
 
     @ParameterizedTest
     @EnumSource(AddAndCommit.class)
-    void testTombstoneGcAndCommitWrite(AddAndCommit addAndCommit) {
+    public void testTombstoneGcAndCommitWrite(AddAndCommit addAndCommit) {
         for (int i = 0; i < REPEATS; i++) {
             addAndCommit.perform(this, TABLE_ROW);
 
@@ -164,7 +170,7 @@ public abstract class 
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
 
     @ParameterizedTest
     @EnumSource(AddAndCommit.class)
-    void testTombstoneGcAndAbortWrite(AddAndCommit addAndCommit) {
+    public void testTombstoneGcAndAbortWrite(AddAndCommit addAndCommit) {
         for (int i = 0; i < REPEATS; i++) {
             addAndCommit.perform(this, TABLE_ROW);
 
@@ -183,7 +189,7 @@ public abstract class 
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
 
     @ParameterizedTest
     @EnumSource(AddAndCommit.class)
-    void testConcurrentGc(AddAndCommit addAndCommit) {
+    public void testConcurrentGc(AddAndCommit addAndCommit) {
         for (int i = 0; i < REPEATS; i++) {
             addAndCommit.perform(this, TABLE_ROW);
 
@@ -207,6 +213,13 @@ public abstract class 
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
         }
     }
 
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    private void scanFirstVersion(RowId rowId) {
+        try (var cursor = scan(rowId)) {
+            cursor.hasNext();
+        }
+    }
+
     /**
      * Adds a tombstone and cleans a GC queue until nothing's there.
      */
@@ -220,7 +233,10 @@ public abstract class 
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
         } while (row != null);
     }
 
-    private enum AddAndCommit {
+    /**
+     * Performing add write.
+     */
+    protected enum AddAndCommit {
         ATOMIC {
             @Override
             HybridTimestamp perform(AbstractMvPartitionStorageConcurrencyTest 
test, @Nullable BinaryRow binaryRow) {
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 85dc0ac67d..4fe8d0b286 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.storage;
 import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
@@ -1166,7 +1167,7 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvPartitionStor
         RowId rowId = insert(binaryRow, txId);
 
         StorageException ex = assertThrows(StorageException.class, () -> 
addWriteCommitted(rowId, binaryRow2, clock.now()));
-        assertThat(ex.getMessage(), containsString("Write intent exists for " 
+ rowId));
+        assertThat(ex.getMessage(), allOf(containsString("Write intent 
exists"), containsString(rowId.toString())));
     }
 
     @Test
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java
index 9573d496c3..7724b0c2d5 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java
@@ -26,6 +26,7 @@ import 
org.apache.ignite.internal.schema.configuration.TableConfiguration;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.engine.StorageEngine;
+import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
@@ -111,6 +112,13 @@ public abstract class BaseMvPartitionStorageTest extends 
BaseMvStoragesTest {
         return storage.scan(timestamp);
     }
 
+    /**
+     * Scans versions.
+     */
+    protected Cursor<ReadResult> scan(RowId rowId) {
+        return storage.scanVersions(rowId);
+    }
+
     /**
      * Inserts a row inside of consistency closure.
      */
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbortWriteInvokeClosure.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbortWriteInvokeClosure.java
new file mode 100644
index 0000000000..ad176c65d6
--- /dev/null
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbortWriteInvokeClosure.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
+import static 
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.ALWAYS_LOAD_VALUE;
+
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree.InvokeClosure;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree.OperationType;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Implementation of {@link InvokeClosure} for {@link 
AbstractPageMemoryMvPartitionStorage#abortWrite(RowId)}.
+ *
+ * <p>Synchronization between reading and updating the version chain occurs 
due to the locks (read and write) of the page of the tree on
+ * which the version chain is located.
+ *
+ * <p>Synchronization between update operations for the version chain must be 
external (by {@link RowId row ID}).
+ *
+ * <p>Operation may throw {@link StorageException} which will cause form 
{@link BplusTree#invoke(Object, Object, InvokeClosure)}.
+ */
+class AbortWriteInvokeClosure implements InvokeClosure<VersionChain> {
+    private final RowId rowId;
+
+    private final AbstractPageMemoryMvPartitionStorage storage;
+
+    private OperationType operationType;
+
+    private @Nullable VersionChain newRow;
+
+    private @Nullable RowVersion toRemove;
+
+    private @Nullable BinaryRow previousUncommittedRowVersion;
+
+    AbortWriteInvokeClosure(RowId rowId, AbstractPageMemoryMvPartitionStorage 
storage) {
+        this.rowId = rowId;
+        this.storage = storage;
+    }
+
+    @Override
+    public void call(@Nullable VersionChain oldRow) throws 
IgniteInternalCheckedException {
+        if (oldRow == null || oldRow.transactionId() == null) {
+            // Row doesn't exist or the chain doesn't contain an uncommitted 
write intent.
+            operationType = OperationType.NOOP;
+
+            return;
+        }
+
+        RowVersion latestVersion = storage.readRowVersion(oldRow.headLink(), 
ALWAYS_LOAD_VALUE);
+
+        assert latestVersion.isUncommitted();
+
+        toRemove = latestVersion;
+
+        if (latestVersion.hasNextLink()) {
+            // Next can be safely replaced with any value (like 0), because 
this field is only used when there
+            // is some uncommitted value, but when we add an uncommitted 
value, we 'fix' such placeholder value
+            // (like 0) by replacing it with a valid value.
+            newRow = VersionChain.createCommitted(rowId, 
latestVersion.nextLink(), NULL_LINK);
+
+            operationType = OperationType.PUT;
+        } else {
+            // It was the only version, let's remove the chain as well.
+            operationType = OperationType.REMOVE;
+        }
+
+        previousUncommittedRowVersion = 
storage.rowVersionToBinaryRow(latestVersion);
+    }
+
+    @Override
+    public @Nullable VersionChain newRow() {
+        assert operationType == OperationType.PUT ? newRow != null : newRow == 
null : "newRow=" + newRow + ", op=" + operationType;
+
+        return newRow;
+    }
+
+    @Override
+    public OperationType operationType() {
+        assert operationType != null;
+
+        return operationType;
+    }
+
+    /**
+     * Returns the result for {@link MvPartitionStorage#abortWrite(RowId)}.
+     */
+    @Nullable BinaryRow getPreviousUncommittedRowVersion() {
+        return previousUncommittedRowVersion;
+    }
+
+    /**
+     * Method to call after {@link BplusTree#invoke(Object, Object, 
InvokeClosure)} has completed.
+     */
+    void afterCompletion() {
+        assert operationType == OperationType.NOOP ? toRemove == null : 
toRemove != null : "toRemove=" + toRemove + ", op=" + operationType;
+
+        if (toRemove != null) {
+            storage.removeRowVersion(toRemove);
+        }
+    }
+}
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 40a43d7544..2a68d07060 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -18,12 +18,11 @@
 package org.apache.ignite.internal.storage.pagememory.mv;
 
 import static 
org.apache.ignite.internal.configuration.util.ConfigurationUtil.getByInternalId;
-import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageState;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageStateOnRebalance;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInRunnableOrRebalanceState;
+import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwStorageExceptionIfItCause;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
@@ -31,6 +30,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import org.apache.ignite.configuration.NamedListView;
@@ -39,6 +40,8 @@ import org.apache.ignite.internal.pagememory.PageIdAllocator;
 import org.apache.ignite.internal.pagememory.PageMemory;
 import org.apache.ignite.internal.pagememory.datapage.DataPageReader;
 import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
+import org.apache.ignite.internal.pagememory.tree.BplusTree.TreeRowMapClosure;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree.InvokeClosure;
 import org.apache.ignite.internal.pagememory.util.PageLockListenerNoOp;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
@@ -68,6 +71,7 @@ import 
org.apache.ignite.internal.storage.pagememory.index.sorted.SortedIndexTre
 import org.apache.ignite.internal.storage.util.StorageState;
 import org.apache.ignite.internal.storage.util.StorageUtils;
 import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.CursorUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
@@ -76,6 +80,16 @@ import org.jetbrains.annotations.Nullable;
 
 /**
  * Abstract implementation of partition storage using Page Memory.
+ *
+ * <p>A few words about parallel operations with version chains:
+ * <ul>
+ *     <li>All update operations (including creation) must first be 
synchronized by row ID using
+ *     {@link #inUpdateVersionChainLock(RowId, Supplier)};</li>
+ *     <li>Reads and updates of version chains (or a single version) must be 
synchronized by the {@link #versionChainTree}, for example for
+ *     reading you can use {@link #findVersionChain(RowId, Function)} or
+ *     {@link 
AbstractPartitionTimestampCursor#createVersionChainCursorIfMissing()}, and for 
updates you can use {@link InvokeClosure}
+ *     for example {@link AddWriteInvokeClosure} or {@link 
CommitWriteInvokeClosure}.</li>
+ * </ul>
  */
 public abstract class AbstractPageMemoryMvPartitionStorage implements 
MvPartitionStorage {
     private static final byte[] TOMBSTONE_PAYLOAD = new byte[0];
@@ -108,6 +122,9 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
     /** Current state of the storage. */
     protected final AtomicReference<StorageState> state = new 
AtomicReference<>(StorageState.RUNNABLE);
 
+    /** Version chain update lock by row ID. */
+    private final ConcurrentMap<RowId, LockHolder<ReentrantLock>> 
updateVersionChainLockByRowId = new ConcurrentHashMap<>();
+
     /**
      * Constructor.
      *
@@ -316,17 +333,17 @@ public abstract class 
AbstractPageMemoryMvPartitionStorage implements MvPartitio
                         String.format("RowId partition [%d] is not equal to 
storage partition [%d].", rowId.partitionId(), partitionId));
             }
 
-            VersionChain versionChain = findVersionChain(rowId);
-
-            if (versionChain == null) {
-                return ReadResult.empty(rowId);
-            }
+            return findVersionChain(rowId, versionChain -> {
+                if (versionChain == null) {
+                    return ReadResult.empty(rowId);
+                }
 
-            if (lookingForLatestVersion(timestamp)) {
-                return findLatestRowVersion(versionChain);
-            } else {
-                return findRowVersionByTimestamp(versionChain, timestamp);
-            }
+                if (lookingForLatestVersion(timestamp)) {
+                    return findLatestRowVersion(versionChain);
+                } else {
+                    return findRowVersionByTimestamp(versionChain, timestamp);
+                }
+            });
         });
     }
 
@@ -334,14 +351,6 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
         return timestamp == HybridTimestamp.MAX_VALUE;
     }
 
-    private @Nullable VersionChain findVersionChain(RowId rowId) {
-        try {
-            return versionChainTree.findOne(new VersionChainKey(rowId));
-        } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Version chain lookup failed", e);
-        }
-    }
-
     ReadResult findLatestRowVersion(VersionChain versionChain) {
         RowVersion rowVersion = readRowVersion(versionChain.headLink(), 
ALWAYS_LOAD_VALUE);
 
@@ -375,15 +384,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
         return read.result();
     }
 
-    private void throwIfChainBelongsToAnotherTx(VersionChain versionChain, 
UUID txId) {
-        assert versionChain.isUncommitted();
-
-        if (!txId.equals(versionChain.transactionId())) {
-            throw new TxIdMismatchException(txId, 
versionChain.transactionId());
-        }
-    }
-
-    private @Nullable BinaryRow rowVersionToBinaryRow(RowVersion rowVersion) {
+    @Nullable BinaryRow rowVersionToBinaryRow(RowVersion rowVersion) {
         if (rowVersion.isTombstone()) {
             return null;
         }
@@ -497,25 +498,15 @@ public abstract class 
AbstractPageMemoryMvPartitionStorage implements MvPartitio
         );
     }
 
-    private RowVersion insertRowVersion(@Nullable BinaryRow row, long 
nextPartitionlessLink) {
-        byte[] rowBytes = rowBytes(row);
-
-        RowVersion rowVersion = new RowVersion(partitionId, 
nextPartitionlessLink, ByteBuffer.wrap(rowBytes));
-
-        insertRowVersion(rowVersion);
-
-        return rowVersion;
-    }
-
-    private void insertRowVersion(RowVersion rowVersion) {
+    void insertRowVersion(RowVersion rowVersion) {
         try {
             rowVersionFreeList.insertDataRow(rowVersion);
         } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Cannot store a row version", e);
+            throw new StorageException("Cannot store a row version: [row={}, 
{}]", e, rowVersion, createStorageInfo());
         }
     }
 
-    private static byte[] rowBytes(@Nullable BinaryRow row) {
+    static byte[] rowBytes(@Nullable BinaryRow row) {
         // TODO IGNITE-16913 Add proper way to write row bytes into array 
without allocations.
         return row == null ? TOMBSTONE_PAYLOAD : row.bytes();
     }
@@ -528,42 +519,25 @@ public abstract class 
AbstractPageMemoryMvPartitionStorage implements MvPartitio
         return busy(() -> {
             throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), 
this::createStorageInfo);
 
-            VersionChain currentChain = findVersionChain(rowId);
+            return inUpdateVersionChainLock(rowId, () -> {
+                try {
+                    AddWriteInvokeClosure addWrite = new 
AddWriteInvokeClosure(rowId, row, txId, commitTableId, commitPartitionId, this);
 
-            if (currentChain == null) {
-                RowVersion newVersion = insertRowVersion(row, NULL_LINK);
+                    versionChainTree.invoke(new VersionChainKey(rowId), null, 
addWrite);
 
-                VersionChain versionChain = 
VersionChain.createUncommitted(rowId, txId, commitTableId, commitPartitionId, 
newVersion.link(),
-                        NULL_LINK);
+                    addWrite.afterCompletion();
 
-                updateVersionChain(versionChain);
+                    return addWrite.getPreviousUncommittedRowVersion();
+                } catch (IgniteInternalCheckedException e) {
+                    throwStorageExceptionIfItCause(e);
 
-                return null;
-            }
-
-            if (currentChain.isUncommitted()) {
-                throwIfChainBelongsToAnotherTx(currentChain, txId);
-            }
-
-            RowVersion newVersion = insertRowVersion(row, 
currentChain.newestCommittedLink());
-
-            BinaryRow res = null;
-
-            if (currentChain.isUncommitted()) {
-                RowVersion currentVersion = 
readRowVersion(currentChain.headLink(), ALWAYS_LOAD_VALUE);
-
-                res = rowVersionToBinaryRow(currentVersion);
-
-                // as we replace an uncommitted version with new one, we need 
to remove old uncommitted version
-                removeRowVersion(currentVersion);
-            }
-
-            VersionChain chainReplacement = 
VersionChain.createUncommitted(rowId, txId, commitTableId, commitPartitionId, 
newVersion.link(),
-                    newVersion.nextLink());
-
-            updateVersionChain(chainReplacement);
+                    if (e.getCause() instanceof TxIdMismatchException) {
+                        throw (TxIdMismatchException) e.getCause();
+                    }
 
-            return res;
+                    throw new StorageException("Error while executing 
addWrite: [rowId={}, {}]", e, rowId, createStorageInfo());
+                }
+            });
         });
     }
 
@@ -574,43 +548,24 @@ public abstract class 
AbstractPageMemoryMvPartitionStorage implements MvPartitio
         return busy(() -> {
             throwExceptionIfStorageNotInRunnableState();
 
-            VersionChain currentVersionChain = findVersionChain(rowId);
-
-            if (currentVersionChain == null || 
currentVersionChain.transactionId() == null) {
-                // Row doesn't exist or the chain doesn't contain an 
uncommitted write intent.
-                return null;
-            }
-
-            RowVersion latestVersion = 
readRowVersion(currentVersionChain.headLink(), ALWAYS_LOAD_VALUE);
+            return inUpdateVersionChainLock(rowId, () -> {
+                try {
+                    AbortWriteInvokeClosure abortWrite = new 
AbortWriteInvokeClosure(rowId, this);
 
-            assert latestVersion.isUncommitted();
+                    versionChainTree.invoke(new VersionChainKey(rowId), null, 
abortWrite);
 
-            removeRowVersion(latestVersion);
+                    abortWrite.afterCompletion();
 
-            if (latestVersion.hasNextLink()) {
-                // Next can be safely replaced with any value (like 0), 
because this field is only used when there
-                // is some uncommitted value, but when we add an uncommitted 
value, we 'fix' such placeholder value
-                // (like 0) by replacing it with a valid value.
-                VersionChain versionChainReplacement = 
VersionChain.createCommitted(rowId, latestVersion.nextLink(), NULL_LINK);
+                    return abortWrite.getPreviousUncommittedRowVersion();
+                } catch (IgniteInternalCheckedException e) {
+                    throwStorageExceptionIfItCause(e);
 
-                updateVersionChain(versionChainReplacement);
-            } else {
-                // it was the only version, let's remove the chain as well
-                removeVersionChain(currentVersionChain);
-            }
-
-            return rowVersionToBinaryRow(latestVersion);
+                    throw new StorageException("Error while executing 
abortWrite: [rowId={}, {}]", e, rowId, createStorageInfo());
+                }
+            });
         });
     }
 
-    private void removeVersionChain(VersionChain currentVersionChain) {
-        try {
-            versionChainTree.remove(currentVersionChain);
-        } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Cannot remove chain version", e);
-        }
-    }
-
     @Override
     public void commitWrite(RowId rowId, HybridTimestamp timestamp) throws 
StorageException {
         assert rowId.partitionId() == partitionId : rowId;
@@ -618,50 +573,25 @@ public abstract class 
AbstractPageMemoryMvPartitionStorage implements MvPartitio
         busy(() -> {
             throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), 
this::createStorageInfo);
 
-            VersionChain currentVersionChain = findVersionChain(rowId);
-
-            if (currentVersionChain == null || 
currentVersionChain.transactionId() == null) {
-                // Row doesn't exist or the chain doesn't contain an 
uncommitted write intent.
-                return null;
-            }
+            return inUpdateVersionChainLock(rowId, () -> {
+                try {
+                    versionChainTree.invoke(new VersionChainKey(rowId), null, 
new CommitWriteInvokeClosure(timestamp, this));
 
-            long chainLink = currentVersionChain.headLink();
+                    return null;
+                } catch (IgniteInternalCheckedException e) {
+                    throwStorageExceptionIfItCause(e);
 
-            try {
-                rowVersionFreeList.updateTimestamp(chainLink, timestamp);
-            } catch (IgniteInternalCheckedException e) {
-                throw new StorageException("Cannot update timestamp", e);
-            }
-
-            try {
-                VersionChain updatedVersionChain = 
VersionChain.createCommitted(
-                        currentVersionChain.rowId(),
-                        currentVersionChain.headLink(),
-                        currentVersionChain.nextLink()
-                );
-
-                versionChainTree.putx(updatedVersionChain);
-            } catch (IgniteInternalCheckedException e) {
-                throw new StorageException("Cannot update transaction ID", e);
-            }
-
-            return null;
+                    throw new StorageException("Error while executing 
commitWrite: [rowId={}, {}]", e, rowId, createStorageInfo());
+                }
+            });
         });
     }
 
-    private void removeRowVersion(RowVersion currentVersion) {
+    void removeRowVersion(RowVersion rowVersion) {
         try {
-            rowVersionFreeList.removeDataRowByLink(currentVersion.link());
+            rowVersionFreeList.removeDataRowByLink(rowVersion.link());
         } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Cannot update row version", e);
-        }
-    }
-
-    private void updateVersionChain(VersionChain newVersionChain) {
-        try {
-            versionChainTree.putx(newVersionChain);
-        } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Cannot update version chain", e);
+            throw new StorageException("Cannot remove row version: [row={}, 
{}]", e, rowVersion, createStorageInfo());
         }
     }
 
@@ -672,42 +602,36 @@ public abstract class 
AbstractPageMemoryMvPartitionStorage implements MvPartitio
         busy(() -> {
             throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), 
this::createStorageInfo);
 
-            VersionChain currentChain = findVersionChain(rowId);
-
-            if (currentChain != null && currentChain.isUncommitted()) {
-                // This means that there is a bug in our code as the caller 
must make sure that no write intent exists
-                // below this write.
-                throw new StorageException("Write intent exists for " + rowId);
-            }
+            return inUpdateVersionChainLock(rowId, () -> {
+                try {
+                    versionChainTree.invoke(
+                            new VersionChainKey(rowId),
+                            null,
+                            new AddWriteCommittedInvokeClosure(rowId, row, 
commitTimestamp, this)
+                    );
 
-            long nextLink = currentChain == null ? NULL_LINK : 
currentChain.newestCommittedLink();
-            RowVersion newVersion = insertCommittedRowVersion(row, 
commitTimestamp, nextLink);
+                    return null;
+                } catch (IgniteInternalCheckedException e) {
+                    throwStorageExceptionIfItCause(e);
 
-            VersionChain chainReplacement = 
VersionChain.createCommitted(rowId, newVersion.link(), newVersion.nextLink());
-
-            updateVersionChain(chainReplacement);
-
-            return null;
+                    throw new StorageException("Error while executing 
addWriteCommitted: [rowId={}, {}]", e, rowId, createStorageInfo());
+                }
+            });
         });
     }
 
-    private RowVersion insertCommittedRowVersion(@Nullable BinaryRow row, 
HybridTimestamp commitTimestamp, long nextPartitionlessLink) {
-        byte[] rowBytes = rowBytes(row);
-
-        RowVersion rowVersion = new RowVersion(partitionId, commitTimestamp, 
nextPartitionlessLink, ByteBuffer.wrap(rowBytes));
-
-        insertRowVersion(rowVersion);
-
-        return rowVersion;
-    }
-
     @Override
     public Cursor<ReadResult> scanVersions(RowId rowId) throws 
StorageException {
         return busy(() -> {
             throwExceptionIfStorageNotInRunnableState();
 
-            // TODO: IGNITE-18717 Add lock by rowId
-            return new ScanVersionsCursor(rowId, this);
+            return findVersionChain(rowId, versionChain -> {
+                if (versionChain == null) {
+                    return CursorUtils.emptyCursor();
+                }
+
+                return new ScanVersionsCursor(versionChain, this);
+            });
         });
     }
 
@@ -944,15 +868,58 @@ public abstract class 
AbstractPageMemoryMvPartitionStorage implements MvPartitio
         }
     }
 
-    @Nullable VersionChain readVersionChain(RowId rowId) {
+    void throwExceptionIfStorageNotInRunnableState() {
+        StorageUtils.throwExceptionIfStorageNotInRunnableState(state.get(), 
this::createStorageInfo);
+    }
+
+    /**
+     * Searches version chain by row ID and converts the found version chain 
to the result if found.
+     *
+     * @param rowId Row ID.
+     * @param mapper Function for converting the version chain to a result, 
function is executed under the read lock of the page on which
+     *      the version chain is located. If the version chain is not found, 
then {@code null} will be passed to the function.
+     */
+    <T> @Nullable T findVersionChain(RowId rowId, Function<VersionChain, T> 
mapper) {
         try {
-            return versionChainTree.findOne(new VersionChainKey(rowId));
+            return versionChainTree.findOne(new VersionChainKey(rowId), new 
TreeRowMapClosure<>() {
+                @Override
+                public T map(VersionChain treeRow) {
+                    return mapper.apply(treeRow);
+                }
+            }, null);
         } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Error getting version chain: 
[rowId={}, {}]", e, rowId, createStorageInfo());
+            throwStorageExceptionIfItCause(e);
+
+            throw new StorageException("Row version lookup failed: [rowId={}, 
{}]", e, rowId, createStorageInfo());
         }
     }
 
-    void throwExceptionIfStorageNotInRunnableState() {
-        StorageUtils.throwExceptionIfStorageNotInRunnableState(state.get(), 
this::createStorageInfo);
+    /**
+     * Organizes external synchronization of update operations for the same 
version chain.
+     */
+    protected <T> T inUpdateVersionChainLock(RowId rowId, Supplier<T> 
supplier) {
+        LockHolder<ReentrantLock> lockHolder = 
updateVersionChainLockByRowId.compute(rowId, (rowId1, reentrantLockLockHolder) 
-> {
+            if (reentrantLockLockHolder == null) {
+                reentrantLockLockHolder = new LockHolder<>(new 
ReentrantLock());
+            }
+
+            reentrantLockLockHolder.incrementHolders();
+
+            return reentrantLockLockHolder;
+        });
+
+        lockHolder.getLock().lock();
+
+        try {
+            return supplier.get();
+        } finally {
+            lockHolder.getLock().unlock();
+
+            updateVersionChainLockByRowId.compute(rowId, (rowId1, 
reentrantLockLockHolder) -> {
+                assert reentrantLockLockHolder != null;
+
+                return reentrantLockLockHolder.decrementHolders() ? null : 
reentrantLockLockHolder;
+            });
+        }
     }
 }
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPartitionTimestampCursor.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPartitionTimestampCursor.java
index ce49626356..1968a394a6 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPartitionTimestampCursor.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPartitionTimestampCursor.java
@@ -17,12 +17,11 @@
 
 package org.apache.ignite.internal.storage.pagememory.mv;
 
-import java.util.HashMap;
-import java.util.Map;
+import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwStorageExceptionIfItCause;
+
 import java.util.NoSuchElementException;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.pagememory.tree.BplusTree;
-import org.apache.ignite.internal.pagememory.tree.BplusTree.TreeRowClosure;
+import org.apache.ignite.internal.pagememory.tree.BplusTree.TreeRowMapClosure;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.PartitionTimestampCursor;
 import org.apache.ignite.internal.storage.ReadResult;
@@ -35,16 +34,13 @@ import org.jetbrains.annotations.Nullable;
 abstract class AbstractPartitionTimestampCursor implements 
PartitionTimestampCursor {
     protected final AbstractPageMemoryMvPartitionStorage storage;
 
-    private @Nullable Cursor<VersionChain> versionChainCursor;
-
-    /** {@link ReadResult} obtained by caching {@link VersionChain} with 
{@link BplusTree#find(Object, Object, TreeRowClosure, Object)}. */
-    private final Map<RowId, ReadResult> readResultByRowId = new HashMap<>();
+    private @Nullable Cursor<ReadResult> cursor;
 
     private boolean iterationExhausted;
 
     private @Nullable ReadResult nextRead;
 
-    private @Nullable VersionChain currentChain;
+    private @Nullable RowId currentRowId;
 
     AbstractPartitionTimestampCursor(AbstractPageMemoryMvPartitionStorage 
storage) {
         this.storage = storage;
@@ -65,36 +61,25 @@ abstract class AbstractPartitionTimestampCursor implements 
PartitionTimestampCur
 
             createVersionChainCursorIfMissing();
 
-            currentChain = null;
+            currentRowId = null;
 
             while (true) {
-                if (!versionChainCursor.hasNext()) {
+                if (!cursor.hasNext()) {
                     iterationExhausted = true;
 
                     return false;
                 }
 
-                VersionChain chain = versionChainCursor.next();
-
-                ReadResult result = readResultByRowId.remove(chain.rowId());
+                ReadResult result = cursor.next();
 
-                if (result == null) {
-                    // TODO: IGNITE-18717 Add lock by rowId
-                    chain = storage.readVersionChain(chain.rowId());
-
-                    if (chain == null) {
-                        continue;
-                    }
-
-                    result = findRowVersion(chain);
-                }
+                RowId rowId = result.rowId();
 
                 if (result.isEmpty() && !result.isWriteIntent()) {
                     continue;
                 }
 
                 nextRead = result;
-                currentChain = chain;
+                currentRowId = rowId;
 
                 return true;
             }
@@ -122,8 +107,8 @@ abstract class AbstractPartitionTimestampCursor implements 
PartitionTimestampCur
 
     @Override
     public void close() {
-        if (versionChainCursor != null) {
-            versionChainCursor.close();
+        if (cursor != null) {
+            cursor.close();
         }
     }
 
@@ -132,18 +117,14 @@ abstract class AbstractPartitionTimestampCursor 
implements PartitionTimestampCur
         return storage.busy(() -> {
             storage.throwExceptionIfStorageNotInRunnableState();
 
-            if (currentChain == null) {
-                throw new IllegalStateException("Version chain missing: " + 
storage.createStorageInfo());
+            if (currentRowId == null) {
+                throw new IllegalStateException("RowId missing: " + 
storage.createStorageInfo());
             }
 
-            // TODO: IGNITE-18717 Add lock by rowId
-            VersionChain chain = 
storage.readVersionChain(currentChain.rowId());
-
-            if (chain == null) {
-                return null;
-            }
-
-            ReadResult result = storage.findRowVersionByTimestamp(chain, 
timestamp);
+            ReadResult result = storage.findVersionChain(currentRowId, 
versionChain -> versionChain == null
+                    ? ReadResult.empty(currentRowId)
+                    : storage.findRowVersionByTimestamp(versionChain, 
timestamp)
+            );
 
             if (result.isEmpty()) {
                 return null;
@@ -163,29 +144,21 @@ abstract class AbstractPartitionTimestampCursor 
implements PartitionTimestampCur
      */
     abstract ReadResult findRowVersion(VersionChain versionChain);
 
-    private void createVersionChainCursorIfMissing() {
-        if (versionChainCursor != null) {
+    void createVersionChainCursorIfMissing() {
+        if (cursor != null) {
             return;
         }
 
         try {
-            versionChainCursor = storage.versionChainTree.find(null, null, 
(tree, io, pageAddr, idx) -> {
-                // Since the BplusTree cursor caches rows that are on the same 
page, we should try to get actual ReadResult for them in this
-                // filter so as not to get into a situation when we read the 
chain and the links in it are no longer valid.
-
-                VersionChain versionChain = tree.getRow(io, pageAddr, idx);
-
-                // TODO: IGNITE-18717 Perhaps add lock by rowId
-
-                ReadResult readResult = findRowVersion(versionChain);
-
-                if (!readResult.isEmpty()) {
-                    readResultByRowId.put(versionChain.rowId(), readResult);
+            cursor = storage.versionChainTree.find(null, null, new 
TreeRowMapClosure<>() {
+                @Override
+                public ReadResult map(VersionChain treeRow) {
+                    return findRowVersion(treeRow);
                 }
-
-                return true;
             }, null);
         } catch (IgniteInternalCheckedException e) {
+            throwStorageExceptionIfItCause(e);
+
             throw new StorageException("Find failed: " + 
storage.createStorageInfo(), e);
         }
     }
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteCommittedInvokeClosure.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteCommittedInvokeClosure.java
new file mode 100644
index 0000000000..f33493c6e3
--- /dev/null
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteCommittedInvokeClosure.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
+import static 
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.rowBytes;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree.InvokeClosure;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree.OperationType;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Implementation of {@link InvokeClosure} for
+ * {@link AbstractPageMemoryMvPartitionStorage#addWriteCommitted(RowId, 
BinaryRow, HybridTimestamp)}.
+ *
+ * <p>Synchronization between reading and updating the version chain occurs 
due to the locks (read and write) of the page of the tree on
+ * which the version chain is located.
+ *
+ * <p>Synchronization between update operations for the version chain must be 
external (by {@link RowId row ID}).
+ *
+ * <p>Operation may throw {@link StorageException} which will cause form 
{@link BplusTree#invoke(Object, Object, InvokeClosure)}.
+ */
+class AddWriteCommittedInvokeClosure implements InvokeClosure<VersionChain> {
+    private final RowId rowId;
+
+    private final @Nullable BinaryRow row;
+
+    private final HybridTimestamp commitTimestamp;
+
+    private final AbstractPageMemoryMvPartitionStorage storage;
+
+    private @Nullable VersionChain newRow;
+
+    AddWriteCommittedInvokeClosure(
+            RowId rowId,
+            @Nullable BinaryRow row,
+            HybridTimestamp commitTimestamp,
+            AbstractPageMemoryMvPartitionStorage storage
+    ) {
+        this.rowId = rowId;
+        this.row = row;
+        this.commitTimestamp = commitTimestamp;
+        this.storage = storage;
+    }
+
+    @Override
+    public void call(@Nullable VersionChain oldRow) throws 
IgniteInternalCheckedException {
+        if (oldRow != null && oldRow.isUncommitted()) {
+            // This means that there is a bug in our code as the caller must 
make sure that no write intent exists below this write.
+            throw new StorageException("Write intent exists: [rowId={}, {}]", 
oldRow.rowId(), storage.createStorageInfo());
+        }
+
+        long nextLink = oldRow == null ? NULL_LINK : 
oldRow.newestCommittedLink();
+
+        RowVersion newVersion = insertCommittedRowVersion(row, 
commitTimestamp, nextLink);
+
+        newRow = VersionChain.createCommitted(rowId, newVersion.link(), 
newVersion.nextLink());
+    }
+
+    @Override
+    public @Nullable VersionChain newRow() {
+        assert newRow != null;
+
+        return newRow;
+    }
+
+    @Override
+    public OperationType operationType() {
+        return OperationType.PUT;
+    }
+
+    private RowVersion insertCommittedRowVersion(@Nullable BinaryRow row, 
HybridTimestamp commitTimestamp, long nextPartitionlessLink) {
+        byte[] rowBytes = rowBytes(row);
+
+        RowVersion rowVersion = new RowVersion(storage.partitionId, 
commitTimestamp, nextPartitionlessLink, ByteBuffer.wrap(rowBytes));
+
+        storage.insertRowVersion(rowVersion);
+
+        return rowVersion;
+    }
+}
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteInvokeClosure.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteInvokeClosure.java
new file mode 100644
index 0000000000..bf66c5a1f1
--- /dev/null
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteInvokeClosure.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
+import static 
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.ALWAYS_LOAD_VALUE;
+import static 
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.rowBytes;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree.InvokeClosure;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree.OperationType;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.TxIdMismatchException;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Implementation of {@link InvokeClosure} for {@link 
AbstractPageMemoryMvPartitionStorage#addWrite(RowId, BinaryRow, UUID, UUID, 
int)}.
+ *
+ * <p>Synchronization between reading and updating the version chain occurs 
due to the locks (read and write) of the page of the tree on
+ * which the version chain is located.
+ *
+ * <p>Synchronization between update operations for the version chain must be 
external (by {@link RowId row ID}).
+ *
+ * <p>Operation may throw {@link StorageException} and {@link 
TxIdMismatchException} which will cause form
+ * {@link BplusTree#invoke(Object, Object, InvokeClosure)}.
+ */
+class AddWriteInvokeClosure implements InvokeClosure<VersionChain> {
+    private final RowId rowId;
+
+    private final @Nullable BinaryRow row;
+
+    private final UUID txId;
+
+    private final UUID commitTableId;
+
+    private final int commitPartitionId;
+
+    private final AbstractPageMemoryMvPartitionStorage storage;
+
+    private @Nullable VersionChain newRow;
+
+    private @Nullable BinaryRow previousUncommittedRowVersion;
+
+    private @Nullable RowVersion toRemove;
+
+    AddWriteInvokeClosure(
+            RowId rowId,
+            @Nullable BinaryRow row,
+            UUID txId,
+            UUID commitTableId,
+            int commitPartitionId,
+            AbstractPageMemoryMvPartitionStorage storage
+    ) {
+        this.rowId = rowId;
+        this.row = row;
+        this.txId = txId;
+        this.commitTableId = commitTableId;
+        this.commitPartitionId = commitPartitionId;
+        this.storage = storage;
+    }
+
+    @Override
+    public void call(@Nullable VersionChain oldRow) throws 
IgniteInternalCheckedException {
+        if (oldRow == null) {
+            RowVersion newVersion = insertRowVersion(row, NULL_LINK);
+
+            newRow = VersionChain.createUncommitted(rowId, txId, 
commitTableId, commitPartitionId, newVersion.link(), NULL_LINK);
+
+            return;
+        }
+
+        if (oldRow.isUncommitted()) {
+            throwIfChainBelongsToAnotherTx(oldRow);
+        }
+
+        RowVersion newVersion = insertRowVersion(row, 
oldRow.newestCommittedLink());
+
+        if (oldRow.isUncommitted()) {
+            RowVersion currentVersion = 
storage.readRowVersion(oldRow.headLink(), ALWAYS_LOAD_VALUE);
+
+            previousUncommittedRowVersion = 
storage.rowVersionToBinaryRow(currentVersion);
+
+            // As we replace an uncommitted version with new one, we need to 
remove old uncommitted version.
+            toRemove = currentVersion;
+        }
+
+        newRow = VersionChain.createUncommitted(rowId, txId, commitTableId, 
commitPartitionId, newVersion.link(), newVersion.nextLink());
+    }
+
+    @Override
+    public @Nullable VersionChain newRow() {
+        assert newRow != null;
+
+        return newRow;
+    }
+
+    @Override
+    public OperationType operationType() {
+        return OperationType.PUT;
+    }
+
+    /**
+     * Returns the result for {@link MvPartitionStorage#addWrite(RowId, 
BinaryRow, UUID, UUID, int)}.
+     */
+    @Nullable BinaryRow getPreviousUncommittedRowVersion() {
+        return previousUncommittedRowVersion;
+    }
+
+    private RowVersion insertRowVersion(@Nullable BinaryRow row, long 
nextPartitionlessLink) {
+        byte[] rowBytes = rowBytes(row);
+
+        RowVersion rowVersion = new RowVersion(storage.partitionId, 
nextPartitionlessLink, ByteBuffer.wrap(rowBytes));
+
+        storage.insertRowVersion(rowVersion);
+
+        return rowVersion;
+    }
+
+    private void throwIfChainBelongsToAnotherTx(VersionChain versionChain) {
+        assert versionChain.isUncommitted();
+
+        if (!txId.equals(versionChain.transactionId())) {
+            throw new TxIdMismatchException(txId, 
versionChain.transactionId());
+        }
+    }
+
+    /**
+     * Method to call after {@link BplusTree#invoke(Object, Object, 
InvokeClosure)} has completed.
+     */
+    void afterCompletion() {
+        if (toRemove != null) {
+            storage.removeRowVersion(toRemove);
+        }
+    }
+}
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/CommitWriteInvokeClosure.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/CommitWriteInvokeClosure.java
new file mode 100644
index 0000000000..01b1be8e63
--- /dev/null
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/CommitWriteInvokeClosure.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree.InvokeClosure;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree.OperationType;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Implementation of {@link InvokeClosure} for {@link 
AbstractPageMemoryMvPartitionStorage#commitWrite(RowId, HybridTimestamp)}.
+ *
+ * <p>Synchronization between reading and updating the version chain occurs 
due to the locks (read and write) of the page of the tree on
+ * which the version chain is located.
+ *
+ * <p>Synchronization between update operations for the version chain must be 
external (by {@link RowId row ID}).
+ *
+ * <p>Operation may throw {@link StorageException} which will cause form 
{@link BplusTree#invoke(Object, Object, InvokeClosure)}.
+ */
+class CommitWriteInvokeClosure implements InvokeClosure<VersionChain> {
+    private final HybridTimestamp timestamp;
+
+    private final AbstractPageMemoryMvPartitionStorage storage;
+
+    private OperationType operationType;
+
+    private @Nullable VersionChain newRow;
+
+    private @Nullable Long updateTimestampLink;
+
+    CommitWriteInvokeClosure(HybridTimestamp timestamp, 
AbstractPageMemoryMvPartitionStorage storage) {
+        this.timestamp = timestamp;
+        this.storage = storage;
+    }
+
+    @Override
+    public void call(@Nullable VersionChain oldRow) throws 
IgniteInternalCheckedException {
+        if (oldRow == null || oldRow.transactionId() == null) {
+            // Row doesn't exist or the chain doesn't contain an uncommitted 
write intent.
+            operationType = OperationType.NOOP;
+
+            return;
+        }
+
+        updateTimestampLink = oldRow.headLink();
+
+        operationType = OperationType.PUT;
+
+        newRow = VersionChain.createCommitted(oldRow.rowId(), 
oldRow.headLink(), oldRow.nextLink());
+    }
+
+    @Override
+    public @Nullable VersionChain newRow() {
+        assert operationType == OperationType.PUT ? newRow != null : newRow == 
null : "newRow=" + newRow + ", op=" + operationType;
+
+        return newRow;
+    }
+
+    @Override
+    public OperationType operationType() {
+        assert operationType != null;
+
+        return operationType;
+    }
+
+    @Override
+    public void onUpdate() {
+        assert operationType == OperationType.PUT ? updateTimestampLink != 
null : updateTimestampLink == null :
+                "link=" + updateTimestampLink + ", op=" + operationType;
+
+        if (updateTimestampLink != null) {
+            try {
+                
storage.rowVersionFreeList.updateTimestamp(updateTimestampLink, timestamp);
+            } catch (IgniteInternalCheckedException e) {
+                throw new StorageException(
+                        "Error while update timestamp: [link={}, timestamp={}, 
{}]",
+                        e,
+                        updateTimestampLink, timestamp, 
storage.createStorageInfo());
+            }
+        }
+    }
+}
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/LockHolder.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/LockHolder.java
new file mode 100644
index 0000000000..91167cf785
--- /dev/null
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/LockHolder.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Class to keep track of the number of lock holders.
+ *
+ * <p>Honest holder tracking requires an external sync, such as a {@link 
ConcurrentMap}.
+ *
+ * <p>How to use:
+ * <ul>
+ *     <li>Increases the count of lock holders by {@link 
#incrementHolders()};</li>
+ *     <li>You can {@link #getLock() get a lock} and work with it;</li>
+ *     <li>After lock is no longer used, you need to call {@link 
#decrementHolders()}, which will tell you if lock holders are left.</li>
+ * </ul>
+ */
+class LockHolder<T extends Lock> {
+    private final T lock;
+
+    private final AtomicInteger lockHolder = new AtomicInteger();
+
+    LockHolder(T lock) {
+        this.lock = lock;
+    }
+
+    /**
+     * Increment the count of lock holders ({@link Thread}).
+     */
+    void incrementHolders() {
+        int holders = lockHolder.incrementAndGet();
+
+        assert holders > 0 : holders;
+    }
+
+    /**
+     * Decrements the count of lock holders ({@link Thread}), returns {@code 
true} if there are no more lock holders.
+     */
+    boolean decrementHolders() {
+        int holders = lockHolder.decrementAndGet();
+
+        assert holders >= 0 : holders;
+
+        return holders == 0;
+    }
+
+    /**
+     * Returns lock.
+     */
+    T getLock() {
+        return lock;
+    }
+}
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionsCursor.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionsCursor.java
index 9f581feae4..93dd316713 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionsCursor.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionsCursor.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.storage.pagememory.mv;
 
-import static java.util.Collections.emptyIterator;
 import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
 import static 
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.ALWAYS_LOAD_VALUE;
 import static 
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.rowVersionToResultNotFillingLastCommittedTs;
@@ -29,7 +28,6 @@ import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.util.Cursor;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * Cursor reading all versions for {@link RowId}.
@@ -39,26 +37,21 @@ import org.jetbrains.annotations.Nullable;
 class ScanVersionsCursor implements Cursor<ReadResult> {
     private final AbstractPageMemoryMvPartitionStorage storage;
 
-    private final @Nullable VersionChain versionChain;
+    private final VersionChain versionChain;
 
     private final Iterator<RowVersion> rowVersionIterator;
 
     /**
      * Constructor.
      *
-     * @param rowId Row ID.
+     * @param versionChain Version chain.
      * @param storage Multi-versioned partition storage.
      * @throws StorageException If there is an error when collecting all 
versions of the chain.
      */
-    ScanVersionsCursor(
-            RowId rowId,
-            AbstractPageMemoryMvPartitionStorage storage
-    ) {
+    ScanVersionsCursor(VersionChain versionChain, 
AbstractPageMemoryMvPartitionStorage storage) {
         this.storage = storage;
-
-        versionChain = storage.readVersionChain(rowId);
-
-        rowVersionIterator = versionChain == null ? emptyIterator() : 
collectRowVersions();
+        this.versionChain = versionChain;
+        this.rowVersionIterator = collectRowVersions();
     }
 
     @Override
diff --git 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/LockHolderTest.java
 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/LockHolderTest.java
new file mode 100644
index 0000000000..f0802e7465
--- /dev/null
+++ 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/LockHolderTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.concurrent.locks.Lock;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Class for testing {@link LockHolder}.
+ */
+public class LockHolderTest {
+    @Test
+    void testSimple() {
+        LockHolder<Lock> lockHolder = createLockHolder();
+
+        lockHolder.incrementHolders();
+
+        assertNotNull(lockHolder.getLock());
+
+        assertTrue(lockHolder.decrementHolders());
+    }
+
+    @Test
+    void testIncrementHoldersSameThreadTwice() {
+        LockHolder<Lock> lockHolder = createLockHolder();
+
+        lockHolder.incrementHolders();
+        lockHolder.incrementHolders();
+
+        assertNotNull(lockHolder.getLock());
+
+        assertFalse(lockHolder.decrementHolders());
+        assertTrue(lockHolder.decrementHolders());
+    }
+
+    @Test
+    void testTwoThreadSimple() {
+        LockHolder<Lock> lockHolder = createLockHolder();
+
+        lockHolder.incrementHolders();
+
+        assertNotNull(lockHolder.getLock());
+
+        assertThat(runAsync(() -> {
+            lockHolder.incrementHolders();
+
+            assertNotNull(lockHolder.getLock());
+
+            assertFalse(lockHolder.decrementHolders());
+        }), willCompleteSuccessfully());
+
+        assertNotNull(lockHolder.getLock());
+
+        assertTrue(lockHolder.decrementHolders());
+    }
+
+    private static LockHolder<Lock> createLockHolder() {
+        return new LockHolder<>(mock(Lock.class));
+    }
+}
diff --git 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageConcurrencyTest.java
 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageConcurrencyTest.java
index b0ba265703..bdc0544e0d 100644
--- 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageConcurrencyTest.java
+++ 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageConcurrencyTest.java
@@ -29,7 +29,6 @@ import 
org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.extension.ExtendWith;
 
-@Disabled("https://issues.apache.org/jira/browse/IGNITE-18023";)
 @ExtendWith(WorkDirectoryExtension.class)
 class PersistentPageMemoryMvPartitionStorageConcurrencyTest extends 
AbstractMvPartitionStorageConcurrencyTest {
     @InjectConfiguration("mock.checkpoint.checkpointDelayMillis = 0")
@@ -46,4 +45,40 @@ class PersistentPageMemoryMvPartitionStorageConcurrencyTest 
extends AbstractMvPa
 
         return new PersistentPageMemoryStorageEngine("test", engineConfig, 
ioRegistry, workDir, null);
     }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023";)
+    @Override
+    public void testRegularGcAndRead(AddAndCommit addAndCommit) {
+        super.testRegularGcAndRead(addAndCommit);
+    }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023";)
+    @Override
+    public void testTombstoneGcAndRead(AddAndCommit addAndCommit) {
+        super.testTombstoneGcAndRead(addAndCommit);
+    }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023";)
+    @Override
+    public void testTombstoneGcAndAddWrite(AddAndCommit addAndCommit) {
+        super.testTombstoneGcAndAddWrite(addAndCommit);
+    }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023";)
+    @Override
+    public void testTombstoneGcAndCommitWrite(AddAndCommit addAndCommit) {
+        super.testTombstoneGcAndCommitWrite(addAndCommit);
+    }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023";)
+    @Override
+    public void testTombstoneGcAndAbortWrite(AddAndCommit addAndCommit) {
+        super.testTombstoneGcAndAbortWrite(addAndCommit);
+    }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023";)
+    @Override
+    public void testConcurrentGc(AddAndCommit addAndCommit) {
+        super.testConcurrentGc(addAndCommit);
+    }
 }
diff --git 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageConcurrencyTest.java
 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageConcurrencyTest.java
index 22f2efcf32..4645851483 100644
--- 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageConcurrencyTest.java
+++ 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageConcurrencyTest.java
@@ -28,7 +28,6 @@ import 
org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.extension.ExtendWith;
 
-@Disabled("https://issues.apache.org/jira/browse/IGNITE-18023";)
 @ExtendWith(WorkDirectoryExtension.class)
 class VolatilePageMemoryMvPartitionStorageConcurrencyTest extends 
AbstractMvPartitionStorageConcurrencyTest {
     @InjectConfiguration
@@ -42,4 +41,40 @@ class VolatilePageMemoryMvPartitionStorageConcurrencyTest 
extends AbstractMvPart
 
         return new VolatilePageMemoryStorageEngine("node", engineConfig, 
ioRegistry, PageEvictionTrackerNoOp.INSTANCE);
     }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023";)
+    @Override
+    public void testRegularGcAndRead(AddAndCommit addAndCommit) {
+        super.testRegularGcAndRead(addAndCommit);
+    }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023";)
+    @Override
+    public void testTombstoneGcAndRead(AddAndCommit addAndCommit) {
+        super.testTombstoneGcAndRead(addAndCommit);
+    }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023";)
+    @Override
+    public void testTombstoneGcAndAddWrite(AddAndCommit addAndCommit) {
+        super.testTombstoneGcAndAddWrite(addAndCommit);
+    }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023";)
+    @Override
+    public void testTombstoneGcAndCommitWrite(AddAndCommit addAndCommit) {
+        super.testTombstoneGcAndCommitWrite(addAndCommit);
+    }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023";)
+    @Override
+    public void testTombstoneGcAndAbortWrite(AddAndCommit addAndCommit) {
+        super.testTombstoneGcAndAbortWrite(addAndCommit);
+    }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023";)
+    @Override
+    public void testConcurrentGc(AddAndCommit addAndCommit) {
+        super.testConcurrentGc(addAndCommit);
+    }
 }

Reply via email to