This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3fa0c97f524 IGNITE-27208 Prepare RowVersion read/write code to
addition of new versions (#7112)
3fa0c97f524 is described below
commit 3fa0c97f5241da41e9eaa9fe253fbd9e19a024dd
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Mon Dec 1 16:36:43 2025 +0400
IGNITE-27208 Prepare RowVersion read/write code to addition of new versions
(#7112)
---
.../pagememory/datapage/PageMemoryTraversal.java | 3 +-
.../datapage/ReadPageMemoryRowValue.java | 27 +++--
.../internal/pagememory/freelist/FreeListImpl.java | 3 +-
.../ignite/internal/pagememory/io/DataPageIo.java | 3 +-
.../storage/BaseMvPartitionStorageTest.java | 2 +-
.../storage/pagememory/StorageMemoryIoModule.java | 2 +-
.../pagememory/StoragePartitionMetaFactory.java | 2 +-
.../storage/pagememory/StoragePartitionMetaIo.java | 11 +-
...le.java => StoragePartitionMetaIoVersions.java} | 19 ++--
.../index/freelist/ReadIndexColumnsValue.java | 16 ++-
.../mv/AbstractPageMemoryMvPartitionStorage.java | 2 +-
.../pagememory/mv/AddWriteInvokeClosure.java | 68 +++++++------
.../storage/pagememory/mv/FindRowVersion.java | 46 ++++-----
.../pagememory/mv/PlainRowVersionReader.java | 69 +++++++++++++
...Value.java => PlainRowVersionValueOffsets.java} | 22 ++--
.../storage/pagememory/mv/ReadRowVersion.java | 45 +++------
.../storage/pagememory/mv/ReadRowVersionValue.java | 15 +--
.../internal/storage/pagememory/mv/RowVersion.java | 111 ++++++++++++++-------
.../storage/pagememory/mv/RowVersionReader.java | 67 +++++++++++++
...rsionValue.java => RowVersionValueOffsets.java} | 30 +++---
...PersistentPageMemoryMvPartitionStorageTest.java | 6 +-
21 files changed, 353 insertions(+), 216 deletions(-)
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/PageMemoryTraversal.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/PageMemoryTraversal.java
index f6fd0d359ba..eb83ae53e9d 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/PageMemoryTraversal.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/PageMemoryTraversal.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.pagememory.datapage;
import org.apache.ignite.internal.pagememory.io.DataPagePayload;
+import org.jetbrains.annotations.Nullable;
/**
* Controls page memory traversal.
@@ -41,7 +42,7 @@ public interface PageMemoryTraversal<T> {
* @param arg argument passed to the traversal
* @return next row link or {@link #STOP_TRAVERSAL} to stop the traversal
*/
- long consumePagePayload(long link, long pageAddr, DataPagePayload payload,
T arg);
+ long consumePagePayload(long link, long pageAddr, DataPagePayload payload,
@Nullable T arg);
/**
* Called when the traversal is finished successfully.
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/ReadPageMemoryRowValue.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/ReadPageMemoryRowValue.java
index 6c8c85a3a6a..795e8c3b0b5 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/ReadPageMemoryRowValue.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/ReadPageMemoryRowValue.java
@@ -50,7 +50,7 @@ public abstract class ReadPageMemoryRowValue implements
PageMemoryTraversal<Void
private int transferredBytes = 0;
@Override
- public long consumePagePayload(long link, long pageAddr, DataPagePayload
payload, Void ignoredArg) {
+ public long consumePagePayload(long link, long pageAddr, DataPagePayload
payload, @Nullable Void ignoredArg) {
if (readingFirstSlot) {
readingFirstSlot = false;
return readFullyOrStartReadingFragmented(pageAddr, payload);
@@ -61,31 +61,28 @@ public abstract class ReadPageMemoryRowValue implements
PageMemoryTraversal<Void
}
private long readFullyOrStartReadingFragmented(long pageAddr,
DataPagePayload payload) {
- assert PageUtils.getByte(pageAddr, payload.offset() +
Storable.DATA_TYPE_OFFSET) == dataType();
+ byte dataType = PageUtils.getByte(pageAddr, payload.offset() +
Storable.DATA_TYPE_OFFSET);
- valueSize = readValueSize(pageAddr, payload);
+ valueSize = readValueSize(dataType, pageAddr, payload);
if (!payload.hasMoreFragments()) {
- return readFully(pageAddr, payload);
+ return readFully(dataType, pageAddr, payload);
} else {
allValueBytes = new byte[valueSize];
transferredBytes = 0;
- readValueFragmentToArray(pageAddr, payload,
valueOffsetInFirstSlot());
+ readValueFragmentToArray(pageAddr, payload,
valueOffsetInFirstSlot(dataType));
return payload.nextLink();
}
}
- /** Returns type of the data row. */
- protected abstract byte dataType();
-
- private int readValueSize(long pageAddr, DataPagePayload payload) {
- return PageUtils.getInt(pageAddr, payload.offset() +
valueSizeOffsetInFirstSlot());
+ private int readValueSize(byte dataType, long pageAddr, DataPagePayload
payload) {
+ return PageUtils.getInt(pageAddr, payload.offset() +
valueSizeOffsetInFirstSlot(dataType));
}
- private long readFully(long pageAddr, DataPagePayload payload) {
- allValueBytes = PageUtils.getBytes(pageAddr, payload.offset() +
valueOffsetInFirstSlot(), valueSize);
+ private long readFully(byte dataType, long pageAddr, DataPagePayload
payload) {
+ allValueBytes = PageUtils.getBytes(pageAddr, payload.offset() +
valueOffsetInFirstSlot(dataType), valueSize);
return STOP_TRAVERSAL;
}
@@ -124,16 +121,18 @@ public abstract class ReadPageMemoryRowValue implements
PageMemoryTraversal<Void
/**
* Memory offset into first slot at which the 'value' size is stored (as
int).
*
+ * @param dataType Data type we are reading.
* @return offset into first slot at which the 'value' size is stored (as
int)
*/
- protected abstract int valueSizeOffsetInFirstSlot();
+ protected abstract int valueSizeOffsetInFirstSlot(byte dataType);
/**
* Memory offset into first slot at which the 'value' starts.
*
+ * @param dataType Data type we are reading.
* @return offset into first slot at which the 'value' starts
*/
- protected abstract int valueOffsetInFirstSlot();
+ protected abstract int valueOffsetInFirstSlot(byte dataType);
/**
* Resets the object to make it ready for use.
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/freelist/FreeListImpl.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/freelist/FreeListImpl.java
index 85a70aa468a..88bf2da9c21 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/freelist/FreeListImpl.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/freelist/FreeListImpl.java
@@ -172,7 +172,6 @@ public class FreeListImpl extends PagesList implements
FreeList, ReuseList {
* @param written Written size.
* @param rowSize Row size.
* @return Updated written size.
- * @throws IgniteInternalCheckedException If failed.
*/
protected int addRowFragment(
long pageId,
@@ -181,7 +180,7 @@ public class FreeListImpl extends PagesList implements
FreeList, ReuseList {
Storable row,
int written,
int rowSize
- ) throws IgniteInternalCheckedException {
+ ) {
int payloadSize = io.addRowFragment(pageMem, pageId, pageAddr,
row, written, rowSize, pageSize());
assert payloadSize > 0 : payloadSize;
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/io/DataPageIo.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/io/DataPageIo.java
index a059ac7c08c..4a13841acdc 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/io/DataPageIo.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/io/DataPageIo.java
@@ -1121,7 +1121,6 @@ public class DataPageIo extends PageIo {
* @param rowSize Row size.
* @param pageSize Page size.
* @return Written payload size.
- * @throws IgniteInternalCheckedException If failed.
*/
public int addRowFragment(
PageMemory pageMem,
@@ -1131,7 +1130,7 @@ public class DataPageIo extends PageIo {
int written,
int rowSize,
int pageSize
- ) throws IgniteInternalCheckedException {
+ ) {
assertPageType(pageAddr);
assert row != null;
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java
index c5d6ba98e63..edb30f06745 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java
@@ -140,7 +140,7 @@ public abstract class BaseMvPartitionStorageTest extends
BaseMvStoragesTest {
/**
* Aborts write-intent inside of consistency closure.
*/
- AbortResult abortWrite(RowId rowId, UUID txId) {
+ protected AbortResult abortWrite(RowId rowId, UUID txId) {
return storage.runConsistently(locker -> {
locker.lock(rowId);
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StorageMemoryIoModule.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StorageMemoryIoModule.java
index 6cb40aa08e8..bed0c3aef95 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StorageMemoryIoModule.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StorageMemoryIoModule.java
@@ -30,6 +30,6 @@ import org.apache.ignite.internal.pagememory.io.PageIoModule;
public class StorageMemoryIoModule implements PageIoModule {
@Override
public Collection<IoVersions<?>> ioVersions() {
- return List.of(StoragePartitionMetaIo.VERSIONS);
+ return List.of(StoragePartitionMetaIoVersions.VERSIONS);
}
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaFactory.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaFactory.java
index 8242bdab1a9..72240654532 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaFactory.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaFactory.java
@@ -56,6 +56,6 @@ public class StoragePartitionMetaFactory implements
PartitionMetaFactory {
@Override
public StoragePartitionMetaIo partitionMetaIo() {
- return StoragePartitionMetaIo.VERSIONS.latest();
+ return StoragePartitionMetaIoVersions.VERSIONS.latest();
}
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaIo.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaIo.java
index e11e701ad34..e03a48dd96b 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaIo.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaIo.java
@@ -23,7 +23,6 @@ import static
org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
import java.util.UUID;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteStringBuilder;
-import org.apache.ignite.internal.pagememory.io.IoVersions;
import org.apache.ignite.internal.pagememory.persistence.io.PartitionMetaIo;
import org.apache.ignite.internal.storage.pagememory.mv.BlobStorage;
import org.jetbrains.annotations.Nullable;
@@ -58,10 +57,14 @@ public class StoragePartitionMetaIo extends PartitionMetaIo
{
private static final int PRIMARY_REPLICA_NODE_NAME_FIRST_PAGE_ID_OFF =
PRIMARY_REPLICA_NODE_ID_LOW_OFF + Long.BYTES;
/** Estimated size here is not a size of a meta, but an approximate rows
count. */
- private static final int ESTIMATED_SIZE_OFF =
PRIMARY_REPLICA_NODE_NAME_FIRST_PAGE_ID_OFF + Long.BYTES;
+ protected static final int ESTIMATED_SIZE_OFF =
PRIMARY_REPLICA_NODE_NAME_FIRST_PAGE_ID_OFF + Long.BYTES;
- /** I/O versions. */
- public static final IoVersions<StoragePartitionMetaIo> VERSIONS = new
IoVersions<>(new StoragePartitionMetaIo(1));
+ /**
+ * Constructor.
+ */
+ protected StoragePartitionMetaIo() {
+ this(1);
+ }
/**
* Constructor.
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StorageMemoryIoModule.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaIoVersions.java
similarity index 66%
copy from
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StorageMemoryIoModule.java
copy to
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaIoVersions.java
index 6cb40aa08e8..2bc7bf2638b 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StorageMemoryIoModule.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaIoVersions.java
@@ -17,19 +17,12 @@
package org.apache.ignite.internal.storage.pagememory;
-import com.google.auto.service.AutoService;
-import java.util.Collection;
-import java.util.List;
import org.apache.ignite.internal.pagememory.io.IoVersions;
-import org.apache.ignite.internal.pagememory.io.PageIoModule;
-/**
- * {@link PageIoModule} implementation in storage-page-memory module.
- */
-@AutoService(PageIoModule.class)
-public class StorageMemoryIoModule implements PageIoModule {
- @Override
- public Collection<IoVersions<?>> ioVersions() {
- return List.of(StoragePartitionMetaIo.VERSIONS);
- }
+/** Storage partition meta I/O versions. */
+public class StoragePartitionMetaIoVersions {
+ /** I/O versions. */
+ public static final IoVersions<StoragePartitionMetaIo> VERSIONS = new
IoVersions<>(
+ new StoragePartitionMetaIo()
+ );
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/freelist/ReadIndexColumnsValue.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/freelist/ReadIndexColumnsValue.java
index bf36f6779e3..d5792ad9719 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/freelist/ReadIndexColumnsValue.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/freelist/ReadIndexColumnsValue.java
@@ -18,24 +18,22 @@
package org.apache.ignite.internal.storage.pagememory.index.freelist;
import org.apache.ignite.internal.pagememory.datapage.ReadPageMemoryRowValue;
-import org.apache.ignite.internal.storage.pagememory.mv.RowVersion;
/**
- * Reads {@link RowVersion#value()} from page-memory.
+ * Reads {@link IndexColumns#valueBuffer()} value from page-memory.
*/
public class ReadIndexColumnsValue extends ReadPageMemoryRowValue {
@Override
- protected int valueSizeOffsetInFirstSlot() {
+ protected int valueSizeOffsetInFirstSlot(byte dataType) {
+ assert dataType == IndexColumns.DATA_TYPE;
+
return IndexColumns.SIZE_OFFSET;
}
@Override
- protected int valueOffsetInFirstSlot() {
- return IndexColumns.VALUE_OFFSET;
- }
+ protected int valueOffsetInFirstSlot(byte dataType) {
+ assert dataType == IndexColumns.DATA_TYPE;
- @Override
- protected byte dataType() {
- return IndexColumns.DATA_TYPE;
+ return IndexColumns.VALUE_OFFSET;
}
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index c5b4c2a4e78..6f2fe8a34a6 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -449,7 +449,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
addWrite.afterCompletion();
- AddWriteResult addWriteResult = addWrite.addWriteResult();
+ AddWriteResult addWriteResult = addWrite.result();
assert addWriteResult != null : addWriteInfo(rowId, row, txId,
commitZoneId, commitPartitionId);
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteInvokeClosure.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteInvokeClosure.java
index 3d284903416..465dac45137 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteInvokeClosure.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteInvokeClosure.java
@@ -42,9 +42,9 @@ import org.jetbrains.annotations.Nullable;
* <p>Operation may throw {@link StorageException} which will cause form
{@link BplusTree#invoke(Object, Object, InvokeClosure)}.
*/
class AddWriteInvokeClosure implements InvokeClosure<VersionChain> {
- private final RowId rowId;
+ protected final RowId rowId;
- private final @Nullable BinaryRow row;
+ protected final @Nullable BinaryRow row;
private final UUID txId;
@@ -52,11 +52,12 @@ class AddWriteInvokeClosure implements
InvokeClosure<VersionChain> {
private final int commitPartitionId;
- private final AbstractPageMemoryMvPartitionStorage storage;
+ protected final AbstractPageMemoryMvPartitionStorage storage;
private OperationType operationType;
- private @Nullable VersionChain newRow;
+ @Nullable
+ private VersionChain newRow;
private @Nullable RowVersion toRemove;
@@ -79,16 +80,15 @@ class AddWriteInvokeClosure implements
InvokeClosure<VersionChain> {
}
@Override
- public void call(@Nullable VersionChain oldRow) throws
IgniteInternalCheckedException {
+ public final void call(@Nullable VersionChain oldRow) throws
IgniteInternalCheckedException {
if (oldRow == null) {
- RowVersion newVersion = insertRowVersion(row, NULL_LINK);
-
- newRow = VersionChain.createUncommitted(rowId, txId, commitZoneId,
commitPartitionId, newVersion.link(), NULL_LINK);
-
operationType = OperationType.PUT;
-
addWriteResult = AddWriteResult.success(null);
+ RowVersion newVersion = insertFirstRowVersion();
+
+ newRow = createUncommittedVersionChain(newVersion);
+
return;
}
@@ -103,29 +103,39 @@ class AddWriteInvokeClosure implements
InvokeClosure<VersionChain> {
return;
}
- RowVersion newVersion = insertRowVersion(row,
oldRow.newestCommittedLink());
+ operationType = OperationType.PUT;
+
+ boolean replacingExistingWriteIntent = oldRow.isUncommitted();
+ RowVersion existingWriteIntent;
- if (oldRow.isUncommitted()) {
- RowVersion currentVersion =
storage.readRowVersion(oldRow.headLink(), ALWAYS_LOAD_VALUE);
+ if (replacingExistingWriteIntent) {
+ existingWriteIntent = storage.readRowVersion(oldRow.headLink(),
ALWAYS_LOAD_VALUE);
- addWriteResult = AddWriteResult.success(currentVersion.value());
+ addWriteResult =
AddWriteResult.success(existingWriteIntent.value());
- // As we replace an uncommitted version with new one, we need to
remove old uncommitted version.
- toRemove = currentVersion;
+ // As we are replacing an uncommitted version with new one, we
need to remove old uncommitted version.
+ toRemove = existingWriteIntent;
} else {
addWriteResult = AddWriteResult.success(null);
+
+ existingWriteIntent = null;
}
- newRow = VersionChain.createUncommitted(
- rowId,
- txId,
- commitZoneId,
- commitPartitionId,
- newVersion.link(),
- newVersion.nextLink()
- );
+ RowVersion newVersion = insertAnotherRowVersion(oldRow,
existingWriteIntent);
- operationType = OperationType.PUT;
+ newRow = createUncommittedVersionChain(newVersion);
+ }
+
+ protected RowVersion insertFirstRowVersion() {
+ return insertRowVersion(NULL_LINK);
+ }
+
+ protected RowVersion insertAnotherRowVersion(VersionChain oldRow,
@Nullable RowVersion existingWriteIntent) {
+ return insertRowVersion(oldRow.newestCommittedLink());
+ }
+
+ private VersionChain createUncommittedVersionChain(RowVersion newVersion) {
+ return VersionChain.createUncommitted(rowId, txId, commitZoneId,
commitPartitionId, newVersion.link(), newVersion.nextLink());
}
@Override
@@ -143,8 +153,8 @@ class AddWriteInvokeClosure implements
InvokeClosure<VersionChain> {
return operationType;
}
- private RowVersion insertRowVersion(@Nullable BinaryRow row, long
nextPartitionlessLink) {
- RowVersion rowVersion = new RowVersion(storage.partitionId,
nextPartitionlessLink, row);
+ private RowVersion insertRowVersion(long nextLink) {
+ RowVersion rowVersion = new RowVersion(storage.partitionId, nextLink,
row);
storage.insertRowVersion(rowVersion);
@@ -176,11 +186,11 @@ class AddWriteInvokeClosure implements
InvokeClosure<VersionChain> {
return rowVersion.timestamp();
}
- AddWriteResult addWriteResult() {
+ AddWriteResult result() {
return addWriteResult;
}
- private String addWriteInfo() {
+ final String addWriteInfo() {
return storage.addWriteInfo(rowId, row, txId, commitZoneId,
commitPartitionId);
}
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/FindRowVersion.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/FindRowVersion.java
index 03a8833cab5..db75b0063ad 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/FindRowVersion.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/FindRowVersion.java
@@ -17,13 +17,13 @@
package org.apache.ignite.internal.storage.pagememory.mv;
-import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
import static
org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionIdFromLink;
import static
org.apache.ignite.internal.pagememory.util.PartitionlessLinks.readPartitionless;
import java.nio.ByteBuffer;
import java.util.Objects;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.pagememory.Storable;
import org.apache.ignite.internal.pagememory.datapage.PageMemoryTraversal;
import org.apache.ignite.internal.pagememory.io.DataPagePayload;
import org.apache.ignite.internal.pagememory.util.PageUtils;
@@ -45,15 +45,7 @@ class FindRowVersion implements
PageMemoryTraversal<RowVersionFilter> {
private final ReadRowVersionValue readRowVersionValue = new
ReadRowVersionValue();
- private long rowLink = NULL_LINK;
-
- private @Nullable HybridTimestamp rowTimestamp;
-
- private long rowNextLink = NULL_LINK;
-
- private int rowValueSize;
-
- private int schemaVersion;
+ private @Nullable RowVersionReader reader;
private @Nullable RowVersion result;
@@ -68,26 +60,21 @@ class FindRowVersion implements
PageMemoryTraversal<RowVersionFilter> {
return readRowVersionValue.consumePagePayload(link, pageAddr,
payload, null);
}
- long nextLink = readPartitionless(partitionId, pageAddr,
payload.offset() + RowVersion.NEXT_LINK_OFFSET);
-
if (!filter.apply(link, pageAddr + payload.offset())) {
- return nextLink;
+ return RowVersion.readNextLink(partitionId, pageAddr,
payload.offset());
}
rowVersionFound = true;
- rowLink = link;
- rowTimestamp = HybridTimestamps.readTimestamp(pageAddr,
payload.offset() + RowVersion.TIMESTAMP_OFFSET);
- rowNextLink = nextLink;
- schemaVersion = Short.toUnsignedInt(PageUtils.getShort(pageAddr,
payload.offset() + RowVersion.SCHEMA_VERSION_OFFSET));
+ byte dataType = PageUtils.getByte(pageAddr, payload.offset() +
Storable.DATA_TYPE_OFFSET);
+
+ reader = RowVersionReader.newRowVersionReader(dataType, link,
partitionId);
if (loadValueBytes) {
return readRowVersionValue.consumePagePayload(link, pageAddr,
payload, null);
+ } else {
+ return STOP_TRAVERSAL;
}
-
- rowValueSize = PageUtils.getInt(pageAddr, payload.offset() +
RowVersion.VALUE_SIZE_OFFSET);
-
- return STOP_TRAVERSAL;
}
@Override
@@ -96,19 +83,26 @@ class FindRowVersion implements
PageMemoryTraversal<RowVersionFilter> {
return;
}
+ assert reader != null;
+
+ BinaryRow value;
+ int valueSize;
+
if (loadValueBytes) {
readRowVersionValue.finish();
byte[] valueBytes = readRowVersionValue.result();
- BinaryRow row = valueBytes.length == 0
+ value = valueBytes.length == 0
? null
- : new BinaryRowImpl(schemaVersion,
ByteBuffer.wrap(valueBytes).order(BinaryTuple.ORDER));
-
- result = new RowVersion(partitionId, rowLink, rowTimestamp,
rowNextLink, row);
+ : new BinaryRowImpl(reader.schemaVersion(),
ByteBuffer.wrap(valueBytes).order(BinaryTuple.ORDER));
+ valueSize = value == null ? 0 : value.tupleSliceLength();
} else {
- result = new RowVersion(partitionId, rowLink, rowTimestamp,
rowNextLink, rowValueSize);
+ value = null;
+ valueSize = reader.valueSize();
}
+
+ result = reader.createRowVersion(valueSize, value);
}
/**
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PlainRowVersionReader.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PlainRowVersionReader.java
new file mode 100644
index 00000000000..8f17b2597f5
--- /dev/null
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PlainRowVersionReader.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.pagememory.util.PageUtils;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.jetbrains.annotations.Nullable;
+
+class PlainRowVersionReader implements RowVersionReader {
+ protected final long link;
+ protected final int partitionId;
+
+ protected long nextLink;
+
+ @Nullable
+ protected HybridTimestamp timestamp;
+
+ private int schemaVersion;
+ private int valueSize;
+
+ PlainRowVersionReader(long link, int partitionId) {
+ this.link = link;
+ this.partitionId = partitionId;
+ }
+
+ @Override
+ public void readFromPage(long pageAddr, int offset) {
+ nextLink = RowVersion.readNextLink(partitionId, pageAddr, offset);
+ timestamp = HybridTimestamps.readTimestamp(pageAddr, offset +
RowVersion.TIMESTAMP_OFFSET);
+ schemaVersion = Short.toUnsignedInt(PageUtils.getShort(pageAddr,
offset + RowVersion.SCHEMA_VERSION_OFFSET));
+ valueSize = PageUtils.getInt(pageAddr, offset +
RowVersion.VALUE_SIZE_OFFSET);
+ }
+
+ @Override
+ public @Nullable HybridTimestamp timestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public int schemaVersion() {
+ return schemaVersion;
+ }
+
+ @Override
+ public int valueSize() {
+ return valueSize;
+ }
+
+ @Override
+ public RowVersion createRowVersion(int valueSize, @Nullable BinaryRow
value) {
+ return new RowVersion(partitionId, link, timestamp, nextLink,
valueSize, value);
+ }
+}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersionValue.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PlainRowVersionValueOffsets.java
similarity index 70%
copy from
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersionValue.java
copy to
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PlainRowVersionValueOffsets.java
index 9d24b115b68..3cbb6d7e43c 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersionValue.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PlainRowVersionValueOffsets.java
@@ -17,26 +17,20 @@
package org.apache.ignite.internal.storage.pagememory.mv;
-import org.apache.ignite.internal.pagememory.datapage.ReadPageMemoryRowValue;
+class PlainRowVersionValueOffsets implements RowVersionValueOffsets {
+ static final PlainRowVersionValueOffsets INSTANCE = new
PlainRowVersionValueOffsets();
-/**
- * Reads {@link RowVersion#value()} from page-memory.
- */
-class ReadRowVersionValue extends ReadPageMemoryRowValue {
- /** {@inheritDoc} */
- @Override
- protected int valueSizeOffsetInFirstSlot() {
- return RowVersion.VALUE_SIZE_OFFSET;
+ private PlainRowVersionValueOffsets() {
+ // No-op.
}
- /** {@inheritDoc} */
@Override
- protected int valueOffsetInFirstSlot() {
- return RowVersion.VALUE_OFFSET;
+ public int valueSizeOffsetInFirstSlot() {
+ return RowVersion.VALUE_SIZE_OFFSET;
}
@Override
- protected byte dataType() {
- return RowVersion.DATA_TYPE;
+ public int valueOffsetInFirstSlot() {
+ return RowVersion.VALUE_OFFSET;
}
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersion.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersion.java
index a6af6ef551b..e025acfebd2 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersion.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersion.java
@@ -17,12 +17,10 @@
package org.apache.ignite.internal.storage.pagememory.mv;
-import static
org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionIdFromLink;
-import static
org.apache.ignite.internal.pagememory.util.PartitionlessLinks.readPartitionless;
-
import java.nio.ByteBuffer;
import java.util.function.Predicate;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.pagememory.Storable;
import org.apache.ignite.internal.pagememory.datapage.PageMemoryTraversal;
import org.apache.ignite.internal.pagememory.io.DataPagePayload;
import org.apache.ignite.internal.pagememory.util.PageUtils;
@@ -41,16 +39,10 @@ class ReadRowVersion implements
PageMemoryTraversal<Predicate<HybridTimestamp>>
private boolean readingFirstSlot = true;
- private long firstFragmentLink;
-
- private @Nullable HybridTimestamp timestamp;
-
- private long nextLink;
-
- private int schemaVersion;
-
private final ReadRowVersionValue readRowVersionValue = new
ReadRowVersionValue();
+ private @Nullable RowVersionReader reader;
+
ReadRowVersion(int partitionId) {
this.partitionId = partitionId;
}
@@ -67,21 +59,19 @@ class ReadRowVersion implements
PageMemoryTraversal<Predicate<HybridTimestamp>>
}
private long readFullOrInitiateReadFragmented(long link, long pageAddr,
DataPagePayload payload, Predicate<HybridTimestamp> loadValue) {
- firstFragmentLink = link;
+ byte dataType = PageUtils.getByte(pageAddr, payload.offset() +
Storable.DATA_TYPE_OFFSET);
- timestamp = HybridTimestamps.readTimestamp(pageAddr, payload.offset()
+ RowVersion.TIMESTAMP_OFFSET);
- nextLink = readPartitionless(partitionId, pageAddr, payload.offset() +
RowVersion.NEXT_LINK_OFFSET);
- schemaVersion = Short.toUnsignedInt(PageUtils.getShort(pageAddr,
payload.offset() + RowVersion.SCHEMA_VERSION_OFFSET));
+ reader = RowVersionReader.newRowVersionReader(dataType, link,
partitionId);
- if (!loadValue.test(timestamp)) {
- int valueSize = PageUtils.getInt(pageAddr, payload.offset() +
RowVersion.VALUE_SIZE_OFFSET);
+ reader.readFromPage(pageAddr, payload.offset());
- result = new RowVersion(partitionIdFromLink(link),
firstFragmentLink, timestamp, nextLink, valueSize);
+ if (!loadValue.test(reader.timestamp())) {
+ result = reader.createRowVersion(reader.valueSize(), null);
return STOP_TRAVERSAL;
+ } else {
+ return readRowVersionValue.consumePagePayload(link, pageAddr,
payload, null);
}
-
- return readRowVersionValue.consumePagePayload(link, pageAddr, payload,
null);
}
@Override
@@ -91,24 +81,21 @@ class ReadRowVersion implements
PageMemoryTraversal<Predicate<HybridTimestamp>>
return;
}
+ assert reader != null;
+
readRowVersionValue.finish();
byte[] valueBytes = readRowVersionValue.result();
- BinaryRow row = valueBytes.length == 0
+ BinaryRow value = valueBytes.length == 0
? null
- : new BinaryRowImpl(schemaVersion,
ByteBuffer.wrap(valueBytes).order(BinaryTuple.ORDER));
+ : new BinaryRowImpl(reader.schemaVersion(),
ByteBuffer.wrap(valueBytes).order(BinaryTuple.ORDER));
+ int valueSize = value == null ? 0 : value.tupleSliceLength();
- result = new RowVersion(partitionIdFromLink(firstFragmentLink),
firstFragmentLink, timestamp, nextLink, row);
+ result = reader.createRowVersion(valueSize, value);
}
RowVersion result() {
return result;
}
-
- void reset() {
- result = null;
- readingFirstSlot = true;
- readRowVersionValue.reset();
- }
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersionValue.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersionValue.java
index 9d24b115b68..58ebc85b0fc 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersionValue.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersionValue.java
@@ -23,20 +23,13 @@ import
org.apache.ignite.internal.pagememory.datapage.ReadPageMemoryRowValue;
* Reads {@link RowVersion#value()} from page-memory.
*/
class ReadRowVersionValue extends ReadPageMemoryRowValue {
- /** {@inheritDoc} */
@Override
- protected int valueSizeOffsetInFirstSlot() {
- return RowVersion.VALUE_SIZE_OFFSET;
+ protected int valueSizeOffsetInFirstSlot(byte dataType) {
+ return
RowVersionValueOffsets.offsetsFor(dataType).valueSizeOffsetInFirstSlot();
}
- /** {@inheritDoc} */
@Override
- protected int valueOffsetInFirstSlot() {
- return RowVersion.VALUE_OFFSET;
- }
-
- @Override
- protected byte dataType() {
- return RowVersion.DATA_TYPE;
+ protected int valueOffsetInFirstSlot(byte dataType) {
+ return
RowVersionValueOffsets.offsetsFor(dataType).valueOffsetInFirstSlot();
}
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
index 7e5a93309b9..db9118f55e8 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.storage.pagememory.mv;
import static
org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
+import static
org.apache.ignite.internal.pagememory.util.PartitionlessLinks.PARTITIONLESS_LINK_SIZE_BYTES;
+import static
org.apache.ignite.internal.pagememory.util.PartitionlessLinks.readPartitionless;
import static
org.apache.ignite.internal.pagememory.util.PartitionlessLinks.writePartitionless;
import java.nio.ByteBuffer;
@@ -34,15 +36,18 @@ import org.jetbrains.annotations.Nullable;
/**
* Represents row version inside row version chain.
*/
-public final class RowVersion implements Storable {
+public class RowVersion implements Storable {
public static final byte DATA_TYPE = 0;
- private static final int NEXT_LINK_STORE_SIZE_BYTES =
PartitionlessLinks.PARTITIONLESS_LINK_SIZE_BYTES;
+
+ private static final int NEXT_LINK_STORE_SIZE_BYTES =
PARTITIONLESS_LINK_SIZE_BYTES;
private static final int VALUE_SIZE_STORE_SIZE_BYTES = Integer.BYTES;
- private static final int SCHEMA_VERSION_SIZE_BYTES = Short.BYTES;
+ protected static final int SCHEMA_VERSION_SIZE_BYTES = Short.BYTES;
+
public static final int TIMESTAMP_OFFSET = DATA_TYPE_OFFSET +
DATA_TYPE_SIZE_BYTES;
public static final int NEXT_LINK_OFFSET = TIMESTAMP_OFFSET +
HYBRID_TIMESTAMP_SIZE;
public static final int VALUE_SIZE_OFFSET = NEXT_LINK_OFFSET +
NEXT_LINK_STORE_SIZE_BYTES;
public static final int SCHEMA_VERSION_OFFSET = VALUE_SIZE_OFFSET +
VALUE_SIZE_STORE_SIZE_BYTES;
+
public static final int VALUE_OFFSET = SCHEMA_VERSION_OFFSET +
SCHEMA_VERSION_SIZE_BYTES;
private final int partitionId;
@@ -62,40 +67,55 @@ public final class RowVersion implements Storable {
* Constructor.
*/
public RowVersion(int partitionId, long nextLink, @Nullable BinaryRow
value) {
- this(partitionId, 0, null, nextLink, value);
+ this(
+ partitionId,
+ NULL_LINK,
+ null,
+ nextLink,
+ value == null ? 0 : value.tupleSliceLength(),
+ value
+ );
}
/**
* Constructor.
*/
public RowVersion(int partitionId, HybridTimestamp commitTimestamp, long
nextLink, @Nullable BinaryRow value) {
- this(partitionId, 0, commitTimestamp, nextLink, value);
+ this(
+ partitionId,
+ NULL_LINK,
+ commitTimestamp,
+ nextLink,
+ value == null ? 0 : value.tupleSliceLength(),
+ value
+ );
}
/**
* Constructor.
*/
- public RowVersion(int partitionId, long link, @Nullable HybridTimestamp
timestamp, long nextLink, @Nullable BinaryRow value) {
- this.partitionId = partitionId;
- link(link);
-
- this.timestamp = timestamp;
- this.nextLink = nextLink;
- this.valueSize = value == null ? 0 : value.tupleSliceLength();
- this.value = value;
+ public RowVersion(int partitionId, long link, @Nullable HybridTimestamp
commitTimestamp, long nextLink, int valueSize) {
+ this(partitionId, link, commitTimestamp, nextLink, valueSize, null);
}
/**
* Constructor.
*/
- public RowVersion(int partitionId, long link, @Nullable HybridTimestamp
timestamp, long nextLink, int valueSize) {
+ public RowVersion(
+ int partitionId,
+ long link,
+ @Nullable HybridTimestamp timestamp,
+ long nextLink,
+ int valueSize,
+ @Nullable BinaryRow value
+ ) {
this.partitionId = partitionId;
link(link);
this.timestamp = timestamp;
this.nextLink = nextLink;
this.valueSize = valueSize;
- this.value = null;
+ this.value = value;
}
public @Nullable HybridTimestamp timestamp() {
@@ -160,29 +180,27 @@ public final class RowVersion implements Storable {
}
@Override
- public void writeRowData(long pageAddr, int dataOff, int payloadSize,
boolean newRow) {
+ public final void writeRowData(long pageAddr, int dataOff, int
payloadSize, boolean newRow) {
PageUtils.putShort(pageAddr, dataOff, (short) payloadSize);
dataOff += Short.BYTES;
- PageUtils.putByte(pageAddr, dataOff + DATA_TYPE_OFFSET, DATA_TYPE);
-
- HybridTimestamps.writeTimestampToMemory(pageAddr, dataOff +
TIMESTAMP_OFFSET, timestamp());
-
- writePartitionless(pageAddr + dataOff + NEXT_LINK_OFFSET, nextLink());
-
- PageUtils.putInt(pageAddr, dataOff + VALUE_SIZE_OFFSET, valueSize());
+ writeHeader(pageAddr, dataOff);
if (value != null) {
- PageUtils.putShort(pageAddr, dataOff + SCHEMA_VERSION_OFFSET,
(short) value.schemaVersion());
-
- PageUtils.putByteBuffer(pageAddr, dataOff + VALUE_OFFSET,
value.tupleSlice());
- } else {
- PageUtils.putShort(pageAddr, dataOff + SCHEMA_VERSION_OFFSET,
(short) 0);
+ PageUtils.putByteBuffer(pageAddr, dataOff + valueOffset(),
value.tupleSlice());
}
}
+ protected byte dataType() {
+ return DATA_TYPE;
+ }
+
+ protected int valueOffset() {
+ return VALUE_OFFSET;
+ }
+
@Override
- public void writeFragmentData(ByteBuffer pageBuf, int rowOff, int
payloadSize) {
+ public final void writeFragmentData(ByteBuffer pageBuf, int rowOff, int
payloadSize) {
int headerSize = headerSize();
int bufferOffset;
@@ -193,15 +211,7 @@ public final class RowVersion implements Storable {
assert headerSize <= payloadSize : "Header must entirely fit in
the first fragment, but header size is "
+ headerSize + " and payload size is " + payloadSize;
- pageBuf.put(DATA_TYPE);
-
- HybridTimestamps.writeTimestampToBuffer(pageBuf, timestamp());
-
- PartitionlessLinks.writeToBuffer(pageBuf, nextLink());
-
- pageBuf.putInt(valueSize());
-
- pageBuf.putShort(value == null ? 0 : (short)
value.schemaVersion());
+ writeHeader(pageBuf);
bufferOffset = 0;
bufferSize = payloadSize - headerSize;
@@ -218,6 +228,31 @@ public final class RowVersion implements Storable {
}
}
+ protected void writeHeader(long pageAddr, int dataOff) {
+ PageUtils.putByte(pageAddr, dataOff + DATA_TYPE_OFFSET, dataType());
+ HybridTimestamps.writeTimestampToMemory(pageAddr, dataOff +
TIMESTAMP_OFFSET, timestamp());
+ writePartitionless(pageAddr + dataOff + NEXT_LINK_OFFSET, nextLink());
+ PageUtils.putInt(pageAddr, dataOff + VALUE_SIZE_OFFSET, valueSize());
+ PageUtils.putShort(pageAddr, dataOff + SCHEMA_VERSION_OFFSET,
schemaVersionOrZero());
+ }
+
+ protected void writeHeader(ByteBuffer pageBuf) {
+ pageBuf.put(dataType());
+ HybridTimestamps.writeTimestampToBuffer(pageBuf, timestamp());
+ PartitionlessLinks.writeToBuffer(pageBuf, nextLink());
+ pageBuf.putInt(valueSize());
+ pageBuf.putShort(schemaVersionOrZero());
+ }
+
+ private short schemaVersionOrZero() {
+ //noinspection NumericCastThatLosesPrecision
+ return value == null ? 0 : (short) value.schemaVersion();
+ }
+
+ static long readNextLink(int partitionId, long pageAddr, int offset) {
+ return readPartitionless(partitionId, pageAddr, offset +
NEXT_LINK_OFFSET);
+ }
+
@Override
public String toString() {
return S.toString(RowVersion.class, this);
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersionReader.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersionReader.java
new file mode 100644
index 00000000000..7b723899b6a
--- /dev/null
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersionReader.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Abstracts out reading of {@link RowVersion}s of different formats.
+ *
+ * <p>Implementations are stateful and meant to be used for reading a single
row version.
+ */
+interface RowVersionReader {
+ static RowVersionReader newRowVersionReader(byte dataType, long link, int
partitionId) {
+ switch (dataType) {
+ case RowVersion.DATA_TYPE:
+ return new PlainRowVersionReader(link, partitionId);
+ default:
+ throw new IllegalStateException("Unsupported data type: " +
dataType);
+ }
+ }
+
+ /**
+ * Reads row version data from the given page address and offset.
+ *
+ * <p>This is executed under a read lock over the corresponding page.
+ *
+ * @param pageAddr Page address.
+ * @param offset Row version offset within the page.
+ */
+ void readFromPage(long pageAddr, int offset);
+
+ /** Returns the timestamp of the row version that was read by {@link
#readFromPage(long, int)}. */
+ @Nullable
+ HybridTimestamp timestamp();
+
+ /** Returns the schema version of the row version that was read by {@link
#readFromPage(long, int)}. */
+ int schemaVersion();
+
+ /** Returns value size of the row version that was read by {@link
#readFromPage(long, int)}. */
+ int valueSize();
+
+ /**
+ * Creates a {@link RowVersion} instance using the data read by {@link
#readFromPage(long, int)}.
+ *
+ * @param valueSize Value size.
+ * @param value Binary value (can be {@code null} if not loaded).
+ * @return New row version.
+ */
+ RowVersion createRowVersion(int valueSize, @Nullable BinaryRow value);
+}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersionValue.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersionValueOffsets.java
similarity index 61%
copy from
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersionValue.java
copy to
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersionValueOffsets.java
index 9d24b115b68..a8a2427e408 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersionValue.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersionValueOffsets.java
@@ -17,26 +17,22 @@
package org.apache.ignite.internal.storage.pagememory.mv;
-import org.apache.ignite.internal.pagememory.datapage.ReadPageMemoryRowValue;
-
/**
- * Reads {@link RowVersion#value()} from page-memory.
+ * Provides offsets of value bytes of {@link RowVersion}s of different
versions.
+ *
+ * <p>Implementations are stateless and can be reused.
*/
-class ReadRowVersionValue extends ReadPageMemoryRowValue {
- /** {@inheritDoc} */
- @Override
- protected int valueSizeOffsetInFirstSlot() {
- return RowVersion.VALUE_SIZE_OFFSET;
+interface RowVersionValueOffsets {
+ static RowVersionValueOffsets offsetsFor(byte dataType) {
+ switch (dataType) {
+ case RowVersion.DATA_TYPE:
+ return PlainRowVersionValueOffsets.INSTANCE;
+ default:
+ throw new IllegalStateException("Unsupported data type: " +
dataType);
+ }
}
- /** {@inheritDoc} */
- @Override
- protected int valueOffsetInFirstSlot() {
- return RowVersion.VALUE_OFFSET;
- }
+ int valueSizeOffsetInFirstSlot();
- @Override
- protected byte dataType() {
- return RowVersion.DATA_TYPE;
- }
+ int valueOffsetInFirstSlot();
}
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
index bd9b80ad52b..a1985b93d10 100644
---
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
@@ -210,7 +210,7 @@ class PersistentPageMemoryMvPartitionStorageTest extends
AbstractPageMemoryMvPar
}
@Test
- void testShouldReleaseReturnsFalseWhenNoCheckpointWaiting() throws
Exception {
+ void testShouldReleaseReturnsFalseWhenNoCheckpointWaiting() {
AtomicBoolean shouldReleaseValue = new AtomicBoolean(false);
storage.runConsistently(locker -> {
@@ -223,7 +223,7 @@ class PersistentPageMemoryMvPartitionStorageTest extends
AbstractPageMemoryMvPar
}
@Test
- void testNestedRunConsistentlyInheritsLocker() throws Exception {
+ void testNestedRunConsistentlyInheritsLocker() {
AtomicBoolean outerShouldRelease = new AtomicBoolean(false);
AtomicBoolean innerShouldRelease = new AtomicBoolean(false);
@@ -248,7 +248,7 @@ class PersistentPageMemoryMvPartitionStorageTest extends
AbstractPageMemoryMvPar
}
@Test
- void testShouldReleaseReturnsTrueWhenWriterIsWaiting() throws Exception {
+ void testShouldReleaseReturnsTrueWhenWriterIsWaiting() {
AtomicBoolean shouldReleaseValue = new AtomicBoolean(false);
storage.runConsistently(locker -> {