This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 3fa0c97f524 IGNITE-27208 Prepare RowVersion read/write code to 
addition of new versions (#7112)
3fa0c97f524 is described below

commit 3fa0c97f5241da41e9eaa9fe253fbd9e19a024dd
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Mon Dec 1 16:36:43 2025 +0400

    IGNITE-27208 Prepare RowVersion read/write code to addition of new versions 
(#7112)
---
 .../pagememory/datapage/PageMemoryTraversal.java   |   3 +-
 .../datapage/ReadPageMemoryRowValue.java           |  27 +++--
 .../internal/pagememory/freelist/FreeListImpl.java |   3 +-
 .../ignite/internal/pagememory/io/DataPageIo.java  |   3 +-
 .../storage/BaseMvPartitionStorageTest.java        |   2 +-
 .../storage/pagememory/StorageMemoryIoModule.java  |   2 +-
 .../pagememory/StoragePartitionMetaFactory.java    |   2 +-
 .../storage/pagememory/StoragePartitionMetaIo.java |  11 +-
 ...le.java => StoragePartitionMetaIoVersions.java} |  19 ++--
 .../index/freelist/ReadIndexColumnsValue.java      |  16 ++-
 .../mv/AbstractPageMemoryMvPartitionStorage.java   |   2 +-
 .../pagememory/mv/AddWriteInvokeClosure.java       |  68 +++++++------
 .../storage/pagememory/mv/FindRowVersion.java      |  46 ++++-----
 .../pagememory/mv/PlainRowVersionReader.java       |  69 +++++++++++++
 ...Value.java => PlainRowVersionValueOffsets.java} |  22 ++--
 .../storage/pagememory/mv/ReadRowVersion.java      |  45 +++------
 .../storage/pagememory/mv/ReadRowVersionValue.java |  15 +--
 .../internal/storage/pagememory/mv/RowVersion.java | 111 ++++++++++++++-------
 .../storage/pagememory/mv/RowVersionReader.java    |  67 +++++++++++++
 ...rsionValue.java => RowVersionValueOffsets.java} |  30 +++---
 ...PersistentPageMemoryMvPartitionStorageTest.java |   6 +-
 21 files changed, 353 insertions(+), 216 deletions(-)

diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/PageMemoryTraversal.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/PageMemoryTraversal.java
index f6fd0d359ba..eb83ae53e9d 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/PageMemoryTraversal.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/PageMemoryTraversal.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.pagememory.datapage;
 
 import org.apache.ignite.internal.pagememory.io.DataPagePayload;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Controls page memory traversal.
