This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ba7b38de89 IGNITE-18736 Prepare class AbstractPageMemoryMvPartitionStorage cursors for working with locks (#1645)
ba7b38de89 is described below

commit ba7b38de89153132faccc88cc9387a793e0e26bd
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Wed Feb 8 11:40:35 2023 +0300

    IGNITE-18736 Prepare class AbstractPageMemoryMvPartitionStorage cursors for working with locks (#1645)
---
 .../ignite/internal/pagememory/tree/BplusTree.java |   7 +-
 .../ignite/internal/storage/StorageException.java  |  13 +
 .../mv/AbstractPageMemoryMvPartitionStorage.java   | 284 +++------------------
 .../mv/AbstractPartitionTimestampCursor.java       | 192 ++++++++++++++
 .../pagememory/mv/LatestVersionsCursor.java        |  36 +++
 .../mv/PersistentPageMemoryMvPartitionStorage.java |   5 +-
 .../storage/pagememory/mv/ScanVersionsCursor.java  | 106 ++++++++
 .../storage/pagememory/mv/TimestampCursor.java     |  44 ++++
 .../mv/VolatilePageMemoryMvPartitionStorage.java   |   5 +-
 9 files changed, 432 insertions(+), 260 deletions(-)

diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
index f92854d7ab..f0a915f41e 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
@@ -1254,7 +1254,12 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
      * @return Cursor.
      * @throws IgniteInternalCheckedException If failed.
      */
-    public Cursor<T> find(@Nullable L lower, @Nullable L upper, 
TreeRowClosure<L, T> c, Object x) throws IgniteInternalCheckedException {
+    public Cursor<T> find(
+            @Nullable L lower,
+            @Nullable L upper,
+            TreeRowClosure<L, T> c,
+            @Nullable Object x
+    ) throws IgniteInternalCheckedException {
         return find(lower, upper, true, true, c, x);
     }
 
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
index 71a4fdd250..0b2e6d2d3c 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.storage;
 
 import org.apache.ignite.lang.ErrorGroups.Storage;
 import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -75,4 +76,16 @@ public class StorageException extends 
IgniteInternalException {
     protected StorageException(int code, String message, @Nullable Throwable 
cause) {
         super(code, message, cause);
     }
+
+    /**
+     * Constructor.
+     *
+     * @param messagePattern Error message pattern.
+     * @param cause Non-null throwable cause.
+     * @param params Error message params.
+     * @see IgniteStringFormatter#format(String, Object...)
+     */
+    public StorageException(String messagePattern, Throwable cause, Object... 
params) {
+        this(IgniteStringFormatter.format(messagePattern, params), cause);
+    }
 }
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index d90c6be71f..ca3560370e 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -22,12 +22,10 @@ import static 
org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageState;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageStateOnRebalance;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInRunnableOrRebalanceState;
-import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInRunnableState;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.NoSuchElementException;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -67,6 +65,7 @@ import 
org.apache.ignite.internal.storage.pagememory.index.meta.IndexMetaTree;
 import 
org.apache.ignite.internal.storage.pagememory.index.sorted.PageMemorySortedIndexStorage;
 import 
org.apache.ignite.internal.storage.pagememory.index.sorted.SortedIndexTree;
 import org.apache.ignite.internal.storage.util.StorageState;
+import org.apache.ignite.internal.storage.util.StorageUtils;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -80,7 +79,7 @@ import org.jetbrains.annotations.Nullable;
 public abstract class AbstractPageMemoryMvPartitionStorage implements 
MvPartitionStorage {
     private static final byte[] TOMBSTONE_PAYLOAD = new byte[0];
 
-    private static final Predicate<HybridTimestamp> ALWAYS_LOAD_VALUE = 
timestamp -> true;
+    static final Predicate<HybridTimestamp> ALWAYS_LOAD_VALUE = timestamp -> 
true;
 
     protected final int partitionId;
 
@@ -147,7 +146,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
      */
     public void start() {
         busy(() -> {
-            throwExceptionIfStorageNotInRunnableState(state.get(), 
this::createStorageInfo);
+            throwExceptionIfStorageNotInRunnableState();
 
             try (Cursor<IndexMeta> cursor = indexMetaTree.find(null, null)) {
                 NamedListView<TableIndexView> indexesCfgView = 
tableStorage.tablesConfiguration().indexes().value();
@@ -202,7 +201,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
     }
 
     private PageMemoryHashIndexStorage createOrRestoreHashIndex(IndexMeta 
indexMeta) {
-        throwExceptionIfStorageNotInRunnableState(state.get(), 
this::createStorageInfo);
+        throwExceptionIfStorageNotInRunnableState();
 
         var indexDescriptor = new HashIndexDescriptor(indexMeta.id(), 
tableStorage.tablesConfiguration().value());
 
@@ -254,7 +253,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
     }
 
     private PageMemorySortedIndexStorage createOrRestoreSortedIndex(IndexMeta 
indexMeta) {
-        throwExceptionIfStorageNotInRunnableState(state.get(), 
this::createStorageInfo);
+        throwExceptionIfStorageNotInRunnableState();
 
         var indexDescriptor = new SortedIndexDescriptor(indexMeta.id(), 
tableStorage.tablesConfiguration().value());
 
@@ -309,7 +308,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
     @Override
     public ReadResult read(RowId rowId, HybridTimestamp timestamp) throws 
StorageException {
         return busy(() -> {
-            throwExceptionIfStorageNotInRunnableState(state.get(), 
this::createStorageInfo);
+            throwExceptionIfStorageNotInRunnableState();
 
             if (rowId.partitionId() != partitionId) {
                 throw new IllegalArgumentException(
@@ -342,7 +341,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
         }
     }
 
-    private ReadResult findLatestRowVersion(VersionChain versionChain) {
+    ReadResult findLatestRowVersion(VersionChain versionChain) {
         RowVersion rowVersion = readRowVersion(versionChain.headLink(), 
ALWAYS_LOAD_VALUE);
 
         if (versionChain.isUncommitted()) {
@@ -369,7 +368,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
         try {
             rowVersionDataPageReader.traverse(rowVersionLink, read, loadValue);
         } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Row version lookup failed", e);
+            throw new StorageException("Row version lookup failed: [link={}, 
{}]", e, rowVersionLink, createStorageInfo());
         }
 
         return read.result();
@@ -398,7 +397,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
      * @param timestamp Timestamp.
      * @return Read result.
      */
-    private ReadResult findRowVersionByTimestamp(VersionChain versionChain, 
HybridTimestamp timestamp) {
+    ReadResult findRowVersionByTimestamp(VersionChain versionChain, 
HybridTimestamp timestamp) {
         assert timestamp != null;
 
         long headLink = versionChain.headLink();
@@ -572,7 +571,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
         assert rowId.partitionId() == partitionId : rowId;
 
         return busy(() -> {
-            throwExceptionIfStorageNotInRunnableState(state.get(), 
this::createStorageInfo);
+            throwExceptionIfStorageNotInRunnableState();
 
             VersionChain currentVersionChain = findVersionChain(rowId);
 
@@ -704,13 +703,14 @@ public abstract class 
AbstractPageMemoryMvPartitionStorage implements MvPartitio
     @Override
     public Cursor<ReadResult> scanVersions(RowId rowId) throws 
StorageException {
         return busy(() -> {
-            throwExceptionIfStorageNotInRunnableState(state.get(), 
this::createStorageInfo);
+            throwExceptionIfStorageNotInRunnableState();
 
-            return new ScanVersionsCursor(rowId);
+            // TODO: IGNITE-18717 Add lock by rowId
+            return new ScanVersionsCursor(rowId, this);
         });
     }
 
-    private static ReadResult 
rowVersionToResultNotFillingLastCommittedTs(VersionChain versionChain, 
RowVersion rowVersion) {
+    static ReadResult rowVersionToResultNotFillingLastCommittedTs(VersionChain 
versionChain, RowVersion rowVersion) {
         TableRow row = new TableRow(rowVersion.value());
 
         if (rowVersion.isCommitted()) {
@@ -730,20 +730,12 @@ public abstract class 
AbstractPageMemoryMvPartitionStorage implements MvPartitio
     @Override
     public PartitionTimestampCursor scan(HybridTimestamp timestamp) throws 
StorageException {
         return busy(() -> {
-            throwExceptionIfStorageNotInRunnableState(state.get(), 
this::createStorageInfo);
-
-            Cursor<VersionChain> treeCursor;
-
-            try {
-                treeCursor = versionChainTree.find(null, null);
-            } catch (IgniteInternalCheckedException e) {
-                throw new StorageException("Find failed", e);
-            }
+            throwExceptionIfStorageNotInRunnableState();
 
             if (lookingForLatestVersion(timestamp)) {
-                return new LatestVersionsCursor(treeCursor);
+                return new LatestVersionsCursor(this);
             } else {
-                return new TimestampCursor(treeCursor, timestamp);
+                return new TimestampCursor(this, timestamp);
             }
         });
     }
@@ -751,7 +743,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
     @Override
     public @Nullable RowId closestRowId(RowId lowerBound) throws 
StorageException {
         return busy(() -> {
-            throwExceptionIfStorageNotInRunnableState(state.get(), 
this::createStorageInfo);
+            throwExceptionIfStorageNotInRunnableState();
 
             try (Cursor<VersionChain> cursor = versionChainTree.find(new 
VersionChainKey(lowerBound), null)) {
                 return cursor.hasNext() ? cursor.next().rowId() : null;
@@ -764,7 +756,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
     @Override
     public long rowsCount() {
         return busy(() -> {
-            throwExceptionIfStorageNotInRunnableState(state.get(), 
this::createStorageInfo);
+            throwExceptionIfStorageNotInRunnableState();
 
             try {
                 return versionChainTree.size();
@@ -774,232 +766,6 @@ public abstract class 
AbstractPageMemoryMvPartitionStorage implements MvPartitio
         });
     }
 
-    private abstract class BasePartitionTimestampCursor implements 
PartitionTimestampCursor {
-        protected final Cursor<VersionChain> treeCursor;
-
-        @Nullable
-        protected ReadResult nextRead = null;
-
-        @Nullable
-        protected VersionChain currentChain = null;
-
-        protected BasePartitionTimestampCursor(Cursor<VersionChain> 
treeCursor) {
-            this.treeCursor = treeCursor;
-        }
-
-        @Override
-        public final ReadResult next() {
-            return busy(() -> {
-                throwExceptionIfStorageNotInRunnableState(state.get(), 
AbstractPageMemoryMvPartitionStorage.this::createStorageInfo);
-
-                if (!hasNext()) {
-                    throw new NoSuchElementException("The cursor is 
exhausted");
-                }
-
-                assert nextRead != null;
-
-                ReadResult res = nextRead;
-
-                nextRead = null;
-
-                return res;
-            });
-        }
-
-        @Override
-        public void close() {
-            treeCursor.close();
-        }
-
-        @Override
-        public @Nullable TableRow committed(HybridTimestamp timestamp) {
-            return busy(() -> {
-                throwExceptionIfStorageNotInRunnableState(state.get(), 
AbstractPageMemoryMvPartitionStorage.this::createStorageInfo);
-
-                if (currentChain == null) {
-                    throw new IllegalStateException();
-                }
-
-                ReadResult result = findRowVersionByTimestamp(currentChain, 
timestamp);
-
-                if (result.isEmpty()) {
-                    return null;
-                }
-
-                // We don't check if row conforms the key filter here, because 
we've already checked it.
-                return result.tableRow();
-            });
-        }
-    }
-
-    /**
-     * Implementation of the {@link PartitionTimestampCursor} over the page 
memory storage. See {@link PartitionTimestampCursor} for the
-     * details on the API.
-     */
-    private class TimestampCursor extends BasePartitionTimestampCursor {
-        private final HybridTimestamp timestamp;
-
-        private boolean iterationExhausted = false;
-
-        public TimestampCursor(Cursor<VersionChain> treeCursor, 
HybridTimestamp timestamp) {
-            super(treeCursor);
-
-            this.timestamp = timestamp;
-        }
-
-        @Override
-        public boolean hasNext() {
-            return busy(() -> {
-                throwExceptionIfStorageNotInRunnableState(state.get(), 
AbstractPageMemoryMvPartitionStorage.this::createStorageInfo);
-
-                if (nextRead != null) {
-                    return true;
-                }
-
-                if (iterationExhausted) {
-                    return false;
-                }
-
-                currentChain = null;
-
-                while (true) {
-                    if (!treeCursor.hasNext()) {
-                        iterationExhausted = true;
-
-                        return false;
-                    }
-
-                    VersionChain chain = treeCursor.next();
-                    ReadResult result = findRowVersionByTimestamp(chain, 
timestamp);
-
-                    if (result.isEmpty() && !result.isWriteIntent()) {
-                        continue;
-                    }
-
-                    nextRead = result;
-                    currentChain = chain;
-
-                    return true;
-                }
-            });
-        }
-    }
-
-    /**
-     * Implementation of the cursor that iterates over the page memory storage 
with the respect to the transaction id. Scans the partition
-     * and returns a cursor of values. All filtered values must either be 
uncommitted in the current transaction or already committed in a
-     * different transaction.
-     */
-    private class LatestVersionsCursor extends BasePartitionTimestampCursor {
-        private boolean iterationExhausted = false;
-
-        public LatestVersionsCursor(Cursor<VersionChain> treeCursor) {
-            super(treeCursor);
-        }
-
-        @Override
-        public boolean hasNext() {
-            return busy(() -> {
-                throwExceptionIfStorageNotInRunnableState(state.get(), 
AbstractPageMemoryMvPartitionStorage.this::createStorageInfo);
-
-                if (nextRead != null) {
-                    return true;
-                }
-
-                if (iterationExhausted) {
-                    return false;
-                }
-
-                while (true) {
-                    if (!treeCursor.hasNext()) {
-                        iterationExhausted = true;
-                        return false;
-                    }
-
-                    VersionChain chain = treeCursor.next();
-                    ReadResult result = findLatestRowVersion(chain);
-
-                    if (result.isEmpty() && !result.isWriteIntent()) {
-                        continue;
-                    }
-
-                    nextRead = result;
-                    currentChain = chain;
-
-                    return true;
-                }
-            });
-        }
-    }
-
-    private class ScanVersionsCursor implements Cursor<ReadResult> {
-        final RowId rowId;
-
-        @Nullable
-        private Boolean hasNext;
-
-        @Nullable
-        private VersionChain versionChain;
-
-        @Nullable
-        private RowVersion rowVersion;
-
-        private ScanVersionsCursor(RowId rowId) {
-            this.rowId = rowId;
-        }
-
-        @Override
-        public void close() {
-            // No-op.
-        }
-
-        @Override
-        public boolean hasNext() {
-            return busy(() -> {
-                advanceIfNeeded();
-
-                return hasNext;
-            });
-        }
-
-        @Override
-        public ReadResult next() {
-            return busy(() -> {
-                advanceIfNeeded();
-
-                if (!hasNext) {
-                    throw new NoSuchElementException();
-                }
-
-                hasNext = null;
-
-                return 
rowVersionToResultNotFillingLastCommittedTs(versionChain, rowVersion);
-            });
-        }
-
-        private void advanceIfNeeded() {
-            throwExceptionIfStorageNotInRunnableState(state.get(), 
AbstractPageMemoryMvPartitionStorage.this::createStorageInfo);
-
-            if (hasNext != null) {
-                return;
-            }
-
-            if (versionChain == null) {
-                try {
-                    versionChain = versionChainTree.findOne(new 
VersionChainKey(rowId));
-                } catch (IgniteInternalCheckedException e) {
-                    throw new StorageException(e);
-                }
-
-                rowVersion = versionChain == null ? null : 
readRowVersion(versionChain.headLink(), ALWAYS_LOAD_VALUE);
-            } else {
-                rowVersion = !rowVersion.hasNextLink() ? null : 
readRowVersion(rowVersion.nextLink(), ALWAYS_LOAD_VALUE);
-            }
-
-            hasNext = rowVersion != null;
-        }
-    }
-
     /**
      * Closes the partition in preparation for its destruction.
      */
@@ -1176,4 +942,16 @@ public abstract class 
AbstractPageMemoryMvPartitionStorage implements MvPartitio
             
sortedIndexes.values().forEach(PageMemorySortedIndexStorage::finishCleanup);
         }
     }
+
+    @Nullable VersionChain readVersionChain(RowId rowId) {
+        try {
+            return versionChainTree.findOne(new VersionChainKey(rowId));
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Error getting version chain: 
[rowId={}, {}]", e, rowId, createStorageInfo());
+        }
+    }
+
+    void throwExceptionIfStorageNotInRunnableState() {
+        StorageUtils.throwExceptionIfStorageNotInRunnableState(state.get(), 
this::createStorageInfo);
+    }
 }
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPartitionTimestampCursor.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPartitionTimestampCursor.java
new file mode 100644
index 0000000000..0db19fd235
--- /dev/null
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPartitionTimestampCursor.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.BplusTree.TreeRowClosure;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.PartitionTimestampCursor;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+abstract class AbstractPartitionTimestampCursor implements 
PartitionTimestampCursor {
+    protected final AbstractPageMemoryMvPartitionStorage storage;
+
+    private @Nullable Cursor<VersionChain> versionChainCursor;
+
+    /** {@link ReadResult} obtained by caching {@link VersionChain} with 
{@link BplusTree#find(Object, Object, TreeRowClosure, Object)}. */
+    private final Map<RowId, ReadResult> readResultByRowId = new HashMap<>();
+
+    private boolean iterationExhausted;
+
+    private @Nullable ReadResult nextRead;
+
+    private @Nullable VersionChain currentChain;
+
+    AbstractPartitionTimestampCursor(AbstractPageMemoryMvPartitionStorage 
storage) {
+        this.storage = storage;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return storage.busy(() -> {
+            storage.throwExceptionIfStorageNotInRunnableState();
+
+            if (nextRead != null) {
+                return true;
+            }
+
+            if (iterationExhausted) {
+                return false;
+            }
+
+            createVersionChainCursorIfMissing();
+
+            currentChain = null;
+
+            while (true) {
+                if (!versionChainCursor.hasNext()) {
+                    iterationExhausted = true;
+
+                    return false;
+                }
+
+                VersionChain chain = versionChainCursor.next();
+
+                ReadResult result = readResultByRowId.remove(chain.rowId());
+
+                if (result == null) {
+                    // TODO: IGNITE-18717 Add lock by rowId
+                    chain = storage.readVersionChain(chain.rowId());
+
+                    if (chain == null) {
+                        continue;
+                    }
+
+                    result = findRowVersion(chain);
+                }
+
+                if (result.isEmpty() && !result.isWriteIntent()) {
+                    continue;
+                }
+
+                nextRead = result;
+                currentChain = chain;
+
+                return true;
+            }
+        });
+    }
+
+    @Override
+    public final ReadResult next() {
+        return storage.busy(() -> {
+            storage.throwExceptionIfStorageNotInRunnableState();
+
+            if (!hasNext()) {
+                throw new NoSuchElementException("The cursor is exhausted: " + 
storage.createStorageInfo());
+            }
+
+            assert nextRead != null;
+
+            ReadResult res = nextRead;
+
+            nextRead = null;
+
+            return res;
+        });
+    }
+
+    @Override
+    public void close() {
+        if (versionChainCursor != null) {
+            versionChainCursor.close();
+        }
+    }
+
+    @Override
+    public @Nullable TableRow committed(HybridTimestamp timestamp) {
+        return storage.busy(() -> {
+            storage.throwExceptionIfStorageNotInRunnableState();
+
+            if (currentChain == null) {
+                throw new IllegalStateException("Version chain missing: " + 
storage.createStorageInfo());
+            }
+
+            // TODO: IGNITE-18717 Add lock by rowId
+            VersionChain chain = 
storage.readVersionChain(currentChain.rowId());
+
+            if (chain == null) {
+                return null;
+            }
+
+            ReadResult result = storage.findRowVersionByTimestamp(chain, 
timestamp);
+
+            if (result.isEmpty()) {
+                return null;
+            }
+
+            // We don't check if row conforms the key filter here, because 
we've already checked it.
+            return result.tableRow();
+        });
+    }
+
+    /**
+     * Finds a {@link RowVersion} in the {@link VersionChain}, depending on 
the implementation.
+     *
+     * <p>For example, for a specific timestamp or the very last in the chain.
+     *
+     * @param versionChain Version chain.
+     */
+    abstract ReadResult findRowVersion(VersionChain versionChain);
+
+    private void createVersionChainCursorIfMissing() {
+        if (versionChainCursor != null) {
+            return;
+        }
+
+        try {
+            versionChainCursor = storage.versionChainTree.find(null, null, 
(tree, io, pageAddr, idx) -> {
+                // Since the BplusTree cursor caches rows that are on the same 
page, we should try to get actual ReadResult for them in this
+                // filter so as not to get into a situation when we read the 
chain and the links in it are no longer valid.
+
+                VersionChain versionChain = tree.getRow(io, pageAddr, idx);
+
+                // TODO: IGNITE-18717 Perhaps add lock by rowId
+
+                ReadResult readResult = findRowVersion(versionChain);
+
+                if (!readResult.isEmpty()) {
+                    readResultByRowId.put(versionChain.rowId(), readResult);
+                }
+
+                return true;
+            }, null);
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Find failed: " + 
storage.createStorageInfo(), e);
+        }
+    }
+}
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/LatestVersionsCursor.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/LatestVersionsCursor.java
new file mode 100644
index 0000000000..6a44f32584
--- /dev/null
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/LatestVersionsCursor.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import org.apache.ignite.internal.storage.ReadResult;
+
+/**
+ * Implementation of the cursor that iterates over the page memory storage 
with the respect to the transaction id. Scans the partition
+ * and returns a cursor of values. All filtered values must either be 
uncommitted in the current transaction or already committed in a
+ * different transaction.
+ */
+class LatestVersionsCursor extends AbstractPartitionTimestampCursor {
+    LatestVersionsCursor(AbstractPageMemoryMvPartitionStorage storage) {
+        super(storage);
+    }
+
+    @Override
+    ReadResult findRowVersion(VersionChain versionChain) {
+        return storage.findLatestRowVersion(versionChain);
+    }
+}
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
index 2ef58c5553..cb697aac5c 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.storage.pagememory.mv;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInCleanupOrRebalancedState;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInProgressOfRebalance;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInRunnableOrRebalanceState;
-import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInRunnableState;
 
 import java.util.List;
 import java.util.UUID;
@@ -195,7 +194,7 @@ public class PersistentPageMemoryMvPartitionStorage extends 
AbstractPageMemoryMv
     @Override
     public void lastApplied(long lastAppliedIndex, long lastAppliedTerm) 
throws StorageException {
         busy(() -> {
-            throwExceptionIfStorageNotInRunnableState(state.get(), 
this::createStorageInfo);
+            throwExceptionIfStorageNotInRunnableState();
 
             lastAppliedBusy(lastAppliedIndex, lastAppliedTerm);
 
@@ -253,7 +252,7 @@ public class PersistentPageMemoryMvPartitionStorage extends 
AbstractPageMemoryMv
     @Override
     public void committedGroupConfiguration(RaftGroupConfiguration config) {
         busy(() -> {
-            throwExceptionIfStorageNotInRunnableState(state.get(), 
this::createStorageInfo);
+            throwExceptionIfStorageNotInRunnableState();
 
             committedGroupConfigurationBusy(config);
 
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionsCursor.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionsCursor.java
new file mode 100644
index 0000000000..9f581feae4
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionsCursor.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static java.util.Collections.emptyIterator;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
+import static 
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.ALWAYS_LOAD_VALUE;
+import static 
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.rowVersionToResultNotFillingLastCommittedTs;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Cursor over all versions stored for a single {@link RowId}.
+ *
+ * <p>The constructor eagerly reads every version of the chain into a list; the cursor then iterates over that list.
+ */
+class ScanVersionsCursor implements Cursor<ReadResult> {
+    private final AbstractPageMemoryMvPartitionStorage storage;
+
+    private final @Nullable VersionChain versionChain;
+
+    private final Iterator<RowVersion> rowVersionIterator;
+
+    /**
+     * Constructor.
+     *
+     * @param rowId Row ID.
+     * @param storage Multi-versioned partition storage.
+     * @throws StorageException If there is an error when collecting all versions of the chain.
+     */
+    ScanVersionsCursor(RowId rowId, AbstractPageMemoryMvPartitionStorage storage) {
+        this.storage = storage;
+        this.versionChain = storage.readVersionChain(rowId);
+
+        // A missing chain yields an empty cursor.
+        if (versionChain == null) {
+            rowVersionIterator = emptyIterator();
+        } else {
+            rowVersionIterator = collectRowVersions();
+        }
+    }
+
+    @Override
+    public void close() {
+        // No-op: the cursor only holds the list of versions collected by the constructor.
+    }
+
+    @Override
+    public boolean hasNext() {
+        return storage.busy(() -> {
+            storage.throwExceptionIfStorageNotInRunnableState();
+
+            return rowVersionIterator.hasNext();
+        });
+    }
+
+    @Override
+    public ReadResult next() {
+        return storage.busy(() -> {
+            storage.throwExceptionIfStorageNotInRunnableState();
+
+            RowVersion rowVersion = rowVersionIterator.next();
+
+            return rowVersionToResultNotFillingLastCommittedTs(versionChain, rowVersion);
+        });
+    }
+
+    /**
+     * Walks the version chain from its head link and gathers the row versions in the order they are linked.
+     *
+     * <p>The walk stops when {@code NULL_LINK} is reached or when the storage returns {@code null} for a link.
+     *
+     * @return Iterator over the collected row versions.
+     */
+    private Iterator<RowVersion> collectRowVersions() {
+        List<RowVersion> collected = new ArrayList<>();
+
+        long nextLink = versionChain.headLink();
+
+        while (nextLink != NULL_LINK) {
+            RowVersion current = storage.readRowVersion(nextLink, ALWAYS_LOAD_VALUE);
+
+            if (current == null) {
+                break;
+            }
+
+            collected.add(current);
+
+            nextLink = current.nextLink();
+        }
+
+        return collected.iterator();
+    }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/TimestampCursor.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/TimestampCursor.java
new file mode 100644
index 0000000000..826f8f0a15
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/TimestampCursor.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.storage.PartitionTimestampCursor;
+import org.apache.ignite.internal.storage.ReadResult;
+
+/**
+ * {@link PartitionTimestampCursor} implementation over the page memory storage that looks row versions up by a fixed timestamp.
+ * See {@link PartitionTimestampCursor} for the details on the API.
+ */
+class TimestampCursor extends AbstractPartitionTimestampCursor {
+    /** Timestamp passed to the storage on every row version lookup. */
+    private final HybridTimestamp timestamp;
+
+    /**
+     * Constructor.
+     *
+     * @param storage Multi-versioned partition storage.
+     * @param timestamp Timestamp to look row versions up by.
+     */
+    TimestampCursor(AbstractPageMemoryMvPartitionStorage storage, HybridTimestamp timestamp) {
+        super(storage);
+
+        this.timestamp = timestamp;
+    }
+
+    /** Resolves the given chain via {@code findRowVersionByTimestamp} of the storage, using this cursor's timestamp. */
+    @Override
+    ReadResult findRowVersion(VersionChain versionChain) {
+        return storage.findRowVersionByTimestamp(versionChain, timestamp);
+    }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
index cc5348e7f9..1d005b4ed9 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
@@ -21,7 +21,6 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInCleanupOrRebalancedState;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInProgressOfRebalance;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInRunnableOrRebalanceState;
-import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInRunnableState;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -131,7 +130,7 @@ public class VolatilePageMemoryMvPartitionStorage extends 
AbstractPageMemoryMvPa
     @Override
     public void lastApplied(long lastAppliedIndex, long lastAppliedTerm) 
throws StorageException {
         busy(() -> {
-            throwExceptionIfStorageNotInRunnableState(state.get(), 
this::createStorageInfo);
+            throwExceptionIfStorageNotInRunnableState();
 
             this.lastAppliedIndex = lastAppliedIndex;
             this.lastAppliedTerm = lastAppliedTerm;
@@ -161,7 +160,7 @@ public class VolatilePageMemoryMvPartitionStorage extends 
AbstractPageMemoryMvPa
     @Override
     public void committedGroupConfiguration(RaftGroupConfiguration config) {
         busy(() -> {
-            throwExceptionIfStorageNotInRunnableState(state.get(), 
this::createStorageInfo);
+            throwExceptionIfStorageNotInRunnableState();
 
             groupConfig = config;
 


Reply via email to