This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ba7b38de89 IGNITE-18736 Prepare class
AbstractPageMemoryMvPartitionStorage cursors for working with locks (#1645)
ba7b38de89 is described below
commit ba7b38de89153132faccc88cc9387a793e0e26bd
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Wed Feb 8 11:40:35 2023 +0300
IGNITE-18736 Prepare class AbstractPageMemoryMvPartitionStorage cursors for
working with locks (#1645)
---
.../ignite/internal/pagememory/tree/BplusTree.java | 7 +-
.../ignite/internal/storage/StorageException.java | 13 +
.../mv/AbstractPageMemoryMvPartitionStorage.java | 284 +++------------------
.../mv/AbstractPartitionTimestampCursor.java | 192 ++++++++++++++
.../pagememory/mv/LatestVersionsCursor.java | 36 +++
.../mv/PersistentPageMemoryMvPartitionStorage.java | 5 +-
.../storage/pagememory/mv/ScanVersionsCursor.java | 106 ++++++++
.../storage/pagememory/mv/TimestampCursor.java | 44 ++++
.../mv/VolatilePageMemoryMvPartitionStorage.java | 5 +-
9 files changed, 432 insertions(+), 260 deletions(-)
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
index f92854d7ab..f0a915f41e 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
@@ -1254,7 +1254,12 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
* @return Cursor.
* @throws IgniteInternalCheckedException If failed.
*/
- public Cursor<T> find(@Nullable L lower, @Nullable L upper,
TreeRowClosure<L, T> c, Object x) throws IgniteInternalCheckedException {
+ public Cursor<T> find(
+ @Nullable L lower,
+ @Nullable L upper,
+ TreeRowClosure<L, T> c,
+ @Nullable Object x
+ ) throws IgniteInternalCheckedException {
return find(lower, upper, true, true, c, x);
}
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
index 71a4fdd250..0b2e6d2d3c 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.storage;
import org.apache.ignite.lang.ErrorGroups.Storage;
import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
import org.jetbrains.annotations.Nullable;
/**
@@ -75,4 +76,16 @@ public class StorageException extends
IgniteInternalException {
protected StorageException(int code, String message, @Nullable Throwable
cause) {
super(code, message, cause);
}
+
+ /**
+ * Constructor.
+ *
+ * @param messagePattern Error message pattern.
+ * @param cause Non-null throwable cause.
+ * @param params Error message params.
+ * @see IgniteStringFormatter#format(String, Object...)
+ */
+ public StorageException(String messagePattern, Throwable cause, Object...
params) {
+ this(IgniteStringFormatter.format(messagePattern, params), cause);
+ }
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index d90c6be71f..ca3560370e 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -22,12 +22,10 @@ import static
org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageState;
import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageStateOnRebalance;
import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInRunnableOrRebalanceState;
-import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInRunnableState;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -67,6 +65,7 @@ import
org.apache.ignite.internal.storage.pagememory.index.meta.IndexMetaTree;
import
org.apache.ignite.internal.storage.pagememory.index.sorted.PageMemorySortedIndexStorage;
import
org.apache.ignite.internal.storage.pagememory.index.sorted.SortedIndexTree;
import org.apache.ignite.internal.storage.util.StorageState;
+import org.apache.ignite.internal.storage.util.StorageUtils;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -80,7 +79,7 @@ import org.jetbrains.annotations.Nullable;
public abstract class AbstractPageMemoryMvPartitionStorage implements
MvPartitionStorage {
private static final byte[] TOMBSTONE_PAYLOAD = new byte[0];
- private static final Predicate<HybridTimestamp> ALWAYS_LOAD_VALUE =
timestamp -> true;
+ static final Predicate<HybridTimestamp> ALWAYS_LOAD_VALUE = timestamp ->
true;
protected final int partitionId;
@@ -147,7 +146,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
*/
public void start() {
busy(() -> {
- throwExceptionIfStorageNotInRunnableState(state.get(),
this::createStorageInfo);
+ throwExceptionIfStorageNotInRunnableState();
try (Cursor<IndexMeta> cursor = indexMetaTree.find(null, null)) {
NamedListView<TableIndexView> indexesCfgView =
tableStorage.tablesConfiguration().indexes().value();
@@ -202,7 +201,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
}
private PageMemoryHashIndexStorage createOrRestoreHashIndex(IndexMeta
indexMeta) {
- throwExceptionIfStorageNotInRunnableState(state.get(),
this::createStorageInfo);
+ throwExceptionIfStorageNotInRunnableState();
var indexDescriptor = new HashIndexDescriptor(indexMeta.id(),
tableStorage.tablesConfiguration().value());
@@ -254,7 +253,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
}
private PageMemorySortedIndexStorage createOrRestoreSortedIndex(IndexMeta
indexMeta) {
- throwExceptionIfStorageNotInRunnableState(state.get(),
this::createStorageInfo);
+ throwExceptionIfStorageNotInRunnableState();
var indexDescriptor = new SortedIndexDescriptor(indexMeta.id(),
tableStorage.tablesConfiguration().value());
@@ -309,7 +308,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
@Override
public ReadResult read(RowId rowId, HybridTimestamp timestamp) throws
StorageException {
return busy(() -> {
- throwExceptionIfStorageNotInRunnableState(state.get(),
this::createStorageInfo);
+ throwExceptionIfStorageNotInRunnableState();
if (rowId.partitionId() != partitionId) {
throw new IllegalArgumentException(
@@ -342,7 +341,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
}
}
- private ReadResult findLatestRowVersion(VersionChain versionChain) {
+ ReadResult findLatestRowVersion(VersionChain versionChain) {
RowVersion rowVersion = readRowVersion(versionChain.headLink(),
ALWAYS_LOAD_VALUE);
if (versionChain.isUncommitted()) {
@@ -369,7 +368,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
try {
rowVersionDataPageReader.traverse(rowVersionLink, read, loadValue);
} catch (IgniteInternalCheckedException e) {
- throw new StorageException("Row version lookup failed", e);
+ throw new StorageException("Row version lookup failed: [link={},
{}]", e, rowVersionLink, createStorageInfo());
}
return read.result();
@@ -398,7 +397,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
* @param timestamp Timestamp.
* @return Read result.
*/
- private ReadResult findRowVersionByTimestamp(VersionChain versionChain,
HybridTimestamp timestamp) {
+ ReadResult findRowVersionByTimestamp(VersionChain versionChain,
HybridTimestamp timestamp) {
assert timestamp != null;
long headLink = versionChain.headLink();
@@ -572,7 +571,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
assert rowId.partitionId() == partitionId : rowId;
return busy(() -> {
- throwExceptionIfStorageNotInRunnableState(state.get(),
this::createStorageInfo);
+ throwExceptionIfStorageNotInRunnableState();
VersionChain currentVersionChain = findVersionChain(rowId);
@@ -704,13 +703,14 @@ public abstract class
AbstractPageMemoryMvPartitionStorage implements MvPartitio
@Override
public Cursor<ReadResult> scanVersions(RowId rowId) throws
StorageException {
return busy(() -> {
- throwExceptionIfStorageNotInRunnableState(state.get(),
this::createStorageInfo);
+ throwExceptionIfStorageNotInRunnableState();
- return new ScanVersionsCursor(rowId);
+ // TODO: IGNITE-18717 Add lock by rowId
+ return new ScanVersionsCursor(rowId, this);
});
}
- private static ReadResult
rowVersionToResultNotFillingLastCommittedTs(VersionChain versionChain,
RowVersion rowVersion) {
+ static ReadResult rowVersionToResultNotFillingLastCommittedTs(VersionChain
versionChain, RowVersion rowVersion) {
TableRow row = new TableRow(rowVersion.value());
if (rowVersion.isCommitted()) {
@@ -730,20 +730,12 @@ public abstract class
AbstractPageMemoryMvPartitionStorage implements MvPartitio
@Override
public PartitionTimestampCursor scan(HybridTimestamp timestamp) throws
StorageException {
return busy(() -> {
- throwExceptionIfStorageNotInRunnableState(state.get(),
this::createStorageInfo);
-
- Cursor<VersionChain> treeCursor;
-
- try {
- treeCursor = versionChainTree.find(null, null);
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException("Find failed", e);
- }
+ throwExceptionIfStorageNotInRunnableState();
if (lookingForLatestVersion(timestamp)) {
- return new LatestVersionsCursor(treeCursor);
+ return new LatestVersionsCursor(this);
} else {
- return new TimestampCursor(treeCursor, timestamp);
+ return new TimestampCursor(this, timestamp);
}
});
}
@@ -751,7 +743,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
@Override
public @Nullable RowId closestRowId(RowId lowerBound) throws
StorageException {
return busy(() -> {
- throwExceptionIfStorageNotInRunnableState(state.get(),
this::createStorageInfo);
+ throwExceptionIfStorageNotInRunnableState();
try (Cursor<VersionChain> cursor = versionChainTree.find(new
VersionChainKey(lowerBound), null)) {
return cursor.hasNext() ? cursor.next().rowId() : null;
@@ -764,7 +756,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
@Override
public long rowsCount() {
return busy(() -> {
- throwExceptionIfStorageNotInRunnableState(state.get(),
this::createStorageInfo);
+ throwExceptionIfStorageNotInRunnableState();
try {
return versionChainTree.size();
@@ -774,232 +766,6 @@ public abstract class
AbstractPageMemoryMvPartitionStorage implements MvPartitio
});
}
- private abstract class BasePartitionTimestampCursor implements
PartitionTimestampCursor {
- protected final Cursor<VersionChain> treeCursor;
-
- @Nullable
- protected ReadResult nextRead = null;
-
- @Nullable
- protected VersionChain currentChain = null;
-
- protected BasePartitionTimestampCursor(Cursor<VersionChain>
treeCursor) {
- this.treeCursor = treeCursor;
- }
-
- @Override
- public final ReadResult next() {
- return busy(() -> {
- throwExceptionIfStorageNotInRunnableState(state.get(),
AbstractPageMemoryMvPartitionStorage.this::createStorageInfo);
-
- if (!hasNext()) {
- throw new NoSuchElementException("The cursor is
exhausted");
- }
-
- assert nextRead != null;
-
- ReadResult res = nextRead;
-
- nextRead = null;
-
- return res;
- });
- }
-
- @Override
- public void close() {
- treeCursor.close();
- }
-
- @Override
- public @Nullable TableRow committed(HybridTimestamp timestamp) {
- return busy(() -> {
- throwExceptionIfStorageNotInRunnableState(state.get(),
AbstractPageMemoryMvPartitionStorage.this::createStorageInfo);
-
- if (currentChain == null) {
- throw new IllegalStateException();
- }
-
- ReadResult result = findRowVersionByTimestamp(currentChain,
timestamp);
-
- if (result.isEmpty()) {
- return null;
- }
-
- // We don't check if row conforms the key filter here, because
we've already checked it.
- return result.tableRow();
- });
- }
- }
-
- /**
- * Implementation of the {@link PartitionTimestampCursor} over the page
memory storage. See {@link PartitionTimestampCursor} for the
- * details on the API.
- */
- private class TimestampCursor extends BasePartitionTimestampCursor {
- private final HybridTimestamp timestamp;
-
- private boolean iterationExhausted = false;
-
- public TimestampCursor(Cursor<VersionChain> treeCursor,
HybridTimestamp timestamp) {
- super(treeCursor);
-
- this.timestamp = timestamp;
- }
-
- @Override
- public boolean hasNext() {
- return busy(() -> {
- throwExceptionIfStorageNotInRunnableState(state.get(),
AbstractPageMemoryMvPartitionStorage.this::createStorageInfo);
-
- if (nextRead != null) {
- return true;
- }
-
- if (iterationExhausted) {
- return false;
- }
-
- currentChain = null;
-
- while (true) {
- if (!treeCursor.hasNext()) {
- iterationExhausted = true;
-
- return false;
- }
-
- VersionChain chain = treeCursor.next();
- ReadResult result = findRowVersionByTimestamp(chain,
timestamp);
-
- if (result.isEmpty() && !result.isWriteIntent()) {
- continue;
- }
-
- nextRead = result;
- currentChain = chain;
-
- return true;
- }
- });
- }
- }
-
- /**
- * Implementation of the cursor that iterates over the page memory storage
with the respect to the transaction id. Scans the partition
- * and returns a cursor of values. All filtered values must either be
uncommitted in the current transaction or already committed in a
- * different transaction.
- */
- private class LatestVersionsCursor extends BasePartitionTimestampCursor {
- private boolean iterationExhausted = false;
-
- public LatestVersionsCursor(Cursor<VersionChain> treeCursor) {
- super(treeCursor);
- }
-
- @Override
- public boolean hasNext() {
- return busy(() -> {
- throwExceptionIfStorageNotInRunnableState(state.get(),
AbstractPageMemoryMvPartitionStorage.this::createStorageInfo);
-
- if (nextRead != null) {
- return true;
- }
-
- if (iterationExhausted) {
- return false;
- }
-
- while (true) {
- if (!treeCursor.hasNext()) {
- iterationExhausted = true;
- return false;
- }
-
- VersionChain chain = treeCursor.next();
- ReadResult result = findLatestRowVersion(chain);
-
- if (result.isEmpty() && !result.isWriteIntent()) {
- continue;
- }
-
- nextRead = result;
- currentChain = chain;
-
- return true;
- }
- });
- }
- }
-
- private class ScanVersionsCursor implements Cursor<ReadResult> {
- final RowId rowId;
-
- @Nullable
- private Boolean hasNext;
-
- @Nullable
- private VersionChain versionChain;
-
- @Nullable
- private RowVersion rowVersion;
-
- private ScanVersionsCursor(RowId rowId) {
- this.rowId = rowId;
- }
-
- @Override
- public void close() {
- // No-op.
- }
-
- @Override
- public boolean hasNext() {
- return busy(() -> {
- advanceIfNeeded();
-
- return hasNext;
- });
- }
-
- @Override
- public ReadResult next() {
- return busy(() -> {
- advanceIfNeeded();
-
- if (!hasNext) {
- throw new NoSuchElementException();
- }
-
- hasNext = null;
-
- return
rowVersionToResultNotFillingLastCommittedTs(versionChain, rowVersion);
- });
- }
-
- private void advanceIfNeeded() {
- throwExceptionIfStorageNotInRunnableState(state.get(),
AbstractPageMemoryMvPartitionStorage.this::createStorageInfo);
-
- if (hasNext != null) {
- return;
- }
-
- if (versionChain == null) {
- try {
- versionChain = versionChainTree.findOne(new
VersionChainKey(rowId));
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException(e);
- }
-
- rowVersion = versionChain == null ? null :
readRowVersion(versionChain.headLink(), ALWAYS_LOAD_VALUE);
- } else {
- rowVersion = !rowVersion.hasNextLink() ? null :
readRowVersion(rowVersion.nextLink(), ALWAYS_LOAD_VALUE);
- }
-
- hasNext = rowVersion != null;
- }
- }
-
/**
* Closes the partition in preparation for its destruction.
*/
@@ -1176,4 +942,16 @@ public abstract class
AbstractPageMemoryMvPartitionStorage implements MvPartitio
sortedIndexes.values().forEach(PageMemorySortedIndexStorage::finishCleanup);
}
}
+
+ @Nullable VersionChain readVersionChain(RowId rowId) {
+ try {
+ return versionChainTree.findOne(new VersionChainKey(rowId));
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Error getting version chain:
[rowId={}, {}]", e, rowId, createStorageInfo());
+ }
+ }
+
+ void throwExceptionIfStorageNotInRunnableState() {
+ StorageUtils.throwExceptionIfStorageNotInRunnableState(state.get(),
this::createStorageInfo);
+ }
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPartitionTimestampCursor.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPartitionTimestampCursor.java
new file mode 100644
index 0000000000..0db19fd235
--- /dev/null
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPartitionTimestampCursor.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.BplusTree.TreeRowClosure;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.PartitionTimestampCursor;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+abstract class AbstractPartitionTimestampCursor implements
PartitionTimestampCursor {
+ protected final AbstractPageMemoryMvPartitionStorage storage;
+
+ private @Nullable Cursor<VersionChain> versionChainCursor;
+
+ /** Non-empty {@link ReadResult}s cached by {@link RowId} while reading each {@link VersionChain} in the row closure passed to
{@link BplusTree#find(Object, Object, TreeRowClosure, Object)}. */
+ private final Map<RowId, ReadResult> readResultByRowId = new HashMap<>();
+
+ private boolean iterationExhausted;
+
+ private @Nullable ReadResult nextRead;
+
+ private @Nullable VersionChain currentChain;
+
+ AbstractPartitionTimestampCursor(AbstractPageMemoryMvPartitionStorage
storage) {
+ this.storage = storage;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return storage.busy(() -> {
+ storage.throwExceptionIfStorageNotInRunnableState();
+
+ if (nextRead != null) {
+ return true;
+ }
+
+ if (iterationExhausted) {
+ return false;
+ }
+
+ createVersionChainCursorIfMissing();
+
+ currentChain = null;
+
+ while (true) {
+ if (!versionChainCursor.hasNext()) {
+ iterationExhausted = true;
+
+ return false;
+ }
+
+ VersionChain chain = versionChainCursor.next();
+
+ ReadResult result = readResultByRowId.remove(chain.rowId());
+
+ if (result == null) {
+ // TODO: IGNITE-18717 Add lock by rowId
+ chain = storage.readVersionChain(chain.rowId());
+
+ if (chain == null) {
+ continue;
+ }
+
+ result = findRowVersion(chain);
+ }
+
+ if (result.isEmpty() && !result.isWriteIntent()) {
+ continue;
+ }
+
+ nextRead = result;
+ currentChain = chain;
+
+ return true;
+ }
+ });
+ }
+
+ @Override
+ public final ReadResult next() {
+ return storage.busy(() -> {
+ storage.throwExceptionIfStorageNotInRunnableState();
+
+ if (!hasNext()) {
+ throw new NoSuchElementException("The cursor is exhausted: " +
storage.createStorageInfo());
+ }
+
+ assert nextRead != null;
+
+ ReadResult res = nextRead;
+
+ nextRead = null;
+
+ return res;
+ });
+ }
+
+ @Override
+ public void close() {
+ if (versionChainCursor != null) {
+ versionChainCursor.close();
+ }
+ }
+
+ @Override
+ public @Nullable TableRow committed(HybridTimestamp timestamp) {
+ return storage.busy(() -> {
+ storage.throwExceptionIfStorageNotInRunnableState();
+
+ if (currentChain == null) {
+ throw new IllegalStateException("Version chain missing: " +
storage.createStorageInfo());
+ }
+
+ // TODO: IGNITE-18717 Add lock by rowId
+ VersionChain chain =
storage.readVersionChain(currentChain.rowId());
+
+ if (chain == null) {
+ return null;
+ }
+
+ ReadResult result = storage.findRowVersionByTimestamp(chain,
timestamp);
+
+ if (result.isEmpty()) {
+ return null;
+ }
+
+ // We don't check if row conforms to the key filter here, because
we've already checked it.
+ return result.tableRow();
+ });
+ }
+
+ /**
+ * Finds a {@link RowVersion} in the {@link VersionChain}, depending on
the implementation.
+ *
+ * <p>For example, for a specific timestamp or the very last in the chain.
+ *
+ * @param versionChain Version chain.
+ */
+ abstract ReadResult findRowVersion(VersionChain versionChain);
+
+ private void createVersionChainCursorIfMissing() {
+ if (versionChainCursor != null) {
+ return;
+ }
+
+ try {
+ versionChainCursor = storage.versionChainTree.find(null, null,
(tree, io, pageAddr, idx) -> {
+ // Since the BplusTree cursor caches rows that are on the same
page, we should try to get an up-to-date ReadResult for them in this
+ // filter, so as not to end up reading a chain whose links are
no longer valid.
+
+ VersionChain versionChain = tree.getRow(io, pageAddr, idx);
+
+ // TODO: IGNITE-18717 Perhaps add lock by rowId
+
+ ReadResult readResult = findRowVersion(versionChain);
+
+ if (!readResult.isEmpty()) {
+ readResultByRowId.put(versionChain.rowId(), readResult);
+ }
+
+ return true;
+ }, null);
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Find failed: " +
storage.createStorageInfo(), e);
+ }
+ }
+}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/LatestVersionsCursor.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/LatestVersionsCursor.java
new file mode 100644
index 0000000000..6a44f32584
--- /dev/null
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/LatestVersionsCursor.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import org.apache.ignite.internal.storage.ReadResult;
+
+/**
+ * Implementation of the cursor that iterates over the page memory storage
with respect to the transaction id. Scans the partition
+ * and returns a cursor of values. All filtered values must either be
uncommitted in the current transaction or already committed in a
+ * different transaction.
+ */
+class LatestVersionsCursor extends AbstractPartitionTimestampCursor {
+ LatestVersionsCursor(AbstractPageMemoryMvPartitionStorage storage) {
+ super(storage);
+ }
+
+ @Override
+ ReadResult findRowVersion(VersionChain versionChain) {
+ return storage.findLatestRowVersion(versionChain);
+ }
+}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
index 2ef58c5553..cb697aac5c 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.storage.pagememory.mv;
import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInCleanupOrRebalancedState;
import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInProgressOfRebalance;
import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInRunnableOrRebalanceState;
-import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInRunnableState;
import java.util.List;
import java.util.UUID;
@@ -195,7 +194,7 @@ public class PersistentPageMemoryMvPartitionStorage extends
AbstractPageMemoryMv
@Override
public void lastApplied(long lastAppliedIndex, long lastAppliedTerm)
throws StorageException {
busy(() -> {
- throwExceptionIfStorageNotInRunnableState(state.get(),
this::createStorageInfo);
+ throwExceptionIfStorageNotInRunnableState();
lastAppliedBusy(lastAppliedIndex, lastAppliedTerm);
@@ -253,7 +252,7 @@ public class PersistentPageMemoryMvPartitionStorage extends
AbstractPageMemoryMv
@Override
public void committedGroupConfiguration(RaftGroupConfiguration config) {
busy(() -> {
- throwExceptionIfStorageNotInRunnableState(state.get(),
this::createStorageInfo);
+ throwExceptionIfStorageNotInRunnableState();
committedGroupConfigurationBusy(config);
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionsCursor.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionsCursor.java
new file mode 100644
index 0000000000..9f581feae4
--- /dev/null
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionsCursor.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static java.util.Collections.emptyIterator;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
+import static
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.ALWAYS_LOAD_VALUE;
+import static
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.rowVersionToResultNotFillingLastCommittedTs;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Cursor reading all versions for {@link RowId}.
+ *
+ * <p>In the constructor, reads all versions of the chain and then iterates
over them.
+ */
+class ScanVersionsCursor implements Cursor<ReadResult> {
+ private final AbstractPageMemoryMvPartitionStorage storage;
+
+ private final @Nullable VersionChain versionChain;
+
+ private final Iterator<RowVersion> rowVersionIterator;
+
+ /**
+ * Constructor.
+ *
+ * @param rowId Row ID.
+ * @param storage Multi-versioned partition storage.
+ * @throws StorageException If there is an error when collecting all
versions of the chain.
+ */
+ ScanVersionsCursor(
+ RowId rowId,
+ AbstractPageMemoryMvPartitionStorage storage
+ ) {
+ this.storage = storage;
+
+ versionChain = storage.readVersionChain(rowId);
+
+ rowVersionIterator = versionChain == null ? emptyIterator() :
collectRowVersions();
+ }
+
+ @Override
+ public void close() {
+ // No-op.
+ }
+
+ @Override
+ public boolean hasNext() {
+ return storage.busy(() -> {
+ storage.throwExceptionIfStorageNotInRunnableState();
+
+ return rowVersionIterator.hasNext();
+ });
+ }
+
+ @Override
+ public ReadResult next() {
+ return storage.busy(() -> {
+ storage.throwExceptionIfStorageNotInRunnableState();
+
+ return rowVersionToResultNotFillingLastCommittedTs(versionChain,
rowVersionIterator.next());
+ });
+ }
+
+ private Iterator<RowVersion> collectRowVersions() {
+ long link = versionChain.headLink();
+
+ List<RowVersion> rowVersions = new ArrayList<>();
+
+ while (link != NULL_LINK) {
+ RowVersion rowVersion = storage.readRowVersion(link,
ALWAYS_LOAD_VALUE);
+
+ if (rowVersion == null) {
+ link = NULL_LINK;
+ } else {
+ rowVersions.add(rowVersion);
+
+ link = rowVersion.nextLink();
+ }
+ }
+
+ return rowVersions.iterator();
+ }
+}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/TimestampCursor.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/TimestampCursor.java
new file mode 100644
index 0000000000..826f8f0a15
--- /dev/null
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/TimestampCursor.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.storage.PartitionTimestampCursor;
+import org.apache.ignite.internal.storage.ReadResult;
+
+/**
+ * Implementation of the {@link PartitionTimestampCursor} over the page memory
storage. See {@link PartitionTimestampCursor} for the
+ * details on the API.
+ */
+class TimestampCursor extends AbstractPartitionTimestampCursor {
+ private final HybridTimestamp timestamp;
+
+ TimestampCursor(
+ AbstractPageMemoryMvPartitionStorage storage,
+ HybridTimestamp timestamp
+ ) {
+ super(storage);
+
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ ReadResult findRowVersion(VersionChain versionChain) {
+ return storage.findRowVersionByTimestamp(versionChain, timestamp);
+ }
+}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
index cc5348e7f9..1d005b4ed9 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
@@ -21,7 +21,6 @@ import static
java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInCleanupOrRebalancedState;
import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInProgressOfRebalance;
import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInRunnableOrRebalanceState;
-import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInRunnableState;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -131,7 +130,7 @@ public class VolatilePageMemoryMvPartitionStorage extends
AbstractPageMemoryMvPa
@Override
public void lastApplied(long lastAppliedIndex, long lastAppliedTerm)
throws StorageException {
busy(() -> {
- throwExceptionIfStorageNotInRunnableState(state.get(),
this::createStorageInfo);
+ throwExceptionIfStorageNotInRunnableState();
this.lastAppliedIndex = lastAppliedIndex;
this.lastAppliedTerm = lastAppliedTerm;
@@ -161,7 +160,7 @@ public class VolatilePageMemoryMvPartitionStorage extends
AbstractPageMemoryMvPa
@Override
public void committedGroupConfiguration(RaftGroupConfiguration config) {
busy(() -> {
- throwExceptionIfStorageNotInRunnableState(state.get(),
this::createStorageInfo);
+ throwExceptionIfStorageNotInRunnableState();
groupConfig = config;