@@ -41,7 +42,7 @@ public interface PageMemoryTraversal<T> {
      * @param arg      argument passed to the traversal
      * @return next row link or {@link #STOP_TRAVERSAL} to stop the traversal
      */
-    long consumePagePayload(long link, long pageAddr, DataPagePayload payload, 
T arg);
+    long consumePagePayload(long link, long pageAddr, DataPagePayload payload, 
@Nullable T arg);
 
     /**
      * Called when the traversal is finished successfully.
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/ReadPageMemoryRowValue.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/ReadPageMemoryRowValue.java
index 6c8c85a3a6a..795e8c3b0b5 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/ReadPageMemoryRowValue.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/ReadPageMemoryRowValue.java
@@ -50,7 +50,7 @@ public abstract class ReadPageMemoryRowValue implements 
PageMemoryTraversal<Void
     private int transferredBytes = 0;
 
     @Override
-    public long consumePagePayload(long link, long pageAddr, DataPagePayload 
payload, Void ignoredArg) {
+    public long consumePagePayload(long link, long pageAddr, DataPagePayload 
payload, @Nullable Void ignoredArg) {
         if (readingFirstSlot) {
             readingFirstSlot = false;
             return readFullyOrStartReadingFragmented(pageAddr, payload);
@@ -61,31 +61,28 @@ public abstract class ReadPageMemoryRowValue implements 
PageMemoryTraversal<Void
     }
 
     private long readFullyOrStartReadingFragmented(long pageAddr, 
DataPagePayload payload) {
-        assert PageUtils.getByte(pageAddr, payload.offset() + 
Storable.DATA_TYPE_OFFSET) == dataType();
+        byte dataType = PageUtils.getByte(pageAddr, payload.offset() + 
Storable.DATA_TYPE_OFFSET);
 
-        valueSize = readValueSize(pageAddr, payload);
+        valueSize = readValueSize(dataType, pageAddr, payload);
 
         if (!payload.hasMoreFragments()) {
-            return readFully(pageAddr, payload);
+            return readFully(dataType, pageAddr, payload);
         } else {
             allValueBytes = new byte[valueSize];
             transferredBytes = 0;
 
-            readValueFragmentToArray(pageAddr, payload, 
valueOffsetInFirstSlot());
+            readValueFragmentToArray(pageAddr, payload, 
valueOffsetInFirstSlot(dataType));
 
             return payload.nextLink();
         }
     }
 
-    /** Returns type of the data row. */
-    protected abstract byte dataType();
-
-    private int readValueSize(long pageAddr, DataPagePayload payload) {
-        return PageUtils.getInt(pageAddr, payload.offset() + 
valueSizeOffsetInFirstSlot());
+    private int readValueSize(byte dataType, long pageAddr, DataPagePayload 
payload) {
+        return PageUtils.getInt(pageAddr, payload.offset() + 
valueSizeOffsetInFirstSlot(dataType));
     }
 
-    private long readFully(long pageAddr, DataPagePayload payload) {
-        allValueBytes = PageUtils.getBytes(pageAddr, payload.offset() + 
valueOffsetInFirstSlot(), valueSize);
+    private long readFully(byte dataType, long pageAddr, DataPagePayload 
payload) {
+        allValueBytes = PageUtils.getBytes(pageAddr, payload.offset() + 
valueOffsetInFirstSlot(dataType), valueSize);
 
         return STOP_TRAVERSAL;
     }
@@ -124,16 +121,18 @@ public abstract class ReadPageMemoryRowValue implements 
PageMemoryTraversal<Void
     /**
      * Memory offset into first slot at which the 'value' size is stored (as 
int).
      *
+     * @param dataType Data type we are reading.
      * @return offset into first slot at which the 'value' size is stored (as 
int)
      */
-    protected abstract int valueSizeOffsetInFirstSlot();
+    protected abstract int valueSizeOffsetInFirstSlot(byte dataType);
 
     /**
      * Memory offset into first slot at which the 'value' starts.
      *
+     * @param dataType Data type we are reading.
      * @return offset into first slot at which the 'value' starts
      */
-    protected abstract int valueOffsetInFirstSlot();
+    protected abstract int valueOffsetInFirstSlot(byte dataType);
 
     /**
      * Resets the object to make it ready for use.
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/freelist/FreeListImpl.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/freelist/FreeListImpl.java
index 85a70aa468a..88bf2da9c21 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/freelist/FreeListImpl.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/freelist/FreeListImpl.java
@@ -172,7 +172,6 @@ public class FreeListImpl extends PagesList implements 
FreeList, ReuseList {
          * @param written Written size.
          * @param rowSize Row size.
          * @return Updated written size.
-         * @throws IgniteInternalCheckedException If failed.
          */
         protected int addRowFragment(
                 long pageId,
@@ -181,7 +180,7 @@ public class FreeListImpl extends PagesList implements 
FreeList, ReuseList {
                 Storable row,
                 int written,
                 int rowSize
-        ) throws IgniteInternalCheckedException {
+        ) {
             int payloadSize = io.addRowFragment(pageMem, pageId, pageAddr, 
row, written, rowSize, pageSize());
 
             assert payloadSize > 0 : payloadSize;
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/io/DataPageIo.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/io/DataPageIo.java
index a059ac7c08c..4a13841acdc 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/io/DataPageIo.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/io/DataPageIo.java
@@ -1121,7 +1121,6 @@ public class DataPageIo extends PageIo {
      * @param rowSize Row size.
      * @param pageSize Page size.
      * @return Written payload size.
-     * @throws IgniteInternalCheckedException If failed.
      */
     public int addRowFragment(
             PageMemory pageMem,
@@ -1131,7 +1130,7 @@ public class DataPageIo extends PageIo {
             int written,
             int rowSize,
             int pageSize
-    ) throws IgniteInternalCheckedException {
+    ) {
         assertPageType(pageAddr);
 
         assert row != null;
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java
index c5d6ba98e63..edb30f06745 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java
@@ -140,7 +140,7 @@ public abstract class BaseMvPartitionStorageTest extends 
BaseMvStoragesTest {
     /**
      * Aborts write-intent inside of consistency closure.
      */
-    AbortResult abortWrite(RowId rowId, UUID txId) {
+    protected AbortResult abortWrite(RowId rowId, UUID txId) {
         return storage.runConsistently(locker -> {
             locker.lock(rowId);
 
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StorageMemoryIoModule.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StorageMemoryIoModule.java
index 6cb40aa08e8..bed0c3aef95 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StorageMemoryIoModule.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StorageMemoryIoModule.java
@@ -30,6 +30,6 @@ import org.apache.ignite.internal.pagememory.io.PageIoModule;
 public class StorageMemoryIoModule implements PageIoModule {
     @Override
     public Collection<IoVersions<?>> ioVersions() {
-        return List.of(StoragePartitionMetaIo.VERSIONS);
+        return List.of(StoragePartitionMetaIoVersions.VERSIONS);
     }
 }
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaFactory.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaFactory.java
index 8242bdab1a9..72240654532 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaFactory.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaFactory.java
@@ -56,6 +56,6 @@ public class StoragePartitionMetaFactory implements 
PartitionMetaFactory {
 
     @Override
     public StoragePartitionMetaIo partitionMetaIo() {
-        return StoragePartitionMetaIo.VERSIONS.latest();
+        return StoragePartitionMetaIoVersions.VERSIONS.latest();
     }
 }
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaIo.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaIo.java
index e11e701ad34..e03a48dd96b 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaIo.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaIo.java
@@ -23,7 +23,6 @@ import static 
org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
 import java.util.UUID;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteStringBuilder;
-import org.apache.ignite.internal.pagememory.io.IoVersions;
 import org.apache.ignite.internal.pagememory.persistence.io.PartitionMetaIo;
 import org.apache.ignite.internal.storage.pagememory.mv.BlobStorage;
 import org.jetbrains.annotations.Nullable;
@@ -58,10 +57,14 @@ public class StoragePartitionMetaIo extends PartitionMetaIo 
{
     private static final int PRIMARY_REPLICA_NODE_NAME_FIRST_PAGE_ID_OFF = 
PRIMARY_REPLICA_NODE_ID_LOW_OFF + Long.BYTES;
 
     /** Estimated size here is not a size of a meta, but an approximate rows 
count. */
-    private static final int ESTIMATED_SIZE_OFF = 
PRIMARY_REPLICA_NODE_NAME_FIRST_PAGE_ID_OFF + Long.BYTES;
+    protected static final int ESTIMATED_SIZE_OFF = 
PRIMARY_REPLICA_NODE_NAME_FIRST_PAGE_ID_OFF + Long.BYTES;
 
-    /** I/O versions. */
-    public static final IoVersions<StoragePartitionMetaIo> VERSIONS = new 
IoVersions<>(new StoragePartitionMetaIo(1));
+    /**
+     * Constructor.
+     */
+    protected StoragePartitionMetaIo() {
+        this(1);
+    }
 
     /**
      * Constructor.
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StorageMemoryIoModule.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaIoVersions.java
similarity index 66%
copy from 
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StorageMemoryIoModule.java
copy to 
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaIoVersions.java
index 6cb40aa08e8..2bc7bf2638b 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StorageMemoryIoModule.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaIoVersions.java
@@ -17,19 +17,12 @@
 
 package org.apache.ignite.internal.storage.pagememory;
 
-import com.google.auto.service.AutoService;
-import java.util.Collection;
-import java.util.List;
 import org.apache.ignite.internal.pagememory.io.IoVersions;
-import org.apache.ignite.internal.pagememory.io.PageIoModule;
 
-/**
- * {@link PageIoModule} implementation in storage-page-memory module.
- */
-@AutoService(PageIoModule.class)
-public class StorageMemoryIoModule implements PageIoModule {
-    @Override
-    public Collection<IoVersions<?>> ioVersions() {
-        return List.of(StoragePartitionMetaIo.VERSIONS);
-    }
+/** Storage partition meta I/O versions. */
+public class StoragePartitionMetaIoVersions {
+    /** I/O versions. */
+    public static final IoVersions<StoragePartitionMetaIo> VERSIONS = new 
IoVersions<>(
+            new StoragePartitionMetaIo()
+    );
 }
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/freelist/ReadIndexColumnsValue.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/freelist/ReadIndexColumnsValue.java
index bf36f6779e3..d5792ad9719 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/freelist/ReadIndexColumnsValue.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/freelist/ReadIndexColumnsValue.java
@@ -18,24 +18,22 @@
 package org.apache.ignite.internal.storage.pagememory.index.freelist;
 
 import org.apache.ignite.internal.pagememory.datapage.ReadPageMemoryRowValue;
-import org.apache.ignite.internal.storage.pagememory.mv.RowVersion;
 
 /**
- * Reads {@link RowVersion#value()} from page-memory.
+ * Reads {@link IndexColumns#valueBuffer()} value from page-memory.
  */
 public class ReadIndexColumnsValue extends ReadPageMemoryRowValue {
     @Override
-    protected int valueSizeOffsetInFirstSlot() {
+    protected int valueSizeOffsetInFirstSlot(byte dataType) {
+        assert dataType == IndexColumns.DATA_TYPE;
+
         return IndexColumns.SIZE_OFFSET;
     }
 
     @Override
-    protected int valueOffsetInFirstSlot() {
-        return IndexColumns.VALUE_OFFSET;
-    }
+    protected int valueOffsetInFirstSlot(byte dataType) {
+        assert dataType == IndexColumns.DATA_TYPE;
 
-    @Override
-    protected byte dataType() {
-        return IndexColumns.DATA_TYPE;
+        return IndexColumns.VALUE_OFFSET;
     }
 }
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index c5b4c2a4e78..6f2fe8a34a6 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -449,7 +449,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
 
                 addWrite.afterCompletion();
 
-                AddWriteResult addWriteResult = addWrite.addWriteResult();
+                AddWriteResult addWriteResult = addWrite.result();
 
                 assert addWriteResult != null : addWriteInfo(rowId, row, txId, 
commitZoneId, commitPartitionId);
 
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteInvokeClosure.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteInvokeClosure.java
index 3d284903416..465dac45137 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteInvokeClosure.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteInvokeClosure.java
@@ -42,9 +42,9 @@ import org.jetbrains.annotations.Nullable;
  * <p>Operation may throw {@link StorageException} which will cause form 
{@link BplusTree#invoke(Object, Object, InvokeClosure)}.
  */
 class AddWriteInvokeClosure implements InvokeClosure<VersionChain> {
-    private final RowId rowId;
+    protected final RowId rowId;
 
-    private final @Nullable BinaryRow row;
+    protected final @Nullable BinaryRow row;
 
     private final UUID txId;
 
@@ -52,11 +52,12 @@ class AddWriteInvokeClosure implements 
InvokeClosure<VersionChain> {
 
     private final int commitPartitionId;
 
-    private final AbstractPageMemoryMvPartitionStorage storage;
+    protected final AbstractPageMemoryMvPartitionStorage storage;
 
     private OperationType operationType;
 
-    private @Nullable VersionChain newRow;
+    @Nullable
+    private VersionChain newRow;
 
     private @Nullable RowVersion toRemove;
 
@@ -79,16 +80,15 @@ class AddWriteInvokeClosure implements 
InvokeClosure<VersionChain> {
     }
 
     @Override
-    public void call(@Nullable VersionChain oldRow) throws 
IgniteInternalCheckedException {
+    public final void call(@Nullable VersionChain oldRow) throws 
IgniteInternalCheckedException {
         if (oldRow == null) {
-            RowVersion newVersion = insertRowVersion(row, NULL_LINK);
-
-            newRow = VersionChain.createUncommitted(rowId, txId, commitZoneId, 
commitPartitionId, newVersion.link(), NULL_LINK);
-
             operationType = OperationType.PUT;
-
             addWriteResult = AddWriteResult.success(null);
 
+            RowVersion newVersion = insertFirstRowVersion();
+
+            newRow = createUncommittedVersionChain(newVersion);
+
             return;
         }
 
@@ -103,29 +103,39 @@ class AddWriteInvokeClosure implements 
InvokeClosure<VersionChain> {
             return;
         }
 
-        RowVersion newVersion = insertRowVersion(row, 
oldRow.newestCommittedLink());
+        operationType = OperationType.PUT;
+
+        boolean replacingExistingWriteIntent = oldRow.isUncommitted();
+        RowVersion existingWriteIntent;
 
-        if (oldRow.isUncommitted()) {
-            RowVersion currentVersion = 
storage.readRowVersion(oldRow.headLink(), ALWAYS_LOAD_VALUE);
+        if (replacingExistingWriteIntent) {
+            existingWriteIntent = storage.readRowVersion(oldRow.headLink(), 
ALWAYS_LOAD_VALUE);
 
-            addWriteResult = AddWriteResult.success(currentVersion.value());
+            addWriteResult = 
AddWriteResult.success(existingWriteIntent.value());
 
-            // As we replace an uncommitted version with new one, we need to 
remove old uncommitted version.
-            toRemove = currentVersion;
+            // As we are replacing an uncommitted version with new one, we 
need to remove old uncommitted version.
+            toRemove = existingWriteIntent;
         } else {
             addWriteResult = AddWriteResult.success(null);
+
+            existingWriteIntent = null;
         }
 
-        newRow = VersionChain.createUncommitted(
-                rowId,
-                txId,
-                commitZoneId,
-                commitPartitionId,
-                newVersion.link(),
-                newVersion.nextLink()
-        );
+        RowVersion newVersion = insertAnotherRowVersion(oldRow, 
existingWriteIntent);
 
-        operationType = OperationType.PUT;
+        newRow = createUncommittedVersionChain(newVersion);
+    }
+
+    protected RowVersion insertFirstRowVersion() {
+        return insertRowVersion(NULL_LINK);
+    }
+
+    protected RowVersion insertAnotherRowVersion(VersionChain oldRow, 
@Nullable RowVersion existingWriteIntent) {
+        return insertRowVersion(oldRow.newestCommittedLink());
+    }
+
+    private VersionChain createUncommittedVersionChain(RowVersion newVersion) {
+        return VersionChain.createUncommitted(rowId, txId, commitZoneId, 
commitPartitionId, newVersion.link(), newVersion.nextLink());
     }
 
     @Override
@@ -143,8 +153,8 @@ class AddWriteInvokeClosure implements 
InvokeClosure<VersionChain> {
         return operationType;
     }
 
-    private RowVersion insertRowVersion(@Nullable BinaryRow row, long 
nextPartitionlessLink) {
-        RowVersion rowVersion = new RowVersion(storage.partitionId, 
nextPartitionlessLink, row);
+    private RowVersion insertRowVersion(long nextLink) {
+        RowVersion rowVersion = new RowVersion(storage.partitionId, nextLink, 
row);
 
         storage.insertRowVersion(rowVersion);
 
@@ -176,11 +186,11 @@ class AddWriteInvokeClosure implements 
InvokeClosure<VersionChain> {
         return rowVersion.timestamp();
     }
 
-    AddWriteResult addWriteResult() {
+    AddWriteResult result() {
         return addWriteResult;
     }
 
-    private String addWriteInfo() {
+    final String addWriteInfo() {
         return storage.addWriteInfo(rowId, row, txId, commitZoneId, 
commitPartitionId);
     }
 }
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/FindRowVersion.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/FindRowVersion.java
index 03a8833cab5..db75b0063ad 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/FindRowVersion.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/FindRowVersion.java
@@ -17,13 +17,13 @@
 
 package org.apache.ignite.internal.storage.pagememory.mv;
 
-import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
 import static 
org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionIdFromLink;
 import static 
org.apache.ignite.internal.pagememory.util.PartitionlessLinks.readPartitionless;
 
 import java.nio.ByteBuffer;
 import java.util.Objects;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.pagememory.Storable;
 import org.apache.ignite.internal.pagememory.datapage.PageMemoryTraversal;
 import org.apache.ignite.internal.pagememory.io.DataPagePayload;
 import org.apache.ignite.internal.pagememory.util.PageUtils;
@@ -45,15 +45,7 @@ class FindRowVersion implements 
PageMemoryTraversal<RowVersionFilter> {
 
     private final ReadRowVersionValue readRowVersionValue = new 
ReadRowVersionValue();
 
-    private long rowLink = NULL_LINK;
-
-    private @Nullable HybridTimestamp rowTimestamp;
-
-    private long rowNextLink = NULL_LINK;
-
-    private int rowValueSize;
-
-    private int schemaVersion;
+    private @Nullable RowVersionReader reader;
 
     private @Nullable RowVersion result;
 
@@ -68,26 +60,21 @@ class FindRowVersion implements 
PageMemoryTraversal<RowVersionFilter> {
             return readRowVersionValue.consumePagePayload(link, pageAddr, 
payload, null);
         }
 
-        long nextLink = readPartitionless(partitionId, pageAddr, 
payload.offset() + RowVersion.NEXT_LINK_OFFSET);
-
         if (!filter.apply(link, pageAddr + payload.offset())) {
-            return nextLink;
+            return RowVersion.readNextLink(partitionId, pageAddr, 
payload.offset());
         }
 
         rowVersionFound = true;
 
-        rowLink = link;
-        rowTimestamp = HybridTimestamps.readTimestamp(pageAddr, 
payload.offset() + RowVersion.TIMESTAMP_OFFSET);
-        rowNextLink = nextLink;
-        schemaVersion = Short.toUnsignedInt(PageUtils.getShort(pageAddr, 
payload.offset() + RowVersion.SCHEMA_VERSION_OFFSET));
+        byte dataType = PageUtils.getByte(pageAddr, payload.offset() + 
Storable.DATA_TYPE_OFFSET);
+
+        reader = RowVersionReader.newRowVersionReader(dataType, link, 
partitionId);
 
         if (loadValueBytes) {
             return readRowVersionValue.consumePagePayload(link, pageAddr, 
payload, null);
+        } else {
+            return STOP_TRAVERSAL;
         }
-
-        rowValueSize = PageUtils.getInt(pageAddr, payload.offset() + 
RowVersion.VALUE_SIZE_OFFSET);
-
-        return STOP_TRAVERSAL;
     }
 
     @Override
@@ -96,19 +83,26 @@ class FindRowVersion implements 
PageMemoryTraversal<RowVersionFilter> {
             return;
         }
 
+        assert reader != null;
+
+        BinaryRow value;
+        int valueSize;
+
         if (loadValueBytes) {
             readRowVersionValue.finish();
 
             byte[] valueBytes = readRowVersionValue.result();
 
-            BinaryRow row = valueBytes.length == 0
+            value = valueBytes.length == 0
                     ? null
-                    : new BinaryRowImpl(schemaVersion, 
ByteBuffer.wrap(valueBytes).order(BinaryTuple.ORDER));
-
-            result = new RowVersion(partitionId, rowLink, rowTimestamp, 
rowNextLink, row);
+                    : new BinaryRowImpl(reader.schemaVersion(), 
ByteBuffer.wrap(valueBytes).order(BinaryTuple.ORDER));
+            valueSize = value == null ? 0 : value.tupleSliceLength();
         } else {
-            result = new RowVersion(partitionId, rowLink, rowTimestamp, 
rowNextLink, rowValueSize);
+            value = null;
+            valueSize = reader.valueSize();
         }
+
+        result = reader.createRowVersion(valueSize, value);
     }
 
     /**
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PlainRowVersionReader.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PlainRowVersionReader.java
new file mode 100644
index 00000000000..8f17b2597f5
--- /dev/null
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PlainRowVersionReader.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.pagememory.util.PageUtils;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.jetbrains.annotations.Nullable;
+
+class PlainRowVersionReader implements RowVersionReader {
+    protected final long link;
+    protected final int partitionId;
+
+    protected long nextLink;
+
+    @Nullable
+    protected HybridTimestamp timestamp;
+
+    private int schemaVersion;
+    private int valueSize;
+
+    PlainRowVersionReader(long link, int partitionId) {
+        this.link = link;
+        this.partitionId = partitionId;
+    }
+
+    @Override
+    public void readFromPage(long pageAddr, int offset) {
+        nextLink = RowVersion.readNextLink(partitionId, pageAddr, offset);
+        timestamp = HybridTimestamps.readTimestamp(pageAddr, offset + 
RowVersion.TIMESTAMP_OFFSET);
+        schemaVersion = Short.toUnsignedInt(PageUtils.getShort(pageAddr, 
offset + RowVersion.SCHEMA_VERSION_OFFSET));
+        valueSize = PageUtils.getInt(pageAddr, offset + 
RowVersion.VALUE_SIZE_OFFSET);
+    }
+
+    @Override
+    public @Nullable HybridTimestamp timestamp() {
+        return timestamp;
+    }
+
+    @Override
+    public int schemaVersion() {
+        return schemaVersion;
+    }
+
+    @Override
+    public int valueSize() {
+        return valueSize;
+    }
+
+    @Override
+    public RowVersion createRowVersion(int valueSize, @Nullable BinaryRow 
value) {
+        return new RowVersion(partitionId, link, timestamp, nextLink, 
valueSize, value);
+    }
+}
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersionValue.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PlainRowVersionValueOffsets.java
similarity index 70%
copy from 
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersionValue.java
copy to 
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PlainRowVersionValueOffsets.java
index 9d24b115b68..3cbb6d7e43c 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersionValue.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PlainRowVersionValueOffsets.java
@@ -17,26 +17,20 @@
 
 package org.apache.ignite.internal.storage.pagememory.mv;
 
-import org.apache.ignite.internal.pagememory.datapage.ReadPageMemoryRowValue;
+class PlainRowVersionValueOffsets implements RowVersionValueOffsets {
+    static final PlainRowVersionValueOffsets INSTANCE = new 
PlainRowVersionValueOffsets();
 
-/**
- * Reads {@link RowVersion#value()} from page-memory.
- */
-class ReadRowVersionValue extends ReadPageMemoryRowValue {
-    /** {@inheritDoc} */
-    @Override
-    protected int valueSizeOffsetInFirstSlot() {
-        return RowVersion.VALUE_SIZE_OFFSET;
+    private PlainRowVersionValueOffsets() {
+        // No-op.
     }
 
-    /** {@inheritDoc} */
     @Override
-    protected int valueOffsetInFirstSlot() {
-        return RowVersion.VALUE_OFFSET;
+    public int valueSizeOffsetInFirstSlot() {
+        return RowVersion.VALUE_SIZE_OFFSET;
     }
 
     @Override
-    protected byte dataType() {
-        return RowVersion.DATA_TYPE;
+    public int valueOffsetInFirstSlot() {
+        return RowVersion.VALUE_OFFSET;
     }
 }
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersion.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersion.java
index a6af6ef551b..e025acfebd2 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersion.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersion.java
@@ -17,12 +17,10 @@
 
 package org.apache.ignite.internal.storage.pagememory.mv;
 
-import static 
org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionIdFromLink;
-import static 
org.apache.ignite.internal.pagememory.util.PartitionlessLinks.readPartitionless;
-
 import java.nio.ByteBuffer;
 import java.util.function.Predicate;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.pagememory.Storable;
 import org.apache.ignite.internal.pagememory.datapage.PageMemoryTraversal;
 import org.apache.ignite.internal.pagememory.io.DataPagePayload;
 import org.apache.ignite.internal.pagememory.util.PageUtils;
@@ -41,16 +39,10 @@ class ReadRowVersion implements 
PageMemoryTraversal<Predicate<HybridTimestamp>>
 
     private boolean readingFirstSlot = true;
 
-    private long firstFragmentLink;
-
-    private @Nullable HybridTimestamp timestamp;
-
-    private long nextLink;
-
-    private int schemaVersion;
-
     private final ReadRowVersionValue readRowVersionValue = new 
ReadRowVersionValue();
 
+    private @Nullable RowVersionReader reader;
+
     ReadRowVersion(int partitionId) {
         this.partitionId = partitionId;
     }
@@ -67,21 +59,19 @@ class ReadRowVersion implements 
PageMemoryTraversal<Predicate<HybridTimestamp>>
     }
 
     private long readFullOrInitiateReadFragmented(long link, long pageAddr, 
DataPagePayload payload, Predicate<HybridTimestamp> loadValue) {
-        firstFragmentLink = link;
+        byte dataType = PageUtils.getByte(pageAddr, payload.offset() + 
Storable.DATA_TYPE_OFFSET);
 
-        timestamp = HybridTimestamps.readTimestamp(pageAddr, payload.offset() 
+ RowVersion.TIMESTAMP_OFFSET);
-        nextLink = readPartitionless(partitionId, pageAddr, payload.offset() + 
RowVersion.NEXT_LINK_OFFSET);
-        schemaVersion = Short.toUnsignedInt(PageUtils.getShort(pageAddr, 
payload.offset() + RowVersion.SCHEMA_VERSION_OFFSET));
+        reader = RowVersionReader.newRowVersionReader(dataType, link, 
partitionId);
 
-        if (!loadValue.test(timestamp)) {
-            int valueSize = PageUtils.getInt(pageAddr, payload.offset() + 
RowVersion.VALUE_SIZE_OFFSET);
+        reader.readFromPage(pageAddr, payload.offset());
 
-            result = new RowVersion(partitionIdFromLink(link), 
firstFragmentLink, timestamp, nextLink, valueSize);
+        if (!loadValue.test(reader.timestamp())) {
+            result = reader.createRowVersion(reader.valueSize(), null);
 
             return STOP_TRAVERSAL;
+        } else {
+            return readRowVersionValue.consumePagePayload(link, pageAddr, 
payload, null);
         }
-
-        return readRowVersionValue.consumePagePayload(link, pageAddr, payload, 
null);
     }
 
     @Override
@@ -91,24 +81,21 @@ class ReadRowVersion implements 
PageMemoryTraversal<Predicate<HybridTimestamp>>
             return;
         }
 
+        assert reader != null;
+
         readRowVersionValue.finish();
 
         byte[] valueBytes = readRowVersionValue.result();
 
-        BinaryRow row = valueBytes.length == 0
+        BinaryRow value = valueBytes.length == 0
                 ? null
-                : new BinaryRowImpl(schemaVersion, 
ByteBuffer.wrap(valueBytes).order(BinaryTuple.ORDER));
+                : new BinaryRowImpl(reader.schemaVersion(), 
ByteBuffer.wrap(valueBytes).order(BinaryTuple.ORDER));
+        int valueSize = value == null ? 0 : value.tupleSliceLength();
 
-        result = new RowVersion(partitionIdFromLink(firstFragmentLink), 
firstFragmentLink, timestamp, nextLink, row);
+        result = reader.createRowVersion(valueSize, value);
     }
 
     RowVersion result() {
         return result;
     }
-
-    void reset() {
-        result = null;
-        readingFirstSlot = true;
-        readRowVersionValue.reset();
-    }
 }
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersionValue.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersionValue.java
index 9d24b115b68..58ebc85b0fc 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersionValue.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersionValue.java
@@ -23,20 +23,13 @@ import 
org.apache.ignite.internal.pagememory.datapage.ReadPageMemoryRowValue;
  * Reads {@link RowVersion#value()} from page-memory.
  */
 class ReadRowVersionValue extends ReadPageMemoryRowValue {
-    /** {@inheritDoc} */
     @Override
-    protected int valueSizeOffsetInFirstSlot() {
-        return RowVersion.VALUE_SIZE_OFFSET;
+    protected int valueSizeOffsetInFirstSlot(byte dataType) {
+        return 
RowVersionValueOffsets.offsetsFor(dataType).valueSizeOffsetInFirstSlot();
     }
 
-    /** {@inheritDoc} */
     @Override
-    protected int valueOffsetInFirstSlot() {
-        return RowVersion.VALUE_OFFSET;
-    }
-
-    @Override
-    protected byte dataType() {
-        return RowVersion.DATA_TYPE;
+    protected int valueOffsetInFirstSlot(byte dataType) {
+        return 
RowVersionValueOffsets.offsetsFor(dataType).valueOffsetInFirstSlot();
     }
 }
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
index 7e5a93309b9..db9118f55e8 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.storage.pagememory.mv;
 
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
 import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
+import static 
org.apache.ignite.internal.pagememory.util.PartitionlessLinks.PARTITIONLESS_LINK_SIZE_BYTES;
+import static 
org.apache.ignite.internal.pagememory.util.PartitionlessLinks.readPartitionless;
 import static 
org.apache.ignite.internal.pagememory.util.PartitionlessLinks.writePartitionless;
 
 import java.nio.ByteBuffer;
@@ -34,15 +36,18 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Represents row version inside row version chain.
  */
-public final class RowVersion implements Storable {
+public class RowVersion implements Storable {
     public static final byte DATA_TYPE = 0;
-    private static final int NEXT_LINK_STORE_SIZE_BYTES = 
PartitionlessLinks.PARTITIONLESS_LINK_SIZE_BYTES;
+
+    private static final int NEXT_LINK_STORE_SIZE_BYTES = 
PARTITIONLESS_LINK_SIZE_BYTES;
     private static final int VALUE_SIZE_STORE_SIZE_BYTES = Integer.BYTES;
-    private static final int SCHEMA_VERSION_SIZE_BYTES = Short.BYTES;
+    protected static final int SCHEMA_VERSION_SIZE_BYTES = Short.BYTES;
+
     public static final int TIMESTAMP_OFFSET = DATA_TYPE_OFFSET + 
DATA_TYPE_SIZE_BYTES;
     public static final int NEXT_LINK_OFFSET = TIMESTAMP_OFFSET + 
HYBRID_TIMESTAMP_SIZE;
     public static final int VALUE_SIZE_OFFSET = NEXT_LINK_OFFSET + 
NEXT_LINK_STORE_SIZE_BYTES;
     public static final int SCHEMA_VERSION_OFFSET = VALUE_SIZE_OFFSET + 
VALUE_SIZE_STORE_SIZE_BYTES;
+
     public static final int VALUE_OFFSET = SCHEMA_VERSION_OFFSET + 
SCHEMA_VERSION_SIZE_BYTES;
 
     private final int partitionId;
@@ -62,40 +67,55 @@ public final class RowVersion implements Storable {
      * Constructor.
      */
     public RowVersion(int partitionId, long nextLink, @Nullable BinaryRow 
value) {
-        this(partitionId, 0, null, nextLink, value);
+        this(
+                partitionId,
+                NULL_LINK,
+                null,
+                nextLink,
+                value == null ? 0 : value.tupleSliceLength(),
+                value
+        );
     }
 
     /**
      * Constructor.
      */
     public RowVersion(int partitionId, HybridTimestamp commitTimestamp, long 
nextLink, @Nullable BinaryRow value) {
-        this(partitionId, 0, commitTimestamp, nextLink, value);
+        this(
+                partitionId,
+                NULL_LINK,
+                commitTimestamp,
+                nextLink,
+                value == null ? 0 : value.tupleSliceLength(),
+                value
+        );
     }
 
     /**
      * Constructor.
      */
-    public RowVersion(int partitionId, long link, @Nullable HybridTimestamp 
timestamp, long nextLink, @Nullable BinaryRow value) {
-        this.partitionId = partitionId;
-        link(link);
-
-        this.timestamp = timestamp;
-        this.nextLink = nextLink;
-        this.valueSize = value == null ? 0 : value.tupleSliceLength();
-        this.value = value;
+    public RowVersion(int partitionId, long link, @Nullable HybridTimestamp 
commitTimestamp, long nextLink, int valueSize) {
+        this(partitionId, link, commitTimestamp, nextLink, valueSize, null);
     }
 
     /**
      * Constructor.
      */
-    public RowVersion(int partitionId, long link, @Nullable HybridTimestamp 
timestamp, long nextLink, int valueSize) {
+    public RowVersion(
+            int partitionId,
+            long link,
+            @Nullable HybridTimestamp timestamp,
+            long nextLink,
+            int valueSize,
+            @Nullable BinaryRow value
+    ) {
         this.partitionId = partitionId;
         link(link);
 
         this.timestamp = timestamp;
         this.nextLink = nextLink;
         this.valueSize = valueSize;
-        this.value = null;
+        this.value = value;
     }
 
     public @Nullable HybridTimestamp timestamp() {
@@ -160,29 +180,27 @@ public final class RowVersion implements Storable {
     }
 
     @Override
-    public void writeRowData(long pageAddr, int dataOff, int payloadSize, 
boolean newRow) {
+    public final void writeRowData(long pageAddr, int dataOff, int 
payloadSize, boolean newRow) {
         PageUtils.putShort(pageAddr, dataOff, (short) payloadSize);
         dataOff += Short.BYTES;
 
-        PageUtils.putByte(pageAddr, dataOff + DATA_TYPE_OFFSET, DATA_TYPE);
-
-        HybridTimestamps.writeTimestampToMemory(pageAddr, dataOff + 
TIMESTAMP_OFFSET, timestamp());
-
-        writePartitionless(pageAddr + dataOff + NEXT_LINK_OFFSET, nextLink());
-
-        PageUtils.putInt(pageAddr, dataOff + VALUE_SIZE_OFFSET, valueSize());
+        writeHeader(pageAddr, dataOff);
 
         if (value != null) {
-            PageUtils.putShort(pageAddr, dataOff + SCHEMA_VERSION_OFFSET, 
(short) value.schemaVersion());
-
-            PageUtils.putByteBuffer(pageAddr, dataOff + VALUE_OFFSET, 
value.tupleSlice());
-        } else {
-            PageUtils.putShort(pageAddr, dataOff + SCHEMA_VERSION_OFFSET, 
(short) 0);
+            PageUtils.putByteBuffer(pageAddr, dataOff + valueOffset(), 
value.tupleSlice());
         }
     }
 
+    protected byte dataType() {
+        return DATA_TYPE;
+    }
+
+    protected int valueOffset() {
+        return VALUE_OFFSET;
+    }
+
     @Override
-    public void writeFragmentData(ByteBuffer pageBuf, int rowOff, int 
payloadSize) {
+    public final void writeFragmentData(ByteBuffer pageBuf, int rowOff, int 
payloadSize) {
         int headerSize = headerSize();
 
         int bufferOffset;
@@ -193,15 +211,7 @@ public final class RowVersion implements Storable {
             assert headerSize <= payloadSize : "Header must entirely fit in 
the first fragment, but header size is "
                     + headerSize + " and payload size is " + payloadSize;
 
-            pageBuf.put(DATA_TYPE);
-
-            HybridTimestamps.writeTimestampToBuffer(pageBuf, timestamp());
-
-            PartitionlessLinks.writeToBuffer(pageBuf, nextLink());
-
-            pageBuf.putInt(valueSize());
-
-            pageBuf.putShort(value == null ? 0 : (short) 
value.schemaVersion());
+            writeHeader(pageBuf);
 
             bufferOffset = 0;
             bufferSize = payloadSize - headerSize;
@@ -218,6 +228,31 @@ public final class RowVersion implements Storable {
         }
     }
 
+    protected void writeHeader(long pageAddr, int dataOff) {
+        PageUtils.putByte(pageAddr, dataOff + DATA_TYPE_OFFSET, dataType());
+        HybridTimestamps.writeTimestampToMemory(pageAddr, dataOff + 
TIMESTAMP_OFFSET, timestamp());
+        writePartitionless(pageAddr + dataOff + NEXT_LINK_OFFSET, nextLink());
+        PageUtils.putInt(pageAddr, dataOff + VALUE_SIZE_OFFSET, valueSize());
+        PageUtils.putShort(pageAddr, dataOff + SCHEMA_VERSION_OFFSET, 
schemaVersionOrZero());
+    }
+
+    protected void writeHeader(ByteBuffer pageBuf) {
+        pageBuf.put(dataType());
+        HybridTimestamps.writeTimestampToBuffer(pageBuf, timestamp());
+        PartitionlessLinks.writeToBuffer(pageBuf, nextLink());
+        pageBuf.putInt(valueSize());
+        pageBuf.putShort(schemaVersionOrZero());
+    }
+
+    private short schemaVersionOrZero() {
+        //noinspection NumericCastThatLosesPrecision
+        return value == null ? 0 : (short) value.schemaVersion();
+    }
+
+    static long readNextLink(int partitionId, long pageAddr, int offset) {
+        return readPartitionless(partitionId, pageAddr, offset + 
NEXT_LINK_OFFSET);
+    }
+
     @Override
     public String toString() {
         return S.toString(RowVersion.class, this);
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersionReader.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersionReader.java
new file mode 100644
index 00000000000..7b723899b6a
--- /dev/null
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersionReader.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Abstracts out reading of {@link RowVersion}s of different formats.
+ *
+ * <p>Implementations are stateful and meant to be used for reading a single 
row version.
+ */
+interface RowVersionReader {
+    static RowVersionReader newRowVersionReader(byte dataType, long link, int 
partitionId) {
+        switch (dataType) {
+            case RowVersion.DATA_TYPE:
+                return new PlainRowVersionReader(link, partitionId);
+            default:
+                throw new IllegalStateException("Unsupported data type: " + 
dataType);
+        }
+    }
+
+    /**
+     * Reads row version data from the given page address and offset.
+     *
+     * <p>This is executed under a read lock over the corresponding page.
+     *
+     * @param pageAddr Page address.
+     * @param offset Row version offset within the page.
+     */
+    void readFromPage(long pageAddr, int offset);
+
+    /** Returns the timestamp of the row version that was read by {@link 
#readFromPage(long, int)}. */
+    @Nullable
+    HybridTimestamp timestamp();
+
+    /** Returns the schema version of the row version that was read by {@link 
#readFromPage(long, int)}. */
+    int schemaVersion();
+
+    /** Returns value size of the row version that was read by {@link 
#readFromPage(long, int)}. */
+    int valueSize();
+
+    /**
+     * Creates a {@link RowVersion} instance using the data read by {@link 
#readFromPage(long, int)}.
+     *
+     * @param valueSize Value size.
+     * @param value Binary value (can be {@code null} if not loaded).
+     * @return New row version.
+     */
+    RowVersion createRowVersion(int valueSize, @Nullable BinaryRow value);
+}
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersionValue.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersionValueOffsets.java
similarity index 61%
copy from 
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersionValue.java
copy to 
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersionValueOffsets.java
index 9d24b115b68..a8a2427e408 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersionValue.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersionValueOffsets.java
@@ -17,26 +17,22 @@
 
 package org.apache.ignite.internal.storage.pagememory.mv;
 
-import org.apache.ignite.internal.pagememory.datapage.ReadPageMemoryRowValue;
-
 /**
- * Reads {@link RowVersion#value()} from page-memory.
+ * Provides offsets of value bytes of {@link RowVersion}s of different 
versions.
+ *
+ * <p>Implementations are stateless and can be reused.
  */
-class ReadRowVersionValue extends ReadPageMemoryRowValue {
-    /** {@inheritDoc} */
-    @Override
-    protected int valueSizeOffsetInFirstSlot() {
-        return RowVersion.VALUE_SIZE_OFFSET;
+interface RowVersionValueOffsets {
+    static RowVersionValueOffsets offsetsFor(byte dataType) {
+        switch (dataType) {
+            case RowVersion.DATA_TYPE:
+                return PlainRowVersionValueOffsets.INSTANCE;
+            default:
+                throw new IllegalStateException("Unsupported data type: " + 
dataType);
+        }
     }
 
-    /** {@inheritDoc} */
-    @Override
-    protected int valueOffsetInFirstSlot() {
-        return RowVersion.VALUE_OFFSET;
-    }
+    int valueSizeOffsetInFirstSlot();
 
-    @Override
-    protected byte dataType() {
-        return RowVersion.DATA_TYPE;
-    }
+    int valueOffsetInFirstSlot();
 }
diff --git 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
index bd9b80ad52b..a1985b93d10 100644
--- 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
+++ 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
@@ -210,7 +210,7 @@ class PersistentPageMemoryMvPartitionStorageTest extends 
AbstractPageMemoryMvPar
     }
 
     @Test
-    void testShouldReleaseReturnsFalseWhenNoCheckpointWaiting() throws 
Exception {
+    void testShouldReleaseReturnsFalseWhenNoCheckpointWaiting() {
         AtomicBoolean shouldReleaseValue = new AtomicBoolean(false);
 
         storage.runConsistently(locker -> {
@@ -223,7 +223,7 @@ class PersistentPageMemoryMvPartitionStorageTest extends 
AbstractPageMemoryMvPar
     }
 
     @Test
-    void testNestedRunConsistentlyInheritsLocker() throws Exception {
+    void testNestedRunConsistentlyInheritsLocker() {
         AtomicBoolean outerShouldRelease = new AtomicBoolean(false);
         AtomicBoolean innerShouldRelease = new AtomicBoolean(false);
 
@@ -248,7 +248,7 @@ class PersistentPageMemoryMvPartitionStorageTest extends 
AbstractPageMemoryMvPar
     }
 
     @Test
-    void testShouldReleaseReturnsTrueWhenWriterIsWaiting() throws Exception {
+    void testShouldReleaseReturnsTrueWhenWriterIsWaiting() {
         AtomicBoolean shouldReleaseValue = new AtomicBoolean(false);
 
         storage.runConsistently(locker -> {

Reply via email to