This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new cd32ecc99f IGNITE-17755 Add a common interface for all index storages (#1127)
cd32ecc99f is described below

commit cd32ecc99fa038176ba4b7b64f0d9d2c50b9a491
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Thu Sep 29 13:36:19 2022 +0300

    IGNITE-17755 Add a common interface for all index storages (#1127)
---
 .../internal/binarytuple/BinaryTupleParser.java    |   6 +-
 .../apache/ignite/internal/schema/BinaryTuple.java |   9 ++
 .../ignite/internal/schema/BinaryTuplePrefix.java  |  24 +++++
 .../internal/storage/engine/MvTableStorage.java    |  51 +++++++++--
 .../internal/storage/index/HashIndexStorage.java   |  33 +------
 .../{HashIndexStorage.java => IndexStorage.java}   |  31 ++-----
 .../internal/storage/index/SortedIndexStorage.java |  14 +--
 .../storage/AbstractMvTableStorageTest.java        |  21 ++++-
 .../chm/TestConcurrentHashMapMvTableStorage.java   |   5 +
 .../index/AbstractSortedIndexStorageTest.java      |  72 +++++++++++----
 .../internal/storage/index/impl/TestIndexRow.java  |  20 ----
 .../storage/index/impl/TestSortedIndexStorage.java |   8 ++
 .../pagememory/AbstractPageMemoryTableStorage.java |  23 ++++-
 .../index/hash/PageMemoryHashIndexStorage.java     |   4 +-
 .../index/sorted/PageMemorySortedIndexStorage.java |  20 +++-
 .../storage/pagememory/util/TreeCursorAdapter.java |   8 +-
 .../PersistentPageMemoryMvTableStorageTest.java    | 101 +++++++++++++++++++++
 .../VolatilePageMemoryMvTableStorageTest.java      |  95 +++++++++++++++++++
 .../storage/rocksdb/RocksDbTableStorage.java       |   5 +
 .../rocksdb/index/RocksDbSortedIndexStorage.java   |  71 ++++++++++-----
 .../state/rocksdb/TxStateRocksDbStorage.java       |   5 +-
 21 files changed, 473 insertions(+), 153 deletions(-)

diff --git 
a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleParser.java
 
b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleParser.java
index 7665977c59..b9105b0204 100644
--- 
a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleParser.java
+++ 
b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleParser.java
@@ -46,12 +46,12 @@ public class BinaryTupleParser {
         void nextElement(int index, int begin, int end);
     }
 
+    /** Byte order of ByteBuffers that contain the tuple. */
+    public static final ByteOrder ORDER = ByteOrder.LITTLE_ENDIAN;
+
     /** UUID size in bytes. */
     private static final int UUID_SIZE = 16;
 
-    /** Byte order of ByteBuffers that contain the tuple. */
-    private static final ByteOrder ORDER = ByteOrder.LITTLE_ENDIAN;
-
     /** Number of elements in the tuple. */
     private final int numElements;
 
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTuple.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTuple.java
index d5b4f7d743..1e0898a0d6 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTuple.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTuple.java
@@ -73,4 +73,13 @@ public class BinaryTuple extends BinaryTupleReader 
implements InternalTuple {
     public Object value(int index) {
         return schema.element(index).typeSpec.objectValue(this, index);
     }
+
+    /**
+     * Returns the schema of this tuple.
+     *
+     * @return This tuple's schema.
+     */
+    public BinaryTupleSchema schema() {
+        return schema;
+    }
 }
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTuplePrefix.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTuplePrefix.java
index d7ef4817f5..a161115dec 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTuplePrefix.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTuplePrefix.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.schema;
 
+import static 
org.apache.ignite.internal.binarytuple.BinaryTupleCommon.PREFIX_FLAG;
+
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import org.apache.ignite.internal.binarytuple.BinaryTuplePrefixBuilder;
@@ -54,6 +56,28 @@ public class BinaryTuplePrefix extends BinaryTupleReader 
implements InternalTupl
         this.schema = schema;
     }
 
+    /**
+     * Creates a prefix that contains all columns from the provided {@link 
BinaryTuple}.
+     *
+     * @param tuple Tuple to create a prefix from.
+     * @return Prefix, equivalent to the tuple.
+     */
+    public static BinaryTuplePrefix fromBinaryTuple(BinaryTuple tuple) {
+        ByteBuffer tupleBuffer = tuple.byteBuffer();
+
+        ByteBuffer prefixBuffer = ByteBuffer.allocate(tupleBuffer.remaining() 
+ Integer.BYTES)
+                .order(ORDER)
+                .put(tupleBuffer)
+                .putInt(tuple.count())
+                .flip();
+
+        byte flags = prefixBuffer.get(0);
+
+        prefixBuffer.put(0, (byte) (flags | PREFIX_FLAG));
+
+        return new BinaryTuplePrefix(tuple.schema(), prefixBuffer);
+    }
+
     @Override
     public int count() {
         return elementCount();
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
index cbd83726ee..701a321948 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
@@ -17,12 +17,19 @@
 
 package org.apache.ignite.internal.storage.engine;
 
+import static 
org.apache.ignite.configuration.schemas.table.TableIndexConfigurationSchema.HASH_INDEX_TYPE;
+import static 
org.apache.ignite.configuration.schemas.table.TableIndexConfigurationSchema.SORTED_INDEX_TYPE;
+
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.internal.configuration.util.ConfigurationUtil;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.index.HashIndexStorage;
+import org.apache.ignite.internal.storage.index.IndexStorage;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
 import org.jetbrains.annotations.Nullable;
 
@@ -31,11 +38,11 @@ import org.jetbrains.annotations.Nullable;
  */
 public interface MvTableStorage {
     /**
-     * Retrieves or creates a partition for the current table. Not expected to 
be called concurrently with the same partition id.
+     * Retrieves or creates a partition for the current table. Not expected to 
be called concurrently with the same Partition ID.
      *
-     * @param partitionId Partition id.
+     * @param partitionId Partition ID.
      * @return Partition storage.
-     * @throws IllegalArgumentException If partition id is out of configured 
bounds.
+     * @throws IllegalArgumentException If Partition ID is out of configured 
bounds.
      * @throws StorageException If an error has occurred during the partition 
creation.
      */
     MvPartitionStorage getOrCreateMvPartition(int partitionId) throws 
StorageException;
@@ -43,21 +50,46 @@ public interface MvTableStorage {
     /**
      * Returns the partition storage or {@code null} if the requested storage 
doesn't exist.
      *
-     * @param partitionId Partition id.
+     * @param partitionId Partition ID.
      * @return Partition storage or {@code null} if it does not exist.
-     * @throws IllegalArgumentException If partition id is out of configured 
bounds.
+     * @throws IllegalArgumentException If Partition ID is out of configured 
bounds.
      */
     @Nullable MvPartitionStorage getMvPartition(int partitionId);
 
     /**
      * Destroys a partition and all associated indices.
      *
-     * @param partitionId Partition id.
-     * @throws IllegalArgumentException If partition id is out of bounds.
+     * @param partitionId Partition ID.
+     * @throws IllegalArgumentException If Partition ID is out of bounds.
      * @throws StorageException If an error has occurred during the partition 
destruction.
      */
     CompletableFuture<Void> destroyPartition(int partitionId) throws 
StorageException;
 
+    /**
+     * Returns an already created Index (either Sorted or Hash) with the given 
ID or creates a new one if it does not exist.
+     *
+     * @param partitionId Partition ID.
+     * @param indexId Index ID.
+     * @return Index Storage.
+     * @throws StorageException If the given partition does not exist, or if 
no configuration for the given index ID can be found.
+     */
+    default IndexStorage getOrCreateIndex(int partitionId, UUID indexId) {
+        TableIndexConfiguration indexConfig = 
ConfigurationUtil.getByInternalId(tablesConfiguration().indexes(), indexId);
+
+        if (indexConfig == null) {
+            throw new StorageException(String.format("Index configuration for 
\"%s\" could not be found", indexId));
+        }
+
+        switch (indexConfig.type().value()) {
+            case HASH_INDEX_TYPE:
+                return getOrCreateHashIndex(partitionId, indexId);
+            case SORTED_INDEX_TYPE:
+                return getOrCreateSortedIndex(partitionId, indexId);
+            default:
+                throw new StorageException("Unknown index type: " + 
indexConfig.type().value());
+        }
+    }
+
     /**
      * Returns an already created Sorted Index with the given name or creates 
a new one if it does not exist.
      *
@@ -105,6 +137,11 @@ public interface MvTableStorage {
      */
     TableConfiguration configuration();
 
+    /**
+     * Returns configuration for all tables and indices.
+     */
+    TablesConfiguration tablesConfiguration();
+
     /**
      * Starts the storage.
      *
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/HashIndexStorage.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/HashIndexStorage.java
index e6f531fc98..60ed7f9e05 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/HashIndexStorage.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/HashIndexStorage.java
@@ -18,11 +18,9 @@
 package org.apache.ignite.internal.storage.index;
 
 import java.util.UUID;
-import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
-import org.apache.ignite.internal.util.Cursor;
 
 /**
  * Storage for a Hash Index.
@@ -30,41 +28,12 @@ import org.apache.ignite.internal.util.Cursor;
  * <p>This storage serves as an unordered mapping from a subset of a table's 
columns (a.k.a. index columns) to a set of {@link RowId}s
  * from a single {@link org.apache.ignite.internal.storage.MvPartitionStorage} 
from the same table.
  */
-public interface HashIndexStorage {
+public interface HashIndexStorage extends IndexStorage {
     /**
      * Returns the Index Descriptor of this storage.
      */
     HashIndexDescriptor indexDescriptor();
 
-    /**
-     * Returns a cursor over {@code RowId}s associated with the given index 
key.
-     *
-     * @throws StorageException If failed to read data.
-     */
-    Cursor<RowId> get(BinaryTuple key) throws StorageException;
-
-    /**
-     * Adds the given index row to the index.
-     *
-     * <p>Usage note: this method <b>must</b> always be called inside the 
corresponding partition's
-     * {@link 
org.apache.ignite.internal.storage.MvPartitionStorage#runConsistently} closure.
-     *
-     * @throws StorageException If failed to put data.
-     */
-    void put(IndexRow row) throws StorageException;
-
-    /**
-     * Removes the given row from the index.
-     *
-     * <p>Removing a non-existent row is a no-op.
-     *
-     * <p>Usage note: this method <b>must</b> always be called inside the 
corresponding partition's
-     * {@link 
org.apache.ignite.internal.storage.MvPartitionStorage#runConsistently} closure.
-     *
-     * @throws StorageException If failed to remove data.
-     */
-    void remove(IndexRow row) throws StorageException;
-
     /**
      * Removes all data from this index.
      *
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/HashIndexStorage.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexStorage.java
similarity index 58%
copy from 
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/HashIndexStorage.java
copy to 
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexStorage.java
index e6f531fc98..0addff3490 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/HashIndexStorage.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexStorage.java
@@ -17,25 +17,15 @@
 
 package org.apache.ignite.internal.storage.index;
 
-import java.util.UUID;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
-import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.util.Cursor;
 
 /**
- * Storage for a Hash Index.
- *
- * <p>This storage serves as an unordered mapping from a subset of a table's 
columns (a.k.a. index columns) to a set of {@link RowId}s
- * from a single {@link org.apache.ignite.internal.storage.MvPartitionStorage} 
from the same table.
+ * Common interface for all Index Storage implementations.
  */
-public interface HashIndexStorage {
-    /**
-     * Returns the Index Descriptor of this storage.
-     */
-    HashIndexDescriptor indexDescriptor();
-
+public interface IndexStorage {
     /**
      * Returns a cursor over {@code RowId}s associated with the given index 
key.
      *
@@ -46,8 +36,8 @@ public interface HashIndexStorage {
     /**
      * Adds the given index row to the index.
      *
-     * <p>Usage note: this method <b>must</b> always be called inside the 
corresponding partition's
-     * {@link 
org.apache.ignite.internal.storage.MvPartitionStorage#runConsistently} closure.
+     * @apiNote This method <b>must</b> always be called inside the 
corresponding partition's
+     *     {@link 
org.apache.ignite.internal.storage.MvPartitionStorage#runConsistently} closure.
      *
      * @throws StorageException If failed to put data.
      */
@@ -58,19 +48,10 @@ public interface HashIndexStorage {
      *
      * <p>Removing a non-existent row is a no-op.
      *
-     * <p>Usage note: this method <b>must</b> always be called inside the 
corresponding partition's
-     * {@link 
org.apache.ignite.internal.storage.MvPartitionStorage#runConsistently} closure.
+     * @apiNote This method <b>must</b> always be called inside the 
corresponding partition's
+     *     {@link 
org.apache.ignite.internal.storage.MvPartitionStorage#runConsistently} closure.
      *
      * @throws StorageException If failed to remove data.
      */
     void remove(IndexRow row) throws StorageException;
-
-    /**
-     * Removes all data from this index.
-     *
-     * @throws StorageException If failed to destory index.
-     * @deprecated IGNITE-17626 Synchronous API should be removed. {@link 
MvTableStorage#destroyIndex(UUID)} must be the only public option.
-     */
-    @Deprecated
-    void destroy() throws StorageException;
 }
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java
index 79ab7a4d89..2aa1542537 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java
@@ -29,7 +29,7 @@ import org.jetbrains.annotations.Nullable;
  * <p>This storage serves as a sorted mapping from a subset of a table's 
columns (a.k.a. index columns) to a set of {@link RowId}s
  * from a single {@link org.apache.ignite.internal.storage.MvPartitionStorage} 
from the same table.
  */
-public interface SortedIndexStorage {
+public interface SortedIndexStorage extends IndexStorage {
     /** Exclude lower bound. */
     int GREATER = 0;
 
@@ -47,18 +47,6 @@ public interface SortedIndexStorage {
      */
     SortedIndexDescriptor indexDescriptor();
 
-    /**
-     * Adds the given index row to the index.
-     */
-    void put(IndexRow row);
-
-    /**
-     * Removes the given row from the index.
-     *
-     * <p>Removing a non-existent row is a no-op.
-     */
-    void remove(IndexRow row);
-
     /**
      * Returns a range of index values between the lower bound and the upper 
bound.
      *
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index f5b6896c50..32a0879bce 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -21,6 +21,7 @@ import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutur
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
@@ -49,6 +50,7 @@ import 
org.apache.ignite.internal.schema.testutils.definition.index.IndexDefinit
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.index.HashIndexStorage;
 import org.apache.ignite.internal.storage.index.IndexRowImpl;
+import org.apache.ignite.internal.storage.index.SortedIndexStorage;
 import org.apache.ignite.internal.util.Cursor;
 import org.junit.jupiter.api.Test;
 
@@ -128,6 +130,23 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
         assertThat(toList(partitionStorage1.scan(row -> true, txId)), 
contains(unwrap(testData1)));
     }
 
+    /**
+     * Tests the {@link MvTableStorage#getOrCreateIndex} method.
+     */
+    @Test
+    public void testCreateIndex() {
+        assertThrows(StorageException.class, () -> 
tableStorage.getOrCreateIndex(PARTITION_ID, sortedIdx.id()));
+        assertThrows(StorageException.class, () -> 
tableStorage.getOrCreateIndex(PARTITION_ID, hashIdx.id()));
+
+        // Index should only be available after the associated partition has 
been created.
+        tableStorage.getOrCreateMvPartition(PARTITION_ID);
+
+        assertThat(tableStorage.getOrCreateIndex(PARTITION_ID, 
sortedIdx.id()), is(instanceOf(SortedIndexStorage.class)));
+        assertThat(tableStorage.getOrCreateIndex(PARTITION_ID, hashIdx.id()), 
is(instanceOf(HashIndexStorage.class)));
+
+        assertThrows(StorageException.class, () -> 
tableStorage.getOrCreateIndex(PARTITION_ID, UUID.randomUUID()));
+    }
+
     /**
      * Test creating a Sorted Index.
      */
@@ -216,8 +235,6 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
 
         assertThat(getAll(storage1.get(tuple)), contains(rowId1));
         assertThat(getAll(storage2.get(tuple)), contains(rowId2));
-
-        assertThat(tableStorage.destroyIndex(sortedIdx.id()), 
willCompleteSuccessfully());
     }
 
     /**
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvTableStorage.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvTableStorage.java
index 1ff44ebf0a..f835947c86 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvTableStorage.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvTableStorage.java
@@ -162,6 +162,11 @@ public class TestConcurrentHashMapMvTableStorage 
implements MvTableStorage {
         return tableCfg;
     }
 
+    @Override
+    public TablesConfiguration tablesConfiguration() {
+        return tablesCfg;
+    }
+
     @Override
     public void start() throws StorageException {
     }
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
index dce0ebcc01..c3b28735ce 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
@@ -35,6 +35,7 @@ import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.hasSize;
 
@@ -54,6 +55,7 @@ import 
org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.internal.configuration.util.ConfigurationUtil;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.schema.SchemaTestUtils;
 import org.apache.ignite.internal.schema.testutils.builder.SchemaBuilders;
@@ -225,12 +227,46 @@ public abstract class AbstractSortedIndexStorageTest {
     }
 
     @Test
-    public void testEmpty() throws Exception {
+    void testEmpty() throws Exception {
         SortedIndexStorage index = 
createIndexStorage(shuffledRandomDefinitions());
 
         assertThat(scan(index, null, null, 0), is(empty()));
     }
 
+    @Test
+    void testGet() throws Exception {
+        List<ColumnDefinition> columns = List.of(
+                column(ColumnType.string().typeSpec().name(), 
ColumnType.string()).asNullable(false).build(),
+                column(ColumnType.INT32.typeSpec().name(), 
ColumnType.INT32).asNullable(false).build()
+        );
+
+        SortedIndexDefinition indexDefinition = 
SchemaBuilders.sortedIndex(randomString(random, 10))
+                .addIndexColumn(columns.get(0).name()).asc().done()
+                .addIndexColumn(columns.get(1).name()).asc().done()
+                .build();
+
+        SortedIndexStorage index = createIndexStorage(indexDefinition);
+
+        var serializer = new BinaryTupleRowSerializer(index.indexDescriptor());
+
+        IndexRow val1090 = serializer.serializeRow(new Object[]{ "10", 90 }, 
new RowId(0));
+        IndexRow otherVal1090 = serializer.serializeRow(new Object[]{ "10", 90 
}, new RowId(0));
+        IndexRow val1080 = serializer.serializeRow(new Object[]{ "10", 80 }, 
new RowId(0));
+        IndexRow val2090 = serializer.serializeRow(new Object[]{ "20", 90 }, 
new RowId(0));
+
+        put(index, val1090);
+        put(index, otherVal1090);
+        put(index, val1080);
+
+        assertThat(get(index, val1090.indexColumns()), 
containsInAnyOrder(val1090.rowId(), otherVal1090.rowId()));
+
+        assertThat(get(index, otherVal1090.indexColumns()), 
containsInAnyOrder(val1090.rowId(), otherVal1090.rowId()));
+
+        assertThat(get(index, val1080.indexColumns()), 
containsInAnyOrder(val1080.rowId()));
+
+        assertThat(get(index, val2090.indexColumns()), is(empty()));
+    }
+
     /**
      * Tests the Put-Get-Remove case when an index is created using a single 
column.
      */
@@ -247,7 +283,7 @@ public abstract class AbstractSortedIndexStorageTest {
     void testPutIdempotence() throws Exception {
         List<ColumnDefinition> columns = List.of(
                 column(ColumnType.string().typeSpec().name(), 
ColumnType.string()).asNullable(false).build(),
-                column(ColumnType.INT32.typeSpec().name(), 
ColumnType.string()).asNullable(false).build()
+                column(ColumnType.INT32.typeSpec().name(), 
ColumnType.INT32).asNullable(false).build()
         );
 
         SortedIndexDefinition indexDefinition = 
SchemaBuilders.sortedIndex(randomString(random, 10))
@@ -267,7 +303,7 @@ public abstract class AbstractSortedIndexStorageTest {
         put(index, row);
         put(index, row);
 
-        IndexRow actualRow = getSingle(index, 
serializer.serializeRowPrefix(columnValues));
+        IndexRow actualRow = getSingle(index, row.indexColumns());
 
         assertThat(actualRow.rowId(), is(equalTo(row.rowId())));
     }
@@ -279,7 +315,7 @@ public abstract class AbstractSortedIndexStorageTest {
     void testMultiplePuts() throws Exception {
         List<ColumnDefinition> columns = List.of(
                 column(ColumnType.string().typeSpec().name(), 
ColumnType.string()).asNullable(false).build(),
-                column(ColumnType.INT32.typeSpec().name(), 
ColumnType.string()).asNullable(false).build()
+                column(ColumnType.INT32.typeSpec().name(), 
ColumnType.INT32).asNullable(false).build()
         );
 
         SortedIndexDefinition indexDefinition = 
SchemaBuilders.sortedIndex(randomString(random, 10))
@@ -317,7 +353,7 @@ public abstract class AbstractSortedIndexStorageTest {
     void testRemove() throws Exception {
         List<ColumnDefinition> columns = List.of(
                 column(ColumnType.string().typeSpec().name(), 
ColumnType.string()).asNullable(false).build(),
-                column(ColumnType.INT32.typeSpec().name(), 
ColumnType.string()).asNullable(false).build()
+                column(ColumnType.INT32.typeSpec().name(), 
ColumnType.INT32).asNullable(false).build()
         );
 
         SortedIndexDefinition indexDefinition = 
SchemaBuilders.sortedIndex(randomString(random, 10))
@@ -421,7 +457,7 @@ public abstract class AbstractSortedIndexStorageTest {
     public void testBoundsAndOrder() throws Exception {
         List<ColumnDefinition> columns = List.of(
                 column(ColumnType.string().typeSpec().name(), 
ColumnType.string()).asNullable(false).build(),
-                column(ColumnType.INT32.typeSpec().name(), 
ColumnType.string()).asNullable(false).build()
+                column(ColumnType.INT32.typeSpec().name(), 
ColumnType.INT32).asNullable(false).build()
         );
 
         SortedIndexDefinition index1Definition = 
SchemaBuilders.sortedIndex(randomString(random, 10))
@@ -606,38 +642,36 @@ public abstract class AbstractSortedIndexStorageTest {
         // using a cycle here to protect against equal keys being generated
         do {
             entry2 = TestIndexRow.randomRow(indexStorage);
-        } while (entry1.equals(entry2));
+        } while 
(entry1.indexColumns().byteBuffer().equals(entry2.indexColumns().byteBuffer()));
 
         put(indexStorage, entry1);
         put(indexStorage, entry2);
 
         assertThat(
-                getSingle(indexStorage, 
entry1.prefix(indexSchema.size())).rowId(),
+                getSingle(indexStorage, entry1.indexColumns()).rowId(),
                 is(equalTo(entry1.rowId()))
         );
 
         assertThat(
-                getSingle(indexStorage, 
entry2.prefix(indexSchema.size())).rowId(),
+                getSingle(indexStorage, entry2.indexColumns()).rowId(),
                 is(equalTo(entry2.rowId()))
         );
 
         remove(indexStorage, entry1);
 
-        assertThat(getSingle(indexStorage, entry1.prefix(indexSchema.size())), 
is(nullValue()));
+        assertThat(getSingle(indexStorage, entry1.indexColumns()), 
is(nullValue()));
     }
 
     /**
      * Extracts a single value by a given key or {@code null} if it does not 
exist.
      */
     @Nullable
-    private static IndexRow getSingle(SortedIndexStorage indexStorage, 
BinaryTuplePrefix fullPrefix) throws Exception {
-        try (Cursor<IndexRow> cursor = indexStorage.scan(fullPrefix, 
fullPrefix, GREATER_OR_EQUAL | LESS_OR_EQUAL)) {
-            List<IndexRow> values = cursor.stream().collect(toList());
+    private static IndexRow getSingle(SortedIndexStorage indexStorage, 
BinaryTuple fullPrefix) throws Exception {
+        List<RowId> rowIds = get(indexStorage, fullPrefix);
 
-            assertThat(values, anyOf(empty(), hasSize(1)));
+        assertThat(rowIds, anyOf(empty(), hasSize(1)));
 
-            return values.isEmpty() ? null : values.get(0);
-        }
+        return rowIds.isEmpty() ? null : new IndexRowImpl(fullPrefix, 
rowIds.get(0));
     }
 
     private static BinaryTuplePrefix prefix(SortedIndexStorage index, 
Object... vals) {
@@ -661,6 +695,12 @@ public abstract class AbstractSortedIndexStorageTest {
         }
     }
 
+    private static List<RowId> get(SortedIndexStorage index, BinaryTuple key) 
throws Exception {
+        try (Cursor<RowId> cursor = index.get(key)) {
+            return cursor.stream().collect(toUnmodifiableList());
+        }
+    }
+
     private void put(SortedIndexStorage indexStorage, IndexRow row) {
         partitionStorage.runConsistently(() -> {
             indexStorage.put(row);
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestIndexRow.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestIndexRow.java
index 6c5951e6b2..b2f072d009 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestIndexRow.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestIndexRow.java
@@ -24,7 +24,6 @@ import java.lang.reflect.Array;
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Comparator;
-import java.util.Objects;
 import java.util.Random;
 import java.util.function.Function;
 import java.util.stream.IntStream;
@@ -166,23 +165,4 @@ public class TestIndexRow implements IndexRow, 
Comparable<TestIndexRow> {
                 .map(Comparable.class::cast)
                 .toArray(Comparable[]::new);
     }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        TestIndexRow that = (TestIndexRow) o;
-        return Arrays.equals(columns, that.columns) && 
row.rowId().equals(that.row.rowId());
-    }
-
-    @Override
-    public int hashCode() {
-        int result = Objects.hash(row.rowId());
-        result = 31 * result + Arrays.hashCode(columns);
-        return result;
-    }
 }
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
index c856b937e6..1dce63c58c 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
@@ -32,6 +32,7 @@ import java.util.stream.Stream;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.index.BinaryTupleComparator;
 import org.apache.ignite.internal.storage.index.IndexRow;
 import org.apache.ignite.internal.storage.index.IndexRowImpl;
@@ -64,6 +65,13 @@ public class TestSortedIndexStorage implements 
SortedIndexStorage {
         return descriptor;
     }
 
+    @Override
+    public Cursor<RowId> get(BinaryTuple key) throws StorageException {
+        Set<RowId> rowIds = index.getOrDefault(key.byteBuffer(), Set.of());
+
+        return Cursor.fromIterator(rowIds.iterator());
+    }
+
     @Override
     public void put(IndexRow row) {
         index.compute(row.indexColumns().byteBuffer(), (k, v) -> {
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
index 4a97e6149f..e734f74565 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
@@ -47,7 +47,7 @@ public abstract class AbstractPageMemoryTableStorage 
implements MvTableStorage {
 
     protected volatile boolean started;
 
-    protected volatile 
AtomicReferenceArray<AbstractPageMemoryMvPartitionStorage> mvPartitions;
+    private volatile 
AtomicReferenceArray<AbstractPageMemoryMvPartitionStorage> mvPartitions;
 
     /**
      * Constructor.
@@ -64,6 +64,11 @@ public abstract class AbstractPageMemoryTableStorage 
implements MvTableStorage {
         return tableCfg;
     }
 
+    @Override
+    public TablesConfiguration tablesConfiguration() {
+        return tablesConfiguration;
+    }
+
     /**
      * Returns a data region instance for the table.
      */
@@ -143,12 +148,24 @@ public abstract class AbstractPageMemoryTableStorage 
implements MvTableStorage {
 
     @Override
     public SortedIndexStorage getOrCreateSortedIndex(int partitionId, UUID 
indexId) {
-        return 
getOrCreateMvPartition(partitionId).getOrCreateSortedIndex(indexId);
+        AbstractPageMemoryMvPartitionStorage partitionStorage = 
getMvPartition(partitionId);
+
+        if (partitionStorage == null) {
+            throw new StorageException(String.format("Partition ID %d does not 
exist", partitionId));
+        }
+
+        return partitionStorage.getOrCreateSortedIndex(indexId);
     }
 
     @Override
     public HashIndexStorage getOrCreateHashIndex(int partitionId, UUID 
indexId) {
-        return 
getOrCreateMvPartition(partitionId).getOrCreateHashIndex(indexId);
+        AbstractPageMemoryMvPartitionStorage partitionStorage = 
getMvPartition(partitionId);
+
+        if (partitionStorage == null) {
+            throw new StorageException(String.format("Partition ID %d does not 
exist", partitionId));
+        }
+
+        return partitionStorage.getOrCreateHashIndex(indexId);
     }
 
     @Override
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java
index 3294718614..dc17e525be 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java
@@ -86,12 +86,12 @@ public class PageMemoryHashIndexStorage implements 
HashIndexStorage {
         IgniteCursor<HashIndexRow> cursor;
 
         try {
-            cursor = hashIndexTree.find(lowerBound, upperBound, null);
+            cursor = hashIndexTree.find(lowerBound, upperBound);
         } catch (IgniteInternalCheckedException e) {
             throw new StorageException("Failed to create scan cursor", e);
         }
 
-        return Cursor.fromIterator(new TreeCursorAdapter<>(cursor, 
HashIndexRow::rowId));
+        return new TreeCursorAdapter<>(cursor, HashIndexRow::rowId);
     }
 
     @Override
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
index a202b199ab..7946c77853 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
@@ -19,6 +19,7 @@ package 
org.apache.ignite.internal.storage.pagememory.index.sorted;
 
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTuplePrefix;
+import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.index.IndexRow;
 import org.apache.ignite.internal.storage.index.IndexRowImpl;
@@ -68,6 +69,23 @@ public class PageMemorySortedIndexStorage implements 
SortedIndexStorage {
         return descriptor;
     }
 
+    @Override
+    public Cursor<RowId> get(BinaryTuple key) throws StorageException {
+        BinaryTuplePrefix prefix = BinaryTuplePrefix.fromBinaryTuple(key);
+
+        SortedIndexRowKey prefixKey = toSortedIndexRowKey(prefix);
+
+        IgniteCursor<SortedIndexRow> cursor;
+
+        try {
+            cursor = sortedIndexTree.find(prefixKey, prefixKey);
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Failed to create scan cursor", e);
+        }
+
+        return new TreeCursorAdapter<>(cursor, SortedIndexRow::rowId);
+    }
+
     @Override
     public void put(IndexRow row) {
         IndexColumns indexColumns = new IndexColumns(partitionId, 
row.indexColumns().byteBuffer());
@@ -118,7 +136,7 @@ public class PageMemorySortedIndexStorage implements 
SortedIndexStorage {
             throw new StorageException("Failed to create scan cursor", e);
         }
 
-        return Cursor.fromIterator(new TreeCursorAdapter<>(cursor, 
this::toIndexRowImpl));
+        return new TreeCursorAdapter<>(cursor, this::toIndexRowImpl);
     }
 
     private @Nullable SortedIndexRowKey toSortedIndexRowKey(@Nullable 
BinaryTuplePrefix binaryTuple) {
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/util/TreeCursorAdapter.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/util/TreeCursorAdapter.java
index a2eaf2c7ab..fc67539f90 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/util/TreeCursorAdapter.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/util/TreeCursorAdapter.java
@@ -21,6 +21,7 @@ import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.function.Function;
 import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.IgniteCursor;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
 
@@ -30,7 +31,7 @@ import org.apache.ignite.lang.IgniteInternalCheckedException;
  * @param <TREE_ROWT> Type of elements in a tree cursor.
  * @param <CURSOR_ROWT> Type of elements in a resulting iterator.
  */
-public class TreeCursorAdapter<TREE_ROWT, CURSOR_ROWT> implements 
Iterator<CURSOR_ROWT> {
+public class TreeCursorAdapter<TREE_ROWT, CURSOR_ROWT> implements 
Cursor<CURSOR_ROWT> {
     /** Cursor instance from the tree. */
     private final IgniteCursor<TREE_ROWT> cursor;
 
@@ -74,4 +75,9 @@ public class TreeCursorAdapter<TREE_ROWT, CURSOR_ROWT> 
implements Iterator<CURSO
             throw new StorageException("Failed to read next element from the 
tree", e);
         }
     }
+
+    @Override
+    public void close() throws Exception {
+        // no-op
+    }
 }
diff --git 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
new file mode 100644
index 0000000000..2cc4634e4b
--- /dev/null
+++ 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import java.nio.file.Path;
+import 
org.apache.ignite.configuration.schemas.store.UnknownDataStorageConfigurationSchema;
+import 
org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
+import 
org.apache.ignite.configuration.schemas.table.NullValueDefaultConfigurationSchema;
+import 
org.apache.ignite.configuration.schemas.table.SortedIndexConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import 
org.apache.ignite.configuration.schemas.table.UnlimitedBudgetConfigurationSchema;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import 
org.apache.ignite.internal.pagememory.configuration.schema.UnsafeMemoryAllocatorConfigurationSchema;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.storage.AbstractMvTableStorageTest;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import 
org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryDataStorageConfigurationSchema;
+import 
org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineConfiguration;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests for {@link PersistentPageMemoryTableStorage} class.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+@ExtendWith(ConfigurationExtension.class)
+public class PersistentPageMemoryMvTableStorageTest extends 
AbstractMvTableStorageTest {
+    private PersistentPageMemoryStorageEngine engine;
+
+    private MvTableStorage tableStorage;
+
+    @BeforeEach
+    void setUp(
+            @WorkDirectory
+            Path workDir,
+            @InjectConfiguration(polymorphicExtensions = 
UnsafeMemoryAllocatorConfigurationSchema.class)
+            PersistentPageMemoryStorageEngineConfiguration engineConfig,
+            @InjectConfiguration(
+                    polymorphicExtensions = {
+                            
PersistentPageMemoryDataStorageConfigurationSchema.class,
+                            UnknownDataStorageConfigurationSchema.class,
+                            HashIndexConfigurationSchema.class,
+                            SortedIndexConfigurationSchema.class,
+                            NullValueDefaultConfigurationSchema.class,
+                            UnlimitedBudgetConfigurationSchema.class
+                    },
+                    value = "mock.tables.foo{ partitions = 512, 
dataStorage.name = " + PersistentPageMemoryStorageEngine.ENGINE_NAME + "}"
+            )
+            TablesConfiguration tablesConfig
+    ) {
+        var ioRegistry = new PageIoRegistry();
+
+        ioRegistry.loadFromServiceLoader();
+
+        engine = new PersistentPageMemoryStorageEngine("test", engineConfig, 
ioRegistry, workDir, null);
+
+        engine.start();
+
+        tableStorage = engine.createMvTable(tablesConfig.tables().get("foo"), 
tablesConfig);
+
+        tableStorage.start();
+
+        initialize(tableStorage, tablesConfig);
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        IgniteUtils.closeAll(
+                tableStorage == null ? null : tableStorage::stop,
+                engine == null ? null : engine::stop
+        );
+    }
+
+    // TODO: Enable this test after index destruction is implemented.
+    @Disabled
+    @Override
+    public void testDestroyIndex() {
+        super.testDestroyIndex();
+    }
+}
diff --git 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryMvTableStorageTest.java
 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryMvTableStorageTest.java
new file mode 100644
index 0000000000..be25cc7fd4
--- /dev/null
+++ 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryMvTableStorageTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import 
org.apache.ignite.configuration.schemas.store.UnknownDataStorageConfigurationSchema;
+import 
org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
+import 
org.apache.ignite.configuration.schemas.table.NullValueDefaultConfigurationSchema;
+import 
org.apache.ignite.configuration.schemas.table.SortedIndexConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import 
org.apache.ignite.configuration.schemas.table.UnlimitedBudgetConfigurationSchema;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import 
org.apache.ignite.internal.pagememory.configuration.schema.UnsafeMemoryAllocatorConfigurationSchema;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.storage.AbstractMvTableStorageTest;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import 
org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryDataStorageConfigurationSchema;
+import 
org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryStorageEngineConfiguration;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests for {@link VolatilePageMemoryTableStorage}.
+ */
+@ExtendWith(ConfigurationExtension.class)
+public class VolatilePageMemoryMvTableStorageTest extends 
AbstractMvTableStorageTest {
+    private VolatilePageMemoryStorageEngine engine;
+
+    private MvTableStorage tableStorage;
+
+    @BeforeEach
+    void setUp(
+            @InjectConfiguration(polymorphicExtensions = 
UnsafeMemoryAllocatorConfigurationSchema.class)
+            VolatilePageMemoryStorageEngineConfiguration engineConfig,
+            @InjectConfiguration(
+                    polymorphicExtensions = {
+                            
VolatilePageMemoryDataStorageConfigurationSchema.class,
+                            UnknownDataStorageConfigurationSchema.class,
+                            HashIndexConfigurationSchema.class,
+                            SortedIndexConfigurationSchema.class,
+                            NullValueDefaultConfigurationSchema.class,
+                            UnlimitedBudgetConfigurationSchema.class
+                    },
+                    value = "mock.tables.foo{ partitions = 512, 
dataStorage.name = " + VolatilePageMemoryStorageEngine.ENGINE_NAME + "}"
+            )
+            TablesConfiguration tablesConfig
+    ) {
+        var ioRegistry = new PageIoRegistry();
+
+        ioRegistry.loadFromServiceLoader();
+
+        engine = new VolatilePageMemoryStorageEngine(engineConfig, ioRegistry);
+
+        engine.start();
+
+        tableStorage = engine.createMvTable(tablesConfig.tables().get("foo"), 
tablesConfig);
+
+        tableStorage.start();
+
+        initialize(tableStorage, tablesConfig);
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        IgniteUtils.closeAll(
+                tableStorage == null ? null : tableStorage::stop,
+                engine == null ? null : engine::stop
+        );
+    }
+
+    // TODO: Enable this test after index destruction is implemented.
+    @Disabled
+    @Override
+    public void testDestroyIndex() {
+        super.testDestroyIndex();
+    }
+}
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index 2668d81708..2fcfa0071c 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -179,6 +179,11 @@ public class RocksDbTableStorage implements MvTableStorage 
{
         return tableCfg;
     }
 
+    @Override
+    public TablesConfiguration tablesConfiguration() {
+        return tablesCfg;
+    }
+
     /** {@inheritDoc} */
     @Override
     public void start() throws StorageException {
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
index 927be72c10..38dfe81389 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
@@ -30,7 +30,6 @@ import org.apache.ignite.internal.rocksdb.ColumnFamily;
 import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTuplePrefix;
-import org.apache.ignite.internal.schema.row.InternalTuple;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.index.BinaryTupleComparator;
@@ -61,6 +60,10 @@ import org.rocksdb.WriteBatchWithIndex;
  * <p>We use an empty array as values, because all required information can be 
extracted from the key.
  */
 public class RocksDbSortedIndexStorage implements SortedIndexStorage {
+    private static final int ROW_ID_SIZE = Long.BYTES * 2;
+
+    private static final ByteOrder ORDER = ByteOrder.BIG_ENDIAN;
+
     private final SortedIndexDescriptor descriptor;
 
     private final ColumnFamily indexCf;
@@ -89,6 +92,13 @@ public class RocksDbSortedIndexStorage implements 
SortedIndexStorage {
         return descriptor;
     }
 
+    @Override
+    public Cursor<RowId> get(BinaryTuple key) throws StorageException {
+        BinaryTuplePrefix keyPrefix = BinaryTuplePrefix.fromBinaryTuple(key);
+
+        return map(scan(keyPrefix, keyPrefix, true, true), this::decodeRowId);
+    }
+
     @Override
     public void put(IndexRow row) {
         WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
@@ -116,17 +126,26 @@ public class RocksDbSortedIndexStorage implements 
SortedIndexStorage {
         boolean includeLower = (flags & GREATER_OR_EQUAL) != 0;
         boolean includeUpper = (flags & LESS_OR_EQUAL) != 0;
 
-        return scan(lowerBound, upperBound, includeLower, includeUpper);
+        return map(scan(lowerBound, upperBound, includeLower, includeUpper), 
this::decodeRow);
     }
 
-    private Cursor<IndexRow> scan(
+    private Cursor<ByteBuffer> scan(
             @Nullable BinaryTuplePrefix lowerBound,
             @Nullable BinaryTuplePrefix upperBound,
             boolean includeLower,
             boolean includeUpper
     ) {
         byte[] lowerBoundBytes = lowerBound == null ? null : 
rocksPrefix(lowerBound);
-        byte[] upperBoundBytes = upperBound == null ? null : 
rocksPrefix(upperBound);
+
+        byte[] upperBoundBytes;
+
+        if (upperBound == null) {
+            upperBoundBytes = null;
+        } else if (lowerBound == upperBound) {
+            upperBoundBytes = lowerBoundBytes;
+        } else {
+            upperBoundBytes = rocksPrefix(upperBound);
+        }
 
         Cursor<ByteBuffer> cursor = createScanCursor(lowerBoundBytes, 
upperBoundBytes);
 
@@ -137,12 +156,15 @@ public class RocksDbSortedIndexStorage implements 
SortedIndexStorage {
 
         // Include the upper bound, if needed (RocksDB excludes the upper 
bound by default).
         if (includeUpper && upperBound != null) {
-            Cursor<ByteBuffer> upperBoundCursor = 
takeWhile(createScanCursor(upperBoundBytes, null), startsWith(upperBound));
+            Cursor<ByteBuffer> upperBoundCursor = takeWhile(
+                    createScanCursor(upperBoundBytes, null),
+                    startsWith(upperBound)
+            );
 
             cursor = concat(cursor, upperBoundCursor);
         }
 
-        return map(cursor, this::decodeRow);
+        return cursor;
     }
 
     private Cursor<ByteBuffer> createScanCursor(byte @Nullable [] lowerBound, 
byte @Nullable [] upperBound) {
@@ -161,7 +183,7 @@ public class RocksDbSortedIndexStorage implements 
SortedIndexStorage {
         return new RocksIteratorAdapter<>(it) {
             @Override
             protected ByteBuffer decodeEntry(byte[] key, byte[] value) {
-                return ByteBuffer.wrap(key).order(ByteOrder.BIG_ENDIAN);
+                return ByteBuffer.wrap(key).order(ORDER);
             }
 
             @Override
@@ -178,35 +200,36 @@ public class RocksDbSortedIndexStorage implements 
SortedIndexStorage {
 
         var tuple = new BinaryTuple(descriptor.binaryTupleSchema(), 
binaryTupleSlice(bytes));
 
+        return new IndexRowImpl(tuple, decodeRowId(bytes));
+    }
+
+    private RowId decodeRowId(ByteBuffer bytes) {
         // RowId UUID is located at the last 16 bytes of the key
         long mostSignificantBits = bytes.getLong(bytes.limit() - Long.BYTES * 
2);
         long leastSignificantBits = bytes.getLong(bytes.limit() - Long.BYTES);
 
-        var rowId = new RowId(partitionStorage.partitionId(), 
mostSignificantBits, leastSignificantBits);
-
-        return new IndexRowImpl(tuple, rowId);
+        return new RowId(partitionStorage.partitionId(), mostSignificantBits, 
leastSignificantBits);
     }
 
     private byte[] rocksPrefix(BinaryTuplePrefix prefix) {
-        return rocksPrefix(prefix, 0).array();
-    }
-
-    private ByteBuffer rocksPrefix(InternalTuple prefix, int extraLength) {
-        ByteBuffer keyBytes = prefix.byteBuffer();
+        ByteBuffer bytes = prefix.byteBuffer();
 
-        return ByteBuffer.allocate(Short.BYTES + keyBytes.remaining() + 
extraLength)
-                .order(ByteOrder.BIG_ENDIAN)
+        return ByteBuffer.allocate(Short.BYTES + bytes.remaining())
+                .order(ORDER)
                 .putShort((short) partitionStorage.partitionId())
-                .put(keyBytes);
+                .put(bytes)
+                .array();
     }
 
     private byte[] rocksKey(IndexRow row) {
-        RowId rowId = row.rowId();
+        ByteBuffer bytes = row.indexColumns().byteBuffer();
 
-        // We don't store the Partition ID as it is already a part of the key.
-        return rocksPrefix(row.indexColumns(), 2 * Long.BYTES)
-                .putLong(rowId.mostSignificantBits())
-                .putLong(rowId.leastSignificantBits())
+        return ByteBuffer.allocate(Short.BYTES + bytes.remaining() + 
ROW_ID_SIZE)
+                .order(ORDER)
+                .putShort((short) partitionStorage.partitionId())
+                .put(bytes)
+                .putLong(row.rowId().mostSignificantBits())
+                .putLong(row.rowId().leastSignificantBits())
                 .array();
     }
 
@@ -233,7 +256,7 @@ public class RocksDbSortedIndexStorage implements 
SortedIndexStorage {
                 // Discard partition ID.
                 .position(Short.BYTES)
                 // Discard row ID.
-                .limit(key.limit() - Long.BYTES * 2)
+                .limit(key.limit() - ROW_ID_SIZE)
                 .slice()
                 .order(ByteOrder.LITTLE_ENDIAN);
     }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
index 16c3e2773e..dc2c11989a 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
@@ -45,7 +45,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.IntSupplier;
 import org.apache.ignite.internal.rocksdb.BusyRocksIteratorAdapter;
-import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
 import org.apache.ignite.internal.rocksdb.flush.RocksDbFlusher;
 import org.apache.ignite.internal.tx.TxMeta;
 import org.apache.ignite.internal.tx.TxState;
@@ -379,7 +378,7 @@ public class TxStateRocksDbStorage implements 
TxStateStorage {
                 throw e;
             }
 
-            RocksIteratorAdapter<IgniteBiTuple<UUID, TxMeta>> iteratorAdapter 
= new BusyRocksIteratorAdapter<>(busyLock, rocksIterator) {
+            return new BusyRocksIteratorAdapter<>(busyLock, rocksIterator) {
                 @Override protected IgniteBiTuple<UUID, TxMeta> 
decodeEntry(byte[] keyBytes, byte[] valueBytes) {
                     UUID key = bytesToUuid(keyBytes, 0);
                     TxMeta txMeta = fromBytes(valueBytes);
@@ -399,8 +398,6 @@ public class TxStateRocksDbStorage implements 
TxStateStorage {
                     super.close();
                 }
             };
-
-            return Cursor.fromIterator(iteratorAdapter);
         } finally {
             busyLock.leaveBusy();
         }

Reply via email to