This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new cd32ecc99f IGNITE-17755 Add a common interface for all index storages
(#1127)
cd32ecc99f is described below
commit cd32ecc99fa038176ba4b7b64f0d9d2c50b9a491
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Thu Sep 29 13:36:19 2022 +0300
IGNITE-17755 Add a common interface for all index storages (#1127)
---
.../internal/binarytuple/BinaryTupleParser.java | 6 +-
.../apache/ignite/internal/schema/BinaryTuple.java | 9 ++
.../ignite/internal/schema/BinaryTuplePrefix.java | 24 +++++
.../internal/storage/engine/MvTableStorage.java | 51 +++++++++--
.../internal/storage/index/HashIndexStorage.java | 33 +------
.../{HashIndexStorage.java => IndexStorage.java} | 31 ++-----
.../internal/storage/index/SortedIndexStorage.java | 14 +--
.../storage/AbstractMvTableStorageTest.java | 21 ++++-
.../chm/TestConcurrentHashMapMvTableStorage.java | 5 +
.../index/AbstractSortedIndexStorageTest.java | 72 +++++++++++----
.../internal/storage/index/impl/TestIndexRow.java | 20 ----
.../storage/index/impl/TestSortedIndexStorage.java | 8 ++
.../pagememory/AbstractPageMemoryTableStorage.java | 23 ++++-
.../index/hash/PageMemoryHashIndexStorage.java | 4 +-
.../index/sorted/PageMemorySortedIndexStorage.java | 20 +++-
.../storage/pagememory/util/TreeCursorAdapter.java | 8 +-
.../PersistentPageMemoryMvTableStorageTest.java | 101 +++++++++++++++++++++
.../VolatilePageMemoryMvTableStorageTest.java | 95 +++++++++++++++++++
.../storage/rocksdb/RocksDbTableStorage.java | 5 +
.../rocksdb/index/RocksDbSortedIndexStorage.java | 71 ++++++++++-----
.../state/rocksdb/TxStateRocksDbStorage.java | 5 +-
21 files changed, 473 insertions(+), 153 deletions(-)
diff --git
a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleParser.java
b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleParser.java
index 7665977c59..b9105b0204 100644
---
a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleParser.java
+++
b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleParser.java
@@ -46,12 +46,12 @@ public class BinaryTupleParser {
void nextElement(int index, int begin, int end);
}
+ /** Byte order of ByteBuffers that contain the tuple. */
+ public static final ByteOrder ORDER = ByteOrder.LITTLE_ENDIAN;
+
/** UUID size in bytes. */
private static final int UUID_SIZE = 16;
- /** Byte order of ByteBuffers that contain the tuple. */
- private static final ByteOrder ORDER = ByteOrder.LITTLE_ENDIAN;
-
/** Number of elements in the tuple. */
private final int numElements;
diff --git
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTuple.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTuple.java
index d5b4f7d743..1e0898a0d6 100644
---
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTuple.java
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTuple.java
@@ -73,4 +73,13 @@ public class BinaryTuple extends BinaryTupleReader
implements InternalTuple {
public Object value(int index) {
return schema.element(index).typeSpec.objectValue(this, index);
}
+
+ /**
+ * Returns the schema of this tuple.
+ *
+ * @return This tuple's schema.
+ */
+ public BinaryTupleSchema schema() {
+ return schema;
+ }
}
diff --git
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTuplePrefix.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTuplePrefix.java
index d7ef4817f5..a161115dec 100644
---
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTuplePrefix.java
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTuplePrefix.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.schema;
+import static
org.apache.ignite.internal.binarytuple.BinaryTupleCommon.PREFIX_FLAG;
+
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import org.apache.ignite.internal.binarytuple.BinaryTuplePrefixBuilder;
@@ -54,6 +56,28 @@ public class BinaryTuplePrefix extends BinaryTupleReader
implements InternalTupl
this.schema = schema;
}
+ /**
+ * Creates a prefix that contains all columns from the provided {@link
BinaryTuple}.
+ *
+ * @param tuple Tuple to create a prefix from.
+ * @return Prefix, equivalent to the tuple.
+ */
+ public static BinaryTuplePrefix fromBinaryTuple(BinaryTuple tuple) {
+ ByteBuffer tupleBuffer = tuple.byteBuffer();
+
+ ByteBuffer prefixBuffer = ByteBuffer.allocate(tupleBuffer.remaining()
+ Integer.BYTES)
+ .order(ORDER)
+ .put(tupleBuffer)
+ .putInt(tuple.count())
+ .flip();
+
+ byte flags = prefixBuffer.get(0);
+
+ prefixBuffer.put(0, (byte) (flags | PREFIX_FLAG));
+
+ return new BinaryTuplePrefix(tuple.schema(), prefixBuffer);
+ }
+
@Override
public int count() {
return elementCount();
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
index cbd83726ee..701a321948 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
@@ -17,12 +17,19 @@
package org.apache.ignite.internal.storage.engine;
+import static
org.apache.ignite.configuration.schemas.table.TableIndexConfigurationSchema.HASH_INDEX_TYPE;
+import static
org.apache.ignite.configuration.schemas.table.TableIndexConfigurationSchema.SORTED_INDEX_TYPE;
+
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.internal.configuration.util.ConfigurationUtil;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.index.HashIndexStorage;
+import org.apache.ignite.internal.storage.index.IndexStorage;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
import org.jetbrains.annotations.Nullable;
@@ -31,11 +38,11 @@ import org.jetbrains.annotations.Nullable;
*/
public interface MvTableStorage {
/**
- * Retrieves or creates a partition for the current table. Not expected to
be called concurrently with the same partition id.
+ * Retrieves or creates a partition for the current table. Not expected to
be called concurrently with the same Partition ID.
*
- * @param partitionId Partition id.
+ * @param partitionId Partition ID.
* @return Partition storage.
- * @throws IllegalArgumentException If partition id is out of configured
bounds.
+ * @throws IllegalArgumentException If Partition ID is out of configured
bounds.
* @throws StorageException If an error has occurred during the partition
creation.
*/
MvPartitionStorage getOrCreateMvPartition(int partitionId) throws
StorageException;
@@ -43,21 +50,46 @@ public interface MvTableStorage {
/**
* Returns the partition storage or {@code null} if the requested storage
doesn't exist.
*
- * @param partitionId Partition id.
+ * @param partitionId Partition ID.
* @return Partition storage or {@code null} if it does not exist.
- * @throws IllegalArgumentException If partition id is out of configured
bounds.
+ * @throws IllegalArgumentException If Partition ID is out of configured
bounds.
*/
@Nullable MvPartitionStorage getMvPartition(int partitionId);
/**
* Destroys a partition and all associated indices.
*
- * @param partitionId Partition id.
- * @throws IllegalArgumentException If partition id is out of bounds.
+ * @param partitionId Partition ID.
+ * @throws IllegalArgumentException If Partition ID is out of bounds.
* @throws StorageException If an error has occurred during the partition
destruction.
*/
CompletableFuture<Void> destroyPartition(int partitionId) throws
StorageException;
+ /**
+ * Returns an already created Index (either Sorted or Hash) with the given
name or creates a new one if it does not exist.
+ *
+ * @param partitionId Partition ID.
+ * @param indexId Index ID.
+ * @return Index Storage.
+ * @throws StorageException If the given partition does not exist, or if
the given index does not exist.
+ */
+ default IndexStorage getOrCreateIndex(int partitionId, UUID indexId) {
+ TableIndexConfiguration indexConfig =
ConfigurationUtil.getByInternalId(tablesConfiguration().indexes(), indexId);
+
+ if (indexConfig == null) {
+ throw new StorageException(String.format("Index configuration for
\"%s\" could not be found", indexId));
+ }
+
+ switch (indexConfig.type().value()) {
+ case HASH_INDEX_TYPE:
+ return getOrCreateHashIndex(partitionId, indexId);
+ case SORTED_INDEX_TYPE:
+ return getOrCreateSortedIndex(partitionId, indexId);
+ default:
+ throw new StorageException("Unknown index type: " +
indexConfig.type().value());
+ }
+ }
+
/**
* Returns an already created Sorted Index with the given name or creates
a new one if it does not exist.
*
@@ -105,6 +137,11 @@ public interface MvTableStorage {
*/
TableConfiguration configuration();
+ /**
+ * Returns configuration for all tables and indices.
+ */
+ TablesConfiguration tablesConfiguration();
+
/**
* Starts the storage.
*
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/HashIndexStorage.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/HashIndexStorage.java
index e6f531fc98..60ed7f9e05 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/HashIndexStorage.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/HashIndexStorage.java
@@ -18,11 +18,9 @@
package org.apache.ignite.internal.storage.index;
import java.util.UUID;
-import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
-import org.apache.ignite.internal.util.Cursor;
/**
* Storage for a Hash Index.
@@ -30,41 +28,12 @@ import org.apache.ignite.internal.util.Cursor;
* <p>This storage serves as an unordered mapping from a subset of a table's
columns (a.k.a. index columns) to a set of {@link RowId}s
* from a single {@link org.apache.ignite.internal.storage.MvPartitionStorage}
from the same table.
*/
-public interface HashIndexStorage {
+public interface HashIndexStorage extends IndexStorage {
/**
* Returns the Index Descriptor of this storage.
*/
HashIndexDescriptor indexDescriptor();
- /**
- * Returns a cursor over {@code RowId}s associated with the given index
key.
- *
- * @throws StorageException If failed to read data.
- */
- Cursor<RowId> get(BinaryTuple key) throws StorageException;
-
- /**
- * Adds the given index row to the index.
- *
- * <p>Usage note: this method <b>must</b> always be called inside the
corresponding partition's
- * {@link
org.apache.ignite.internal.storage.MvPartitionStorage#runConsistently} closure.
- *
- * @throws StorageException If failed to put data.
- */
- void put(IndexRow row) throws StorageException;
-
- /**
- * Removes the given row from the index.
- *
- * <p>Removing a non-existent row is a no-op.
- *
- * <p>Usage note: this method <b>must</b> always be called inside the
corresponding partition's
- * {@link
org.apache.ignite.internal.storage.MvPartitionStorage#runConsistently} closure.
- *
- * @throws StorageException If failed to remove data.
- */
- void remove(IndexRow row) throws StorageException;
-
/**
* Removes all data from this index.
*
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/HashIndexStorage.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexStorage.java
similarity index 58%
copy from
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/HashIndexStorage.java
copy to
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexStorage.java
index e6f531fc98..0addff3490 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/HashIndexStorage.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexStorage.java
@@ -17,25 +17,15 @@
package org.apache.ignite.internal.storage.index;
-import java.util.UUID;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
-import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.util.Cursor;
/**
- * Storage for a Hash Index.
- *
- * <p>This storage serves as an unordered mapping from a subset of a table's
columns (a.k.a. index columns) to a set of {@link RowId}s
- * from a single {@link org.apache.ignite.internal.storage.MvPartitionStorage}
from the same table.
+ * Common interface for all Index Storage implementations.
*/
-public interface HashIndexStorage {
- /**
- * Returns the Index Descriptor of this storage.
- */
- HashIndexDescriptor indexDescriptor();
-
+public interface IndexStorage {
/**
* Returns a cursor over {@code RowId}s associated with the given index
key.
*
@@ -46,8 +36,8 @@ public interface HashIndexStorage {
/**
* Adds the given index row to the index.
*
- * <p>Usage note: this method <b>must</b> always be called inside the
corresponding partition's
- * {@link
org.apache.ignite.internal.storage.MvPartitionStorage#runConsistently} closure.
+ * @apiNote This method <b>must</b> always be called inside the
corresponding partition's
+ * {@link
org.apache.ignite.internal.storage.MvPartitionStorage#runConsistently} closure.
*
* @throws StorageException If failed to put data.
*/
@@ -58,19 +48,10 @@ public interface HashIndexStorage {
*
* <p>Removing a non-existent row is a no-op.
*
- * <p>Usage note: this method <b>must</b> always be called inside the
corresponding partition's
- * {@link
org.apache.ignite.internal.storage.MvPartitionStorage#runConsistently} closure.
+ * @apiNote This method <b>must</b> always be called inside the
corresponding partition's
+ * {@link
org.apache.ignite.internal.storage.MvPartitionStorage#runConsistently} closure.
*
* @throws StorageException If failed to remove data.
*/
void remove(IndexRow row) throws StorageException;
-
- /**
- * Removes all data from this index.
- *
- * @throws StorageException If failed to destory index.
- * @deprecated IGNITE-17626 Synchronous API should be removed. {@link
MvTableStorage#destroyIndex(UUID)} must be the only public option.
- */
- @Deprecated
- void destroy() throws StorageException;
}
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java
index 79ab7a4d89..2aa1542537 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java
@@ -29,7 +29,7 @@ import org.jetbrains.annotations.Nullable;
* <p>This storage serves as a sorted mapping from a subset of a table's
columns (a.k.a. index columns) to a set of {@link RowId}s
* from a single {@link org.apache.ignite.internal.storage.MvPartitionStorage}
from the same table.
*/
-public interface SortedIndexStorage {
+public interface SortedIndexStorage extends IndexStorage {
/** Exclude lower bound. */
int GREATER = 0;
@@ -47,18 +47,6 @@ public interface SortedIndexStorage {
*/
SortedIndexDescriptor indexDescriptor();
- /**
- * Adds the given index row to the index.
- */
- void put(IndexRow row);
-
- /**
- * Removes the given row from the index.
- *
- * <p>Removing a non-existent row is a no-op.
- */
- void remove(IndexRow row);
-
/**
* Returns a range of index values between the lower bound and the upper
bound.
*
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index f5b6896c50..32a0879bce 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -21,6 +21,7 @@ import static
org.apache.ignite.internal.testframework.matchers.CompletableFutur
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@@ -49,6 +50,7 @@ import
org.apache.ignite.internal.schema.testutils.definition.index.IndexDefinit
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.index.HashIndexStorage;
import org.apache.ignite.internal.storage.index.IndexRowImpl;
+import org.apache.ignite.internal.storage.index.SortedIndexStorage;
import org.apache.ignite.internal.util.Cursor;
import org.junit.jupiter.api.Test;
@@ -128,6 +130,23 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
assertThat(toList(partitionStorage1.scan(row -> true, txId)),
contains(unwrap(testData1)));
}
+ /**
+ * Tests the {@link MvTableStorage#getOrCreateIndex} method.
+ */
+ @Test
+ public void testCreateIndex() {
+ assertThrows(StorageException.class, () ->
tableStorage.getOrCreateIndex(PARTITION_ID, sortedIdx.id()));
+ assertThrows(StorageException.class, () ->
tableStorage.getOrCreateIndex(PARTITION_ID, hashIdx.id()));
+
+ // Index should only be available after the associated partition has
been created.
+ tableStorage.getOrCreateMvPartition(PARTITION_ID);
+
+ assertThat(tableStorage.getOrCreateIndex(PARTITION_ID,
sortedIdx.id()), is(instanceOf(SortedIndexStorage.class)));
+ assertThat(tableStorage.getOrCreateIndex(PARTITION_ID, hashIdx.id()),
is(instanceOf(HashIndexStorage.class)));
+
+ assertThrows(StorageException.class, () ->
tableStorage.getOrCreateIndex(PARTITION_ID, UUID.randomUUID()));
+ }
+
/**
* Test creating a Sorted Index.
*/
@@ -216,8 +235,6 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
assertThat(getAll(storage1.get(tuple)), contains(rowId1));
assertThat(getAll(storage2.get(tuple)), contains(rowId2));
-
- assertThat(tableStorage.destroyIndex(sortedIdx.id()),
willCompleteSuccessfully());
}
/**
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvTableStorage.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvTableStorage.java
index 1ff44ebf0a..f835947c86 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvTableStorage.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvTableStorage.java
@@ -162,6 +162,11 @@ public class TestConcurrentHashMapMvTableStorage
implements MvTableStorage {
return tableCfg;
}
+ @Override
+ public TablesConfiguration tablesConfiguration() {
+ return tablesCfg;
+ }
+
@Override
public void start() throws StorageException {
}
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
index dce0ebcc01..c3b28735ce 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
@@ -35,6 +35,7 @@ import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasSize;
@@ -54,6 +55,7 @@ import
org.apache.ignite.configuration.schemas.table.TablesConfiguration;
import org.apache.ignite.internal.configuration.util.ConfigurationUtil;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.schema.SchemaTestUtils;
import org.apache.ignite.internal.schema.testutils.builder.SchemaBuilders;
@@ -225,12 +227,46 @@ public abstract class AbstractSortedIndexStorageTest {
}
@Test
- public void testEmpty() throws Exception {
+ void testEmpty() throws Exception {
SortedIndexStorage index =
createIndexStorage(shuffledRandomDefinitions());
assertThat(scan(index, null, null, 0), is(empty()));
}
+ @Test
+ void testGet() throws Exception {
+ List<ColumnDefinition> columns = List.of(
+ column(ColumnType.string().typeSpec().name(),
ColumnType.string()).asNullable(false).build(),
+ column(ColumnType.INT32.typeSpec().name(),
ColumnType.INT32).asNullable(false).build()
+ );
+
+ SortedIndexDefinition indexDefinition =
SchemaBuilders.sortedIndex(randomString(random, 10))
+ .addIndexColumn(columns.get(0).name()).asc().done()
+ .addIndexColumn(columns.get(1).name()).asc().done()
+ .build();
+
+ SortedIndexStorage index = createIndexStorage(indexDefinition);
+
+ var serializer = new BinaryTupleRowSerializer(index.indexDescriptor());
+
+ IndexRow val1090 = serializer.serializeRow(new Object[]{ "10", 90 },
new RowId(0));
+ IndexRow otherVal1090 = serializer.serializeRow(new Object[]{ "10", 90
}, new RowId(0));
+ IndexRow val1080 = serializer.serializeRow(new Object[]{ "10", 80 },
new RowId(0));
+ IndexRow val2090 = serializer.serializeRow(new Object[]{ "20", 90 },
new RowId(0));
+
+ put(index, val1090);
+ put(index, otherVal1090);
+ put(index, val1080);
+
+ assertThat(get(index, val1090.indexColumns()),
containsInAnyOrder(val1090.rowId(), otherVal1090.rowId()));
+
+ assertThat(get(index, otherVal1090.indexColumns()),
containsInAnyOrder(val1090.rowId(), otherVal1090.rowId()));
+
+ assertThat(get(index, val1080.indexColumns()),
containsInAnyOrder(val1080.rowId()));
+
+ assertThat(get(index, val2090.indexColumns()), is(empty()));
+ }
+
/**
* Tests the Put-Get-Remove case when an index is created using a single
column.
*/
@@ -247,7 +283,7 @@ public abstract class AbstractSortedIndexStorageTest {
void testPutIdempotence() throws Exception {
List<ColumnDefinition> columns = List.of(
column(ColumnType.string().typeSpec().name(),
ColumnType.string()).asNullable(false).build(),
- column(ColumnType.INT32.typeSpec().name(),
ColumnType.string()).asNullable(false).build()
+ column(ColumnType.INT32.typeSpec().name(),
ColumnType.INT32).asNullable(false).build()
);
SortedIndexDefinition indexDefinition =
SchemaBuilders.sortedIndex(randomString(random, 10))
@@ -267,7 +303,7 @@ public abstract class AbstractSortedIndexStorageTest {
put(index, row);
put(index, row);
- IndexRow actualRow = getSingle(index,
serializer.serializeRowPrefix(columnValues));
+ IndexRow actualRow = getSingle(index, row.indexColumns());
assertThat(actualRow.rowId(), is(equalTo(row.rowId())));
}
@@ -279,7 +315,7 @@ public abstract class AbstractSortedIndexStorageTest {
void testMultiplePuts() throws Exception {
List<ColumnDefinition> columns = List.of(
column(ColumnType.string().typeSpec().name(),
ColumnType.string()).asNullable(false).build(),
- column(ColumnType.INT32.typeSpec().name(),
ColumnType.string()).asNullable(false).build()
+ column(ColumnType.INT32.typeSpec().name(),
ColumnType.INT32).asNullable(false).build()
);
SortedIndexDefinition indexDefinition =
SchemaBuilders.sortedIndex(randomString(random, 10))
@@ -317,7 +353,7 @@ public abstract class AbstractSortedIndexStorageTest {
void testRemove() throws Exception {
List<ColumnDefinition> columns = List.of(
column(ColumnType.string().typeSpec().name(),
ColumnType.string()).asNullable(false).build(),
- column(ColumnType.INT32.typeSpec().name(),
ColumnType.string()).asNullable(false).build()
+ column(ColumnType.INT32.typeSpec().name(),
ColumnType.INT32).asNullable(false).build()
);
SortedIndexDefinition indexDefinition =
SchemaBuilders.sortedIndex(randomString(random, 10))
@@ -421,7 +457,7 @@ public abstract class AbstractSortedIndexStorageTest {
public void testBoundsAndOrder() throws Exception {
List<ColumnDefinition> columns = List.of(
column(ColumnType.string().typeSpec().name(),
ColumnType.string()).asNullable(false).build(),
- column(ColumnType.INT32.typeSpec().name(),
ColumnType.string()).asNullable(false).build()
+ column(ColumnType.INT32.typeSpec().name(),
ColumnType.INT32).asNullable(false).build()
);
SortedIndexDefinition index1Definition =
SchemaBuilders.sortedIndex(randomString(random, 10))
@@ -606,38 +642,36 @@ public abstract class AbstractSortedIndexStorageTest {
// using a cycle here to protect against equal keys being generated
do {
entry2 = TestIndexRow.randomRow(indexStorage);
- } while (entry1.equals(entry2));
+ } while
(entry1.indexColumns().byteBuffer().equals(entry2.indexColumns().byteBuffer()));
put(indexStorage, entry1);
put(indexStorage, entry2);
assertThat(
- getSingle(indexStorage,
entry1.prefix(indexSchema.size())).rowId(),
+ getSingle(indexStorage, entry1.indexColumns()).rowId(),
is(equalTo(entry1.rowId()))
);
assertThat(
- getSingle(indexStorage,
entry2.prefix(indexSchema.size())).rowId(),
+ getSingle(indexStorage, entry2.indexColumns()).rowId(),
is(equalTo(entry2.rowId()))
);
remove(indexStorage, entry1);
- assertThat(getSingle(indexStorage, entry1.prefix(indexSchema.size())),
is(nullValue()));
+ assertThat(getSingle(indexStorage, entry1.indexColumns()),
is(nullValue()));
}
/**
* Extracts a single value by a given key or {@code null} if it does not
exist.
*/
@Nullable
- private static IndexRow getSingle(SortedIndexStorage indexStorage,
BinaryTuplePrefix fullPrefix) throws Exception {
- try (Cursor<IndexRow> cursor = indexStorage.scan(fullPrefix,
fullPrefix, GREATER_OR_EQUAL | LESS_OR_EQUAL)) {
- List<IndexRow> values = cursor.stream().collect(toList());
+ private static IndexRow getSingle(SortedIndexStorage indexStorage,
BinaryTuple fullPrefix) throws Exception {
+ List<RowId> rowIds = get(indexStorage, fullPrefix);
- assertThat(values, anyOf(empty(), hasSize(1)));
+ assertThat(rowIds, anyOf(empty(), hasSize(1)));
- return values.isEmpty() ? null : values.get(0);
- }
+ return rowIds.isEmpty() ? null : new IndexRowImpl(fullPrefix,
rowIds.get(0));
}
private static BinaryTuplePrefix prefix(SortedIndexStorage index,
Object... vals) {
@@ -661,6 +695,12 @@ public abstract class AbstractSortedIndexStorageTest {
}
}
+ private static List<RowId> get(SortedIndexStorage index, BinaryTuple key)
throws Exception {
+ try (Cursor<RowId> cursor = index.get(key)) {
+ return cursor.stream().collect(toUnmodifiableList());
+ }
+ }
+
private void put(SortedIndexStorage indexStorage, IndexRow row) {
partitionStorage.runConsistently(() -> {
indexStorage.put(row);
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestIndexRow.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestIndexRow.java
index 6c5951e6b2..b2f072d009 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestIndexRow.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestIndexRow.java
@@ -24,7 +24,6 @@ import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Comparator;
-import java.util.Objects;
import java.util.Random;
import java.util.function.Function;
import java.util.stream.IntStream;
@@ -166,23 +165,4 @@ public class TestIndexRow implements IndexRow,
Comparable<TestIndexRow> {
.map(Comparable.class::cast)
.toArray(Comparable[]::new);
}
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- TestIndexRow that = (TestIndexRow) o;
- return Arrays.equals(columns, that.columns) &&
row.rowId().equals(that.row.rowId());
- }
-
- @Override
- public int hashCode() {
- int result = Objects.hash(row.rowId());
- result = 31 * result + Arrays.hashCode(columns);
- return result;
- }
}
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
index c856b937e6..1dce63c58c 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
@@ -32,6 +32,7 @@ import java.util.stream.Stream;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.index.BinaryTupleComparator;
import org.apache.ignite.internal.storage.index.IndexRow;
import org.apache.ignite.internal.storage.index.IndexRowImpl;
@@ -64,6 +65,13 @@ public class TestSortedIndexStorage implements
SortedIndexStorage {
return descriptor;
}
+ @Override
+ public Cursor<RowId> get(BinaryTuple key) throws StorageException {
+ Set<RowId> rowIds = index.getOrDefault(key.byteBuffer(), Set.of());
+
+ return Cursor.fromIterator(rowIds.iterator());
+ }
+
@Override
public void put(IndexRow row) {
index.compute(row.indexColumns().byteBuffer(), (k, v) -> {
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
index 4a97e6149f..e734f74565 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
@@ -47,7 +47,7 @@ public abstract class AbstractPageMemoryTableStorage
implements MvTableStorage {
protected volatile boolean started;
- protected volatile
AtomicReferenceArray<AbstractPageMemoryMvPartitionStorage> mvPartitions;
+ private volatile
AtomicReferenceArray<AbstractPageMemoryMvPartitionStorage> mvPartitions;
/**
* Constructor.
@@ -64,6 +64,11 @@ public abstract class AbstractPageMemoryTableStorage
implements MvTableStorage {
return tableCfg;
}
+ @Override
+ public TablesConfiguration tablesConfiguration() {
+ return tablesConfiguration;
+ }
+
/**
* Returns a data region instance for the table.
*/
@@ -143,12 +148,24 @@ public abstract class AbstractPageMemoryTableStorage
implements MvTableStorage {
@Override
public SortedIndexStorage getOrCreateSortedIndex(int partitionId, UUID
indexId) {
- return
getOrCreateMvPartition(partitionId).getOrCreateSortedIndex(indexId);
+ AbstractPageMemoryMvPartitionStorage partitionStorage =
getMvPartition(partitionId);
+
+ if (partitionStorage == null) {
+ throw new StorageException(String.format("Partition ID %d does not
exist", partitionId));
+ }
+
+ return partitionStorage.getOrCreateSortedIndex(indexId);
}
@Override
public HashIndexStorage getOrCreateHashIndex(int partitionId, UUID
indexId) {
- return
getOrCreateMvPartition(partitionId).getOrCreateHashIndex(indexId);
+ AbstractPageMemoryMvPartitionStorage partitionStorage =
getMvPartition(partitionId);
+
+ if (partitionStorage == null) {
+ throw new StorageException(String.format("Partition ID %d does not
exist", partitionId));
+ }
+
+ return partitionStorage.getOrCreateHashIndex(indexId);
}
@Override
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java
index 3294718614..dc17e525be 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java
@@ -86,12 +86,12 @@ public class PageMemoryHashIndexStorage implements
HashIndexStorage {
IgniteCursor<HashIndexRow> cursor;
try {
- cursor = hashIndexTree.find(lowerBound, upperBound, null);
+ cursor = hashIndexTree.find(lowerBound, upperBound);
} catch (IgniteInternalCheckedException e) {
throw new StorageException("Failed to create scan cursor", e);
}
- return Cursor.fromIterator(new TreeCursorAdapter<>(cursor,
HashIndexRow::rowId));
+ return new TreeCursorAdapter<>(cursor, HashIndexRow::rowId);
}
@Override
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
index a202b199ab..7946c77853 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
@@ -19,6 +19,7 @@ package
org.apache.ignite.internal.storage.pagememory.index.sorted;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
+import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.index.IndexRow;
import org.apache.ignite.internal.storage.index.IndexRowImpl;
@@ -68,6 +69,23 @@ public class PageMemorySortedIndexStorage implements
SortedIndexStorage {
return descriptor;
}
+ @Override
+ public Cursor<RowId> get(BinaryTuple key) throws StorageException {
+ BinaryTuplePrefix prefix = BinaryTuplePrefix.fromBinaryTuple(key);
+
+ SortedIndexRowKey prefixKey = toSortedIndexRowKey(prefix);
+
+ IgniteCursor<SortedIndexRow> cursor;
+
+ try {
+ cursor = sortedIndexTree.find(prefixKey, prefixKey);
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Failed to create scan cursor", e);
+ }
+
+ return new TreeCursorAdapter<>(cursor, SortedIndexRow::rowId);
+ }
+
@Override
public void put(IndexRow row) {
IndexColumns indexColumns = new IndexColumns(partitionId,
row.indexColumns().byteBuffer());
@@ -118,7 +136,7 @@ public class PageMemorySortedIndexStorage implements
SortedIndexStorage {
throw new StorageException("Failed to create scan cursor", e);
}
- return Cursor.fromIterator(new TreeCursorAdapter<>(cursor,
this::toIndexRowImpl));
+ return new TreeCursorAdapter<>(cursor, this::toIndexRowImpl);
}
private @Nullable SortedIndexRowKey toSortedIndexRowKey(@Nullable
BinaryTuplePrefix binaryTuple) {
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/util/TreeCursorAdapter.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/util/TreeCursorAdapter.java
index a2eaf2c7ab..fc67539f90 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/util/TreeCursorAdapter.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/util/TreeCursorAdapter.java
@@ -21,6 +21,7 @@ import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Function;
import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteCursor;
import org.apache.ignite.lang.IgniteInternalCheckedException;
@@ -30,7 +31,7 @@ import org.apache.ignite.lang.IgniteInternalCheckedException;
* @param <TREE_ROWT> Type of elements in a tree cursor.
* @param <CURSOR_ROWT> Type of elements in a resulting iterator.
*/
-public class TreeCursorAdapter<TREE_ROWT, CURSOR_ROWT> implements
Iterator<CURSOR_ROWT> {
+public class TreeCursorAdapter<TREE_ROWT, CURSOR_ROWT> implements
Cursor<CURSOR_ROWT> {
/** Cursor instance from the tree. */
private final IgniteCursor<TREE_ROWT> cursor;
@@ -74,4 +75,9 @@ public class TreeCursorAdapter<TREE_ROWT, CURSOR_ROWT>
implements Iterator<CURSO
throw new StorageException("Failed to read next element from the
tree", e);
}
}
+
+ @Override
+ public void close() throws Exception {
+ // no-op
+ }
}
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
new file mode 100644
index 0000000000..2cc4634e4b
--- /dev/null
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import java.nio.file.Path;
+import
org.apache.ignite.configuration.schemas.store.UnknownDataStorageConfigurationSchema;
+import
org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
+import
org.apache.ignite.configuration.schemas.table.NullValueDefaultConfigurationSchema;
+import
org.apache.ignite.configuration.schemas.table.SortedIndexConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import
org.apache.ignite.configuration.schemas.table.UnlimitedBudgetConfigurationSchema;
+import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import
org.apache.ignite.internal.pagememory.configuration.schema.UnsafeMemoryAllocatorConfigurationSchema;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.storage.AbstractMvTableStorageTest;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import
org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryDataStorageConfigurationSchema;
+import
org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineConfiguration;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests for {@link PersistentPageMemoryTableStorage} class.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+@ExtendWith(ConfigurationExtension.class)
+public class PersistentPageMemoryMvTableStorageTest extends
AbstractMvTableStorageTest {
+ private PersistentPageMemoryStorageEngine engine;
+
+ private MvTableStorage tableStorage;
+
+ @BeforeEach
+ void setUp(
+ @WorkDirectory
+ Path workDir,
+ @InjectConfiguration(polymorphicExtensions =
UnsafeMemoryAllocatorConfigurationSchema.class)
+ PersistentPageMemoryStorageEngineConfiguration engineConfig,
+ @InjectConfiguration(
+ polymorphicExtensions = {
+
PersistentPageMemoryDataStorageConfigurationSchema.class,
+ UnknownDataStorageConfigurationSchema.class,
+ HashIndexConfigurationSchema.class,
+ SortedIndexConfigurationSchema.class,
+ NullValueDefaultConfigurationSchema.class,
+ UnlimitedBudgetConfigurationSchema.class
+ },
+ value = "mock.tables.foo{ partitions = 512,
dataStorage.name = " + PersistentPageMemoryStorageEngine.ENGINE_NAME + "}"
+ )
+ TablesConfiguration tablesConfig
+ ) {
+ var ioRegistry = new PageIoRegistry();
+
+ ioRegistry.loadFromServiceLoader();
+
+ engine = new PersistentPageMemoryStorageEngine("test", engineConfig,
ioRegistry, workDir, null);
+
+ engine.start();
+
+ tableStorage = engine.createMvTable(tablesConfig.tables().get("foo"),
tablesConfig);
+
+ tableStorage.start();
+
+ initialize(tableStorage, tablesConfig);
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ IgniteUtils.closeAll(
+ tableStorage == null ? null : tableStorage::stop,
+ engine == null ? null : engine::stop
+ );
+ }
+
+ // TODO: Enable this test after index destruction is implemented.
+ @Disabled
+ @Override
+ public void testDestroyIndex() {
+ super.testDestroyIndex();
+ }
+}
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryMvTableStorageTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryMvTableStorageTest.java
new file mode 100644
index 0000000000..be25cc7fd4
--- /dev/null
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryMvTableStorageTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import
org.apache.ignite.configuration.schemas.store.UnknownDataStorageConfigurationSchema;
+import
org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
+import
org.apache.ignite.configuration.schemas.table.NullValueDefaultConfigurationSchema;
+import
org.apache.ignite.configuration.schemas.table.SortedIndexConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import
org.apache.ignite.configuration.schemas.table.UnlimitedBudgetConfigurationSchema;
+import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import
org.apache.ignite.internal.pagememory.configuration.schema.UnsafeMemoryAllocatorConfigurationSchema;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.storage.AbstractMvTableStorageTest;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import
org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryDataStorageConfigurationSchema;
+import
org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryStorageEngineConfiguration;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests for {@link VolatilePageMemoryTableStorage}.
+ */
+@ExtendWith(ConfigurationExtension.class)
+public class VolatilePageMemoryMvTableStorageTest extends
AbstractMvTableStorageTest {
+ private VolatilePageMemoryStorageEngine engine;
+
+ private MvTableStorage tableStorage;
+
+ @BeforeEach
+ void setUp(
+ @InjectConfiguration(polymorphicExtensions =
UnsafeMemoryAllocatorConfigurationSchema.class)
+ VolatilePageMemoryStorageEngineConfiguration engineConfig,
+ @InjectConfiguration(
+ polymorphicExtensions = {
+
VolatilePageMemoryDataStorageConfigurationSchema.class,
+ UnknownDataStorageConfigurationSchema.class,
+ HashIndexConfigurationSchema.class,
+ SortedIndexConfigurationSchema.class,
+ NullValueDefaultConfigurationSchema.class,
+ UnlimitedBudgetConfigurationSchema.class
+ },
+ value = "mock.tables.foo{ partitions = 512,
dataStorage.name = " + VolatilePageMemoryStorageEngine.ENGINE_NAME + "}"
+ )
+ TablesConfiguration tablesConfig
+ ) {
+ var ioRegistry = new PageIoRegistry();
+
+ ioRegistry.loadFromServiceLoader();
+
+ engine = new VolatilePageMemoryStorageEngine(engineConfig, ioRegistry);
+
+ engine.start();
+
+ tableStorage = engine.createMvTable(tablesConfig.tables().get("foo"),
tablesConfig);
+
+ tableStorage.start();
+
+ initialize(tableStorage, tablesConfig);
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ IgniteUtils.closeAll(
+ tableStorage == null ? null : tableStorage::stop,
+ engine == null ? null : engine::stop
+ );
+ }
+
+ // TODO: Enable this test after index destruction is implemented.
+ @Disabled
+ @Override
+ public void testDestroyIndex() {
+ super.testDestroyIndex();
+ }
+}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index 2668d81708..2fcfa0071c 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -179,6 +179,11 @@ public class RocksDbTableStorage implements MvTableStorage
{
return tableCfg;
}
+ @Override
+ public TablesConfiguration tablesConfiguration() {
+ return tablesCfg;
+ }
+
/** {@inheritDoc} */
@Override
public void start() throws StorageException {
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
index 927be72c10..38dfe81389 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
@@ -30,7 +30,6 @@ import org.apache.ignite.internal.rocksdb.ColumnFamily;
import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
-import org.apache.ignite.internal.schema.row.InternalTuple;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.index.BinaryTupleComparator;
@@ -61,6 +60,10 @@ import org.rocksdb.WriteBatchWithIndex;
* <p>We use an empty array as values, because all required information can be
extracted from the key.
*/
public class RocksDbSortedIndexStorage implements SortedIndexStorage {
+ private static final int ROW_ID_SIZE = Long.BYTES * 2;
+
+ private static final ByteOrder ORDER = ByteOrder.BIG_ENDIAN;
+
private final SortedIndexDescriptor descriptor;
private final ColumnFamily indexCf;
@@ -89,6 +92,13 @@ public class RocksDbSortedIndexStorage implements
SortedIndexStorage {
return descriptor;
}
+ @Override
+ public Cursor<RowId> get(BinaryTuple key) throws StorageException {
+ BinaryTuplePrefix keyPrefix = BinaryTuplePrefix.fromBinaryTuple(key);
+
+ return map(scan(keyPrefix, keyPrefix, true, true), this::decodeRowId);
+ }
+
@Override
public void put(IndexRow row) {
WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
@@ -116,17 +126,26 @@ public class RocksDbSortedIndexStorage implements
SortedIndexStorage {
boolean includeLower = (flags & GREATER_OR_EQUAL) != 0;
boolean includeUpper = (flags & LESS_OR_EQUAL) != 0;
- return scan(lowerBound, upperBound, includeLower, includeUpper);
+ return map(scan(lowerBound, upperBound, includeLower, includeUpper),
this::decodeRow);
}
- private Cursor<IndexRow> scan(
+ private Cursor<ByteBuffer> scan(
@Nullable BinaryTuplePrefix lowerBound,
@Nullable BinaryTuplePrefix upperBound,
boolean includeLower,
boolean includeUpper
) {
byte[] lowerBoundBytes = lowerBound == null ? null :
rocksPrefix(lowerBound);
- byte[] upperBoundBytes = upperBound == null ? null :
rocksPrefix(upperBound);
+
+ byte[] upperBoundBytes;
+
+ if (upperBound == null) {
+ upperBoundBytes = null;
+ } else if (lowerBound == upperBound) {
+ upperBoundBytes = lowerBoundBytes;
+ } else {
+ upperBoundBytes = rocksPrefix(upperBound);
+ }
Cursor<ByteBuffer> cursor = createScanCursor(lowerBoundBytes,
upperBoundBytes);
@@ -137,12 +156,15 @@ public class RocksDbSortedIndexStorage implements
SortedIndexStorage {
// Include the upper bound, if needed (RocksDB excludes the upper
bound by default).
if (includeUpper && upperBound != null) {
- Cursor<ByteBuffer> upperBoundCursor =
takeWhile(createScanCursor(upperBoundBytes, null), startsWith(upperBound));
+ Cursor<ByteBuffer> upperBoundCursor = takeWhile(
+ createScanCursor(upperBoundBytes, null),
+ startsWith(upperBound)
+ );
cursor = concat(cursor, upperBoundCursor);
}
- return map(cursor, this::decodeRow);
+ return cursor;
}
private Cursor<ByteBuffer> createScanCursor(byte @Nullable [] lowerBound,
byte @Nullable [] upperBound) {
@@ -161,7 +183,7 @@ public class RocksDbSortedIndexStorage implements
SortedIndexStorage {
return new RocksIteratorAdapter<>(it) {
@Override
protected ByteBuffer decodeEntry(byte[] key, byte[] value) {
- return ByteBuffer.wrap(key).order(ByteOrder.BIG_ENDIAN);
+ return ByteBuffer.wrap(key).order(ORDER);
}
@Override
@@ -178,35 +200,36 @@ public class RocksDbSortedIndexStorage implements
SortedIndexStorage {
var tuple = new BinaryTuple(descriptor.binaryTupleSchema(),
binaryTupleSlice(bytes));
+ return new IndexRowImpl(tuple, decodeRowId(bytes));
+ }
+
+ private RowId decodeRowId(ByteBuffer bytes) {
// RowId UUID is located at the last 16 bytes of the key
long mostSignificantBits = bytes.getLong(bytes.limit() - Long.BYTES *
2);
long leastSignificantBits = bytes.getLong(bytes.limit() - Long.BYTES);
- var rowId = new RowId(partitionStorage.partitionId(),
mostSignificantBits, leastSignificantBits);
-
- return new IndexRowImpl(tuple, rowId);
+ return new RowId(partitionStorage.partitionId(), mostSignificantBits,
leastSignificantBits);
}
private byte[] rocksPrefix(BinaryTuplePrefix prefix) {
- return rocksPrefix(prefix, 0).array();
- }
-
- private ByteBuffer rocksPrefix(InternalTuple prefix, int extraLength) {
- ByteBuffer keyBytes = prefix.byteBuffer();
+ ByteBuffer bytes = prefix.byteBuffer();
- return ByteBuffer.allocate(Short.BYTES + keyBytes.remaining() +
extraLength)
- .order(ByteOrder.BIG_ENDIAN)
+ return ByteBuffer.allocate(Short.BYTES + bytes.remaining())
+ .order(ORDER)
.putShort((short) partitionStorage.partitionId())
- .put(keyBytes);
+ .put(bytes)
+ .array();
}
private byte[] rocksKey(IndexRow row) {
- RowId rowId = row.rowId();
+ ByteBuffer bytes = row.indexColumns().byteBuffer();
- // We don't store the Partition ID as it is already a part of the key.
- return rocksPrefix(row.indexColumns(), 2 * Long.BYTES)
- .putLong(rowId.mostSignificantBits())
- .putLong(rowId.leastSignificantBits())
+ return ByteBuffer.allocate(Short.BYTES + bytes.remaining() +
ROW_ID_SIZE)
+ .order(ORDER)
+ .putShort((short) partitionStorage.partitionId())
+ .put(bytes)
+ .putLong(row.rowId().mostSignificantBits())
+ .putLong(row.rowId().leastSignificantBits())
.array();
}
@@ -233,7 +256,7 @@ public class RocksDbSortedIndexStorage implements
SortedIndexStorage {
// Discard partition ID.
.position(Short.BYTES)
// Discard row ID.
- .limit(key.limit() - Long.BYTES * 2)
+ .limit(key.limit() - ROW_ID_SIZE)
.slice()
.order(ByteOrder.LITTLE_ENDIAN);
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
index 16c3e2773e..dc2c11989a 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
@@ -45,7 +45,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntSupplier;
import org.apache.ignite.internal.rocksdb.BusyRocksIteratorAdapter;
-import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
import org.apache.ignite.internal.rocksdb.flush.RocksDbFlusher;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.TxState;
@@ -379,7 +378,7 @@ public class TxStateRocksDbStorage implements
TxStateStorage {
throw e;
}
- RocksIteratorAdapter<IgniteBiTuple<UUID, TxMeta>> iteratorAdapter
= new BusyRocksIteratorAdapter<>(busyLock, rocksIterator) {
+ return new BusyRocksIteratorAdapter<>(busyLock, rocksIterator) {
@Override protected IgniteBiTuple<UUID, TxMeta>
decodeEntry(byte[] keyBytes, byte[] valueBytes) {
UUID key = bytesToUuid(keyBytes, 0);
TxMeta txMeta = fromBytes(valueBytes);
@@ -399,8 +398,6 @@ public class TxStateRocksDbStorage implements
TxStateStorage {
super.close();
}
};
-
- return Cursor.fromIterator(iteratorAdapter);
} finally {
busyLock.leaveBusy();
}