This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6fd920dd3 IGNITE-16697 POC - MV storage base interfaces and reference
implementation (#739)
6fd920dd3 is described below
commit 6fd920dd31f0a9a1a64bab778e042c939feb680d
Author: ibessonov <[email protected]>
AuthorDate: Tue Apr 5 15:32:39 2022 +0300
IGNITE-16697 POC - MV storage base interfaces and reference implementation
(#739)
---
.../org/apache/ignite/internal/util/Cursor.java | 35 ++
.../client/ItMetaStorageRaftGroupTest.java | 26 +-
modules/storage-api/pom.xml | 5 +
.../internal/storage/MvPartitionStorage.java | 78 +++++
.../internal/storage/TxIdMismatchException.java} | 13 +-
.../internal/storage}/index/PrefixComparator.java | 33 +-
.../storage/index/SortedIndexMvStorage.java | 95 ++++++
.../storage/AbstractMvPartitionStorageTest.java | 233 +++++++++++++
.../storage/AbstractSortedIndexMvStorageTest.java | 371 +++++++++++++++++++++
.../internal/storage/BaseMvStoragesTest.java | 179 ++++++++++
.../storage/basic/TestMvPartitionStorage.java | 176 ++++++++++
.../storage/basic/TestMvPartitionStorageTest.java} | 21 +-
.../storage/basic/TestSortedIndexMvStorage.java | 249 ++++++++++++++
.../basic/TestSortedIndexMvStorageTest.java | 49 +++
.../chm/TestConcurrentHashMapPartitionStorage.java | 28 +-
.../rocksdb/index/RocksDbSortedIndexStorage.java | 1 +
.../vault/inmemory/InMemoryVaultService.java | 22 +-
17 files changed, 1510 insertions(+), 104 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/Cursor.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/Cursor.java
index e1ddaf2b9..c227c68c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/Cursor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/Cursor.java
@@ -25,5 +25,40 @@ import java.util.Iterator;
* @param <T> Type of elements.
*/
public interface Cursor<T> extends Iterator<T>, Iterable<T>, AutoCloseable {
+ /** {@inheritDoc} */
+ @Override
+ default Iterator<T> iterator() {
+ return this;
+ }
+ /**
+ * Creates an iterator based cursor.
+ *
+ * @param it Iterator.
+ * @param <T> Type of elements in iterator.
+ * @return Cursor.
+ */
+ static <T> Cursor<T> fromIterator(Iterator<? extends T> it) {
+ return new Cursor<T>() {
+ /** {@inheritDoc} */
+ @Override
+ public void close() throws Exception {
+ if (it instanceof AutoCloseable) {
+ ((AutoCloseable) it).close();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean hasNext() {
+ return it.hasNext();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public T next() {
+ return it.next();
+ }
+ };
+ }
}
diff --git
a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java
b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java
index 3f3a386c9..9f1a76fe8 100644
---
a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java
+++
b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java
@@ -27,7 +27,6 @@ import static org.mockito.Mockito.when;
import java.nio.file.Path;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -63,7 +62,6 @@ import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.utils.ClusterServiceTestUtils;
-import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -244,29 +242,7 @@ public class ItMetaStorageRaftGroupTest {
List<org.apache.ignite.internal.metastorage.server.Entry> entries
= new ArrayList<>(
List.of(EXPECTED_SRV_RESULT_ENTRY1,
EXPECTED_SRV_RESULT_ENTRY2));
- return new
Cursor<org.apache.ignite.internal.metastorage.server.Entry>() {
- private final
Iterator<org.apache.ignite.internal.metastorage.server.Entry> it =
entries.iterator();
-
- @Override
- public void close() {
- }
-
- @NotNull
- @Override
- public
Iterator<org.apache.ignite.internal.metastorage.server.Entry> iterator() {
- return it;
- }
-
- @Override
- public boolean hasNext() {
- return it.hasNext();
- }
-
- @Override
- public org.apache.ignite.internal.metastorage.server.Entry
next() {
- return it.next();
- }
- };
+ return Cursor.fromIterator(entries.iterator());
});
List<Pair<RaftServer, RaftGroupService>> raftServersRaftGroups =
prepareJraftMetaStorages(replicatorStartedCounter,
diff --git a/modules/storage-api/pom.xml b/modules/storage-api/pom.xml
index 951b2b080..b373203cc 100644
--- a/modules/storage-api/pom.xml
+++ b/modules/storage-api/pom.xml
@@ -43,6 +43,11 @@
<artifactId>ignite-schema</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-transactions</artifactId>
+ </dependency>
+
<!-- Test dependencies -->
<dependency>
<groupId>org.apache.ignite</groupId>
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
new file mode 100644
index 000000000..4684e52b5
--- /dev/null
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.util.UUID;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.tx.Timestamp;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Multi-versioned partition storage.
+ * POC version that represents a combination of a replicated TX-aware MV
storage and a physical MV storage. Their APIs are very similar,
+ * although there are very important differences that will be addressed in the
future.
+ */
+public interface MvPartitionStorage {
+ /**
+ * Reads the value from the storage as it was at the given timestamp.
{@code null} timestamp means reading the latest value.
+ *
+ * @param key Key.
+ * @param timestamp Timestamp.
+ * @return Binary row that corresponds to the key or {@code null} if value
is not found.
+ */
+ @Nullable
+ BinaryRow read(BinaryRow key, @Nullable Timestamp timestamp);
+
+ /**
+ * Creates an uncommitted version, assigned to the given transaction id.
+ *
+ * @param row Binary row to update. Key only row means value removal.
+ * @param txId Transaction id.
+ * @throws TxIdMismatchException If there's another pending update
associated with different transaction id.
+ * @throws StorageException If failed to write data to the storage.
+ */
+ void addWrite(BinaryRow row, UUID txId) throws TxIdMismatchException,
StorageException;
+
+ /**
+ * Aborts a pending update of the ongoing uncommitted transaction. Invoked
during rollback.
+ *
+ * @param key Key.
+ * @throws StorageException If failed to write data to the storage.
+ */
+ void abortWrite(BinaryRow key) throws StorageException;
+
+ /**
+ * Commits a pending update of the ongoing transaction. Invoked during
commit. Committed value will be versioned by the given timestamp.
+ *
+ * @param key Key.
+ * @param timestamp Timestamp to associate with committed value.
+ * @throws StorageException If failed to write data to the storage.
+ */
+ void commitWrite(BinaryRow key, Timestamp timestamp) throws
StorageException;
+
+ /**
+ * Scans the partition and returns a cursor of values at the given
timestamp.
+ *
+ * @param keyFilter Key filter. Binary rows passed to the filter may or
may not have a value, filter should only check keys.
+ * @param timestamp Timestamp.
+ * @return Cursor.
+ */
+ Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, @Nullable Timestamp
timestamp) throws StorageException;
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/Cursor.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/TxIdMismatchException.java
similarity index 68%
copy from modules/core/src/main/java/org/apache/ignite/internal/util/Cursor.java
copy to
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/TxIdMismatchException.java
index e1ddaf2b9..fe8d8799f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/Cursor.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/TxIdMismatchException.java
@@ -1,6 +1,6 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
@@ -15,15 +15,12 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.util;
+package org.apache.ignite.internal.storage;
-import java.util.Iterator;
+import org.apache.ignite.lang.IgniteException;
/**
- * Closeable cursor.
- *
- * @param <T> Type of elements.
+ * Exception class that describes the situation when two independent
transactions attempt to write values for the same key.
*/
-public interface Cursor<T> extends Iterator<T>, Iterable<T>, AutoCloseable {
-
+public class TxIdMismatchException extends IgniteException {
}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/PrefixComparator.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/PrefixComparator.java
similarity index 85%
rename from
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/PrefixComparator.java
rename to
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/PrefixComparator.java
index 2001d77f3..856acae4d 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/PrefixComparator.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/PrefixComparator.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.storage.rocksdb.index;
+package org.apache.ignite.internal.storage.index;
import java.util.Arrays;
import java.util.BitSet;
@@ -23,15 +23,13 @@ import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NativeTypeSpec;
import org.apache.ignite.internal.schema.row.Row;
-import org.apache.ignite.internal.storage.index.IndexRowPrefix;
-import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
import
org.apache.ignite.internal.storage.index.SortedIndexDescriptor.ColumnDescriptor;
import org.jetbrains.annotations.Nullable;
/**
* Class for comparing a {@link BinaryRow} representing an Index Key with a
given prefix of index columns.
*/
-class PrefixComparator {
+public class PrefixComparator {
private final SortedIndexDescriptor descriptor;
private final @Nullable Object[] prefix;
@@ -41,7 +39,7 @@ class PrefixComparator {
* @param descriptor Index Descriptor of the enclosing index.
* @param prefix Prefix to compare the incoming rows against.
*/
- PrefixComparator(SortedIndexDescriptor descriptor, IndexRowPrefix prefix) {
+ public PrefixComparator(SortedIndexDescriptor descriptor, IndexRowPrefix
prefix) {
assert descriptor.indexRowColumns().size() >=
prefix.prefixColumnValues().length;
this.descriptor = descriptor;
@@ -56,7 +54,7 @@ class PrefixComparator {
* a value less than {@code 0} if the row's prefix is smaller than
the prefix; and
* a value greater than {@code 0} if the row's prefix is larger
than the prefix.
*/
- int compare(BinaryRow binaryRow) {
+ public int compare(BinaryRow binaryRow) {
var row = new Row(descriptor.asSchemaDescriptor(), binaryRow);
for (int i = 0; i < prefix.length; ++i) {
@@ -76,7 +74,18 @@ class PrefixComparator {
* Compares a particular column of a {@code row} with the given value.
*/
private static int compare(Column column, Row row, @Nullable Object value)
{
- boolean nullRow = row.hasNullValue(column.schemaIndex(),
column.type().spec());
+ int schemaIndex = column.schemaIndex();
+
+ NativeTypeSpec typeSpec = column.type().spec();
+
+ return compareColumns(row, schemaIndex, typeSpec, value);
+ }
+
+ /**
+ * Compares a particular column of a {@code row} with the given value.
+ */
+ public static int compareColumns(Row row, int schemaIndex, NativeTypeSpec
typeSpec, @Nullable Object value) {
+ boolean nullRow = row.hasNullValue(schemaIndex, typeSpec);
if (nullRow && value == null) {
return 0;
@@ -86,10 +95,6 @@ class PrefixComparator {
return 1;
}
- int schemaIndex = column.schemaIndex();
-
- NativeTypeSpec typeSpec = column.type().spec();
-
switch (typeSpec) {
case INT8:
return Byte.compare(row.byteValue(schemaIndex), (Byte) value);
@@ -127,11 +132,7 @@ class PrefixComparator {
return ((Comparable) typeSpec.objectValue(row,
schemaIndex)).compareTo(value);
default:
- // should never reach here, this invariant is checked during
the index creation
- throw new IllegalStateException(String.format(
- "Invalid column schema. Column name: %s, column type:
%s",
- column.name(), column.type()
- ));
+ throw new AssertionError("Unknown type spec: " + typeSpec);
}
}
}
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexMvStorage.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexMvStorage.java
new file mode 100644
index 000000000..305058717
--- /dev/null
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexMvStorage.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.index;
+
+import java.util.function.IntPredicate;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.tx.Timestamp;
+import org.apache.ignite.internal.util.Cursor;
+import org.intellij.lang.annotations.MagicConstant;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Storage for a sorted index.
+ * POC version, that represents a combination between a replicated TX-aware MV
storage and physical MV storage. Real future implementation
+ * will be defined later. Things to notice here: TX-aware implementation
should have a projection bitset instead of full row reading.
+ * Physical storage API will be enriched with append/remove methods, like in
reference implementation.
+ */
+public interface SortedIndexMvStorage {
+ /** Exclude lower bound. */
+ int GREATER = 0;
+
+ /** Include lower bound. */
+ int GREATER_OR_EQUAL = 1;
+
+ /** Exclude upper bound. */
+ int LESS = 0;
+
+ /** Include upper bound. */
+ int LESS_OR_EQUAL = 1 << 1;
+
+ /** Forward scan. */
+ int FORWARD = 0;
+
+ /** Backwards scan. */
+ int BACKWARDS = 1 << 2;
+
+ /**
+ * The sole purpose of this interface is to avoid massive refactoring while
changing the original IndexRow.
+ */
+ interface IndexRowEx {
+ /**
+ * Key-only binary row if index-only scan is supported, full binary
row otherwise.
+ */
+ BinaryRow row();
+
+ /**
+ * Returns indexed column value.
+ *
+ * @param idx PK column index.
+ * @return Indexed column value.
+ */
+ Object value(int idx);
+ }
+
+ boolean supportsBackwardsScan();
+
+ boolean supportsIndexOnlyScan();
+
+ /**
+ * Returns a range of index values between the lower bound and the upper
bound, consistent with the passed timestamp.
+ *
+ * @param lowerBound Lower bound. Exclusivity is controlled by a {@link
#GREATER_OR_EQUAL} or {@link #GREATER} flag.
+ * {@code null} means unbounded.
+ * @param upperBound Upper bound. Exclusivity is controlled by a {@link
#LESS} or {@link #LESS_OR_EQUAL} flag.
+ * {@code null} means unbounded.
+ * @param flags Control flags. {@link #GREATER} | {@link #LESS} | {@link
#FORWARD} by default. Other available values
+ * are {@link #GREATER_OR_EQUAL}, {@link #LESS_OR_EQUAL} and {@link
#BACKWARDS}.
+ * @param timestamp Timestamp value for consistent multi-versioned index
scan.
+ * @param partitionFilter Partition filter predicate. {@code null} means
returning data from all partitions.
+ * @return Cursor with fetched index rows.
+ * @throws IllegalArgumentException If backwards flag is passed and
backwards iteration is not supported by the storage.
+ */
+ Cursor<IndexRowEx> scan(
+ @Nullable IndexRowPrefix lowerBound,
+ @Nullable IndexRowPrefix upperBound,
+ @MagicConstant(flagsFromClass = SortedIndexStorage.class) int
flags,
+ Timestamp timestamp,
+ @Nullable IntPredicate partitionFilter
+ );
+}
diff --git
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
new file mode 100644
index 000000000..4b1c3aad9
--- /dev/null
+++
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import static java.util.stream.Collectors.toList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Predicate;
+import java.util.stream.StreamSupport;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.tx.Timestamp;
+import org.apache.ignite.internal.util.Cursor;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Base test for MV partition storages.
+ */
+public abstract class AbstractMvPartitionStorageTest extends
BaseMvStoragesTest {
+ /**
+ * Creates a storage instance for testing.
+ */
+ protected abstract MvPartitionStorage partitionStorage();
+
+ /**
+ * Tests that reads and scans from an empty storage return empty results.
+ */
+ @Test
+ public void testEmpty() throws Exception {
+ MvPartitionStorage pk = partitionStorage();
+
+ BinaryRow binaryKey = binaryKey(new TestKey(10, "foo"));
+
+ // Read.
+ assertNull(pk.read(binaryKey, null));
+ assertNull(pk.read(binaryKey, Timestamp.nextVersion()));
+
+ // Scan.
+ assertEquals(List.of(), convert(pk.scan(row -> true, null)));
+ assertEquals(List.of(), convert(pk.scan(row -> true,
Timestamp.nextVersion())));
+ }
+
+ /**
+ * Tests basic invariants of {@link MvPartitionStorage#addWrite(BinaryRow,
UUID)}.
+ */
+ @Test
+ public void testAddWrite() throws Exception {
+ MvPartitionStorage pk = partitionStorage();
+
+ TestKey key = new TestKey(10, "foo");
+ TestValue value = new TestValue(20, "bar");
+
+ BinaryRow binaryRow = binaryRow(key, value);
+
+ pk.addWrite(binaryRow, UUID.randomUUID());
+
+ // Attempt to write from another transaction.
+ assertThrows(TxIdMismatchException.class, () -> pk.addWrite(binaryRow,
UUID.randomUUID()));
+
+ // Read without timestamp returns uncommitted row.
+ assertEquals(value, value(pk.read(binaryKey(key), null)));
+
+ // Read with timestamp returns null.
+ assertNull(pk.read(binaryKey(key), Timestamp.nextVersion()));
+ }
+
+ /**
+ * Tests basic invariants of {@link
MvPartitionStorage#abortWrite(BinaryRow)}.
+ */
+ @Test
+ public void testAbortWrite() throws Exception {
+ MvPartitionStorage pk = partitionStorage();
+
+ TestKey key = new TestKey(10, "foo");
+ TestValue value = new TestValue(20, "bar");
+
+ pk.addWrite(binaryRow(key, value), UUID.randomUUID());
+
+ pk.abortWrite(binaryKey(key));
+
+ // Aborted row can't be read.
+ assertNull(pk.read(binaryKey(key), null));
+ }
+
+ /**
+ * Tests basic invariants of {@link
MvPartitionStorage#commitWrite(BinaryRow, Timestamp)}.
+ */
+ @Test
+ public void testCommitWrite() throws Exception {
+ MvPartitionStorage pk = partitionStorage();
+
+ TestKey key = new TestKey(10, "foo");
+ TestValue value = new TestValue(20, "bar");
+
+ BinaryRow binaryRow = binaryRow(key, value);
+
+ pk.addWrite(binaryRow, UUID.randomUUID());
+
+ Timestamp tsBefore = Timestamp.nextVersion();
+
+ Timestamp tsExact = Timestamp.nextVersion();
+ pk.commitWrite(binaryRow, tsExact);
+
+ Timestamp tsAfter = Timestamp.nextVersion();
+
+ // Row is invisible at the time before writing.
+ assertNull(pk.read(binaryRow, tsBefore));
+
+ // Row is valid at the time during and after writing.
+ assertEquals(value, value(pk.read(binaryRow, null)));
+ assertEquals(value, value(pk.read(binaryRow, tsExact)));
+ assertEquals(value, value(pk.read(binaryRow, tsAfter)));
+
+ TestValue newValue = new TestValue(30, "duh");
+
+ pk.addWrite(binaryRow(key, newValue), UUID.randomUUID());
+
+ // Same checks, but now there are two different versions.
+ assertNull(pk.read(binaryRow, tsBefore));
+
+ assertEquals(newValue, value(pk.read(binaryRow, null)));
+
+ assertEquals(value, value(pk.read(binaryRow, tsExact)));
+ assertEquals(value, value(pk.read(binaryRow, tsAfter)));
+ assertEquals(value, value(pk.read(binaryRow,
Timestamp.nextVersion())));
+
+ // Only latest time behavior changes after commit.
+ pk.commitWrite(binaryKey(key), Timestamp.nextVersion());
+
+ assertEquals(newValue, value(pk.read(binaryRow, null)));
+
+ assertEquals(value, value(pk.read(binaryRow, tsExact)));
+ assertEquals(value, value(pk.read(binaryRow, tsAfter)));
+
+ assertEquals(newValue, value(pk.read(binaryRow,
Timestamp.nextVersion())));
+
+ // Remove.
+ pk.addWrite(binaryKey(key), UUID.randomUUID());
+
+ assertNull(pk.read(binaryRow, tsBefore));
+
+ assertNull(pk.read(binaryRow, null));
+
+ assertEquals(value, value(pk.read(binaryRow, tsExact)));
+ assertEquals(value, value(pk.read(binaryRow, tsAfter)));
+
+ assertEquals(newValue, value(pk.read(binaryRow,
Timestamp.nextVersion())));
+
+ // Commit remove.
+ Timestamp removeTs = Timestamp.nextVersion();
+ pk.commitWrite(binaryKey(key), removeTs);
+
+ assertNull(pk.read(binaryRow, tsBefore));
+
+ assertNull(pk.read(binaryRow, null));
+ assertNull(pk.read(binaryRow, removeTs));
+ assertNull(pk.read(binaryRow, Timestamp.nextVersion()));
+
+ assertEquals(value, value(pk.read(binaryRow, tsExact)));
+ assertEquals(value, value(pk.read(binaryRow, tsAfter)));
+ }
+
+ /**
+ * Tests basic invariants of {@link MvPartitionStorage#scan(Predicate,
Timestamp)}.
+ */
+ @Test
+ public void testScan() throws Exception {
+ MvPartitionStorage pk = partitionStorage();
+
+ TestKey key1 = new TestKey(1, "1");
+ TestValue value1 = new TestValue(10, "xxx");
+
+ TestKey key2 = new TestKey(2, "2");
+ TestValue value2 = new TestValue(20, "yyy");
+
+ pk.addWrite(binaryRow(key1, value1), UUID.randomUUID());
+ pk.addWrite(binaryRow(key2, value2), UUID.randomUUID());
+
+ // Scan with and without filters.
+ assertEquals(List.of(value1, value2), convert(pk.scan(row -> true,
null)));
+ assertEquals(List.of(value1), convert(pk.scan(row -> key(row).intKey
== 1, null)));
+ assertEquals(List.of(value2), convert(pk.scan(row -> key(row).intKey
== 2, null)));
+
+ Timestamp ts1 = Timestamp.nextVersion();
+
+ Timestamp ts2 = Timestamp.nextVersion();
+ pk.commitWrite(binaryKey(key1), ts2);
+
+ Timestamp ts3 = Timestamp.nextVersion();
+
+ Timestamp ts4 = Timestamp.nextVersion();
+ pk.commitWrite(binaryKey(key2), ts4);
+
+ Timestamp ts5 = Timestamp.nextVersion();
+
+ // Full scan with various timestamp values.
+ assertEquals(List.of(), convert(pk.scan(row -> true, ts1)));
+
+ assertEquals(List.of(value1), convert(pk.scan(row -> true, ts2)));
+ assertEquals(List.of(value1), convert(pk.scan(row -> true, ts3)));
+
+ assertEquals(List.of(value1, value2), convert(pk.scan(row -> true,
ts4)));
+ assertEquals(List.of(value1, value2), convert(pk.scan(row -> true,
ts5)));
+ }
+
+ private List<TestValue> convert(Cursor<BinaryRow> cursor) throws Exception
{
+ try (cursor) {
+ return StreamSupport.stream(cursor.spliterator(), false)
+ .map(BaseMvStoragesTest::value)
+ .sorted(Comparator.nullsFirst(Comparator.naturalOrder()))
+ .collect(toList());
+ }
+ }
+}
diff --git
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractSortedIndexMvStorageTest.java
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractSortedIndexMvStorageTest.java
new file mode 100644
index 000000000..390ffbc87
--- /dev/null
+++
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractSortedIndexMvStorageTest.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import static
org.apache.ignite.internal.storage.index.SortedIndexMvStorage.BACKWARDS;
+import static
org.apache.ignite.internal.storage.index.SortedIndexMvStorage.FORWARD;
+import static
org.apache.ignite.internal.storage.index.SortedIndexMvStorage.GREATER;
+import static
org.apache.ignite.internal.storage.index.SortedIndexMvStorage.GREATER_OR_EQUAL;
+import static
org.apache.ignite.internal.storage.index.SortedIndexMvStorage.LESS;
+import static
org.apache.ignite.internal.storage.index.SortedIndexMvStorage.LESS_OR_EQUAL;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.ignite.configuration.schemas.table.ColumnChange;
+import org.apache.ignite.configuration.schemas.table.SortedIndexChange;
+import
org.apache.ignite.configuration.schemas.table.SortedIndexConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.marshaller.MarshallerException;
+import org.apache.ignite.internal.schema.row.Row;
+import
org.apache.ignite.internal.storage.chm.TestConcurrentHashMapStorageEngine;
+import
org.apache.ignite.internal.storage.chm.schema.TestConcurrentHashMapDataStorageConfigurationSchema;
+import org.apache.ignite.internal.storage.index.IndexRowPrefix;
+import org.apache.ignite.internal.storage.index.SortedIndexMvStorage;
+import
org.apache.ignite.internal.storage.index.SortedIndexMvStorage.IndexRowEx;
+import org.apache.ignite.internal.tx.Timestamp;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Base test for MV index storages.
+ */
+@ExtendWith(ConfigurationExtension.class)
+public abstract class AbstractSortedIndexMvStorageTest extends
BaseMvStoragesTest {
+ /** Name of the index configured on (strVal ascending, intVal ascending). */
+ protected static final String INDEX1 = "asc_asc";
+ /** Name of the index configured on (strVal ascending, intVal descending). */
+ protected static final String INDEX2 = "asc_desc";
+
+ /** Table configuration; injected and filled with columns and indexes before every test. */
+ protected TableConfiguration tableCfg;
+
+ /**
+ * Fills the injected table configuration with a single partition, the four test columns, a primary key on
+ * (intKey, strKey) and the two sorted indexes {@link #INDEX1} and {@link #INDEX2}, then stores it in {@link #tableCfg}.
+ */
+ @BeforeEach
+ void setUp(@InjectConfiguration(
+ polymorphicExtensions = {SortedIndexConfigurationSchema.class,
TestConcurrentHashMapDataStorageConfigurationSchema.class},
+ // This value only required for configuration validity, it's not
used otherwise.
+ value = "mock.dataStorage.name = " +
TestConcurrentHashMapStorageEngine.ENGINE_NAME
+ ) TableConfiguration tableCfg) {
+ tableCfg.change(tableChange -> tableChange
+ .changePartitions(1)
+ .changePrimaryKey(pk -> pk.changeColumns("intKey", "strKey"))
+ .changeColumns(columns -> columns
+ .create("intKey", column("intKey", "INT32"))
+ .create("strKey", column("strKey", "STRING"))
+ .create("intVal", column("intVal", "INT32"))
+ .create("strVal", column("strVal", "STRING"))
+ )
+ .changeIndices(indexes -> indexes
+ .create(INDEX1, idx ->
idx.convert(SortedIndexChange.class).changeColumns(idxColumns -> idxColumns
+ .create("strVal", c ->
c.changeName("strVal").changeAsc(true))
+ .create("intVal", c ->
c.changeName("intVal").changeAsc(true))
+ ))
+ .create(INDEX2, idx ->
idx.convert(SortedIndexChange.class).changeColumns(idxColumns -> idxColumns
+ .create("strVal", c ->
c.changeName("strVal").changeAsc(true))
+ .create("intVal", c ->
c.changeName("intVal").changeAsc(false))
+ ))
+ )
+ ).join();
+
+ this.tableCfg = tableCfg;
+ }
+
+ /**
+ * Returns a configuration closure that defines a non-nullable column with the given name and type name.
+ */
+ private static Consumer<ColumnChange> column(String name, String typeName)
{
+ return c -> c.changeName(name).changeNullable(false).changeType(type
-> type.changeType(typeName));
+ }
+
+ /**
+ * Creates a storage instance for testing.
+ */
+ protected abstract MvPartitionStorage partitionStorage();
+
+ /**
+ * Creates a storage instance for testing.
+ *
+ * @param name Name of the index in the table configuration.
+ * @param tableCfg Table configuration that contains the index definition.
+ */
+ protected abstract SortedIndexMvStorage createIndexStorage(String name,
TableView tableCfg);
+
+ /**
+ * Checks that scanning indexes of an empty table returns nothing.
+ */
+ @Test
+ public void testEmpty() throws Exception {
+ SortedIndexMvStorage index1 = createIndexStorage(INDEX1,
tableCfg.value());
+ SortedIndexMvStorage index2 = createIndexStorage(INDEX2,
tableCfg.value());
+
+ assertEquals(List.of(), convert(index1.scan(null, null, (byte) 0,
null, null)));
+ assertEquals(List.of(), convert(index2.scan(null, null, (byte) 0,
null, null)));
+ }
+
+ /**
+ * Checks the order of rows returned by both indexes in both scan directions: without bounds and with
+ * exclusive/inclusive lower and upper bounds given as a prefix on the first index column (strVal).
+ */
+ @Test
+ public void testBoundsAndOrder() throws Exception {
+ SortedIndexMvStorage index1 = createIndexStorage(INDEX1,
tableCfg.value());
+ SortedIndexMvStorage index2 = createIndexStorage(INDEX2,
tableCfg.value());
+
+ // Variable names encode the values: val<intVal><strVal>.
+ TestValue val9010 = new TestValue(90, "10");
+ TestValue val8010 = new TestValue(80, "10");
+ TestValue val9020 = new TestValue(90, "20");
+ TestValue val8020 = new TestValue(80, "20");
+
+ insert(new TestKey(1, "1"), val9010, null);
+ insert(new TestKey(2, "2"), val8010, null);
+ insert(new TestKey(3, "3"), val9020, null);
+ insert(new TestKey(4, "4"), val8020, null);
+
+ // Test without bounds.
+ assertEquals(List.of(val8010, val9010, val8020, val9020),
convert(index1.scan(
+ null, null, FORWARD, null, null
+ )));
+
+ assertEquals(List.of(val9020, val8020, val9010, val8010),
convert(index1.scan(
+ null, null, BACKWARDS, null, null
+ )));
+
+ assertEquals(List.of(val9010, val8010, val9020, val8020),
convert(index2.scan(
+ null, null, FORWARD, null, null
+ )));
+
+ assertEquals(List.of(val8020, val9020, val8010, val9010),
convert(index2.scan(
+ null, null, BACKWARDS, null, null
+ )));
+
+ // Lower bound exclusive.
+ assertEquals(List.of(val8020, val9020), convert(index1.scan(
+ prefix("10"), null, GREATER | FORWARD, null, null
+ )));
+
+ assertEquals(List.of(val9020, val8020), convert(index1.scan(
+ prefix("10"), null, GREATER | BACKWARDS, null, null
+ )));
+
+ assertEquals(List.of(val9020, val8020), convert(index2.scan(
+ prefix("10"), null, GREATER | FORWARD, null, null
+ )));
+
+ assertEquals(List.of(val8020, val9020), convert(index2.scan(
+ prefix("10"), null, GREATER | BACKWARDS, null, null
+ )));
+
+ // Lower bound inclusive.
+ assertEquals(List.of(val8010, val9010, val8020, val9020),
convert(index1.scan(
+ prefix("10"), null, GREATER_OR_EQUAL | FORWARD, null, null
+ )));
+
+ assertEquals(List.of(val9020, val8020, val9010, val8010),
convert(index1.scan(
+ prefix("10"), null, GREATER_OR_EQUAL | BACKWARDS, null, null
+ )));
+
+ assertEquals(List.of(val9010, val8010, val9020, val8020),
convert(index2.scan(
+ prefix("10"), null, GREATER_OR_EQUAL | FORWARD, null, null
+ )));
+
+ assertEquals(List.of(val8020, val9020, val8010, val9010),
convert(index2.scan(
+ prefix("10"), null, GREATER_OR_EQUAL | BACKWARDS, null, null
+ )));
+
+ // Upper bound exclusive.
+ assertEquals(List.of(val8010, val9010), convert(index1.scan(
+ null, prefix("20"), LESS | FORWARD, null, null
+ )));
+
+ assertEquals(List.of(val9010, val8010), convert(index1.scan(
+ null, prefix("20"), LESS | BACKWARDS, null, null
+ )));
+
+ assertEquals(List.of(val9010, val8010), convert(index2.scan(
+ null, prefix("20"), LESS | FORWARD, null, null
+ )));
+
+ assertEquals(List.of(val8010, val9010), convert(index2.scan(
+ null, prefix("20"), LESS | BACKWARDS, null, null
+ )));
+
+ // Upper bound inclusive.
+ assertEquals(List.of(val8010, val9010, val8020, val9020),
convert(index1.scan(
+ null, prefix("20"), LESS_OR_EQUAL | FORWARD, null, null
+ )));
+
+ assertEquals(List.of(val9020, val8020, val9010, val8010),
convert(index1.scan(
+ null, prefix("20"), LESS_OR_EQUAL | BACKWARDS, null, null
+ )));
+
+ assertEquals(List.of(val9010, val8010, val9020, val8020),
convert(index2.scan(
+ null, prefix("20"), LESS_OR_EQUAL | FORWARD, null, null
+ )));
+
+ assertEquals(List.of(val8020, val9020, val8010, val9010),
convert(index2.scan(
+ null, prefix("20"), LESS_OR_EQUAL | BACKWARDS, null, null
+ )));
+ }
+
+ /**
+ * Checks that an uncommitted insert is returned only by a scan without a timestamp and that it is not returned
+ * by any scan once the write is aborted.
+ */
+ @Test
+ public void testAbort() throws Exception {
+ MvPartitionStorage pk = partitionStorage();
+
+ SortedIndexMvStorage index = createIndexStorage(INDEX1,
tableCfg.value());
+
+ TestKey key = new TestKey(1, "1");
+ TestValue val = new TestValue(10, "10");
+
+ pk.addWrite(binaryRow(key, val), UUID.randomUUID());
+
+ // Timestamp is null.
+ assertEquals(List.of(val), convert(index.scan(null, null, 0, null,
null)));
+
+ // Timestamp is not null.
+ assertEquals(List.of(), convert(index.scan(null, null, 0,
Timestamp.nextVersion(), null)));
+
+ // Abort write.
+ pk.abortWrite(binaryKey(key));
+
+ // Timestamp is null.
+ assertEquals(List.of(), convert(index.scan(null, null, 0, null,
null)));
+
+ // Timestamp is not null.
+ assertEquals(List.of(), convert(index.scan(null, null, 0,
Timestamp.nextVersion(), null)));
+ }
+
+ /**
+ * Checks that an uncommitted removal hides a committed row only from a scan without a timestamp and that
+ * aborting the removal makes the row returned by that scan again.
+ */
+ @Test
+ public void testAbortRemove() throws Exception {
+ MvPartitionStorage pk = partitionStorage();
+
+ SortedIndexMvStorage index = createIndexStorage(INDEX1,
tableCfg.value());
+
+ TestKey key = new TestKey(1, "1");
+ TestValue val = new TestValue(10, "10");
+
+ Timestamp insertTs = Timestamp.nextVersion();
+ insert(key, val, insertTs);
+
+ // Remove.
+ pk.addWrite(binaryKey(key), UUID.randomUUID());
+
+ // Timestamp is null.
+ assertEquals(List.of(), convert(index.scan(null, null, 0, null,
null)));
+
+ // Timestamp is not null.
+ assertEquals(List.of(val), convert(index.scan(null, null, 0, insertTs,
null)));
+ assertEquals(List.of(val), convert(index.scan(null, null, 0,
Timestamp.nextVersion(), null)));
+
+ // Abort remove.
+ pk.abortWrite(binaryKey(key));
+
+ // Timestamp is null.
+ assertEquals(List.of(val), convert(index.scan(null, null, 0, null,
null)));
+
+ // Timestamp is not null.
+ assertEquals(List.of(val), convert(index.scan(null, null, 0, insertTs,
null)));
+ assertEquals(List.of(val), convert(index.scan(null, null, 0,
Timestamp.nextVersion(), null)));
+ }
+
+ /**
+ * Checks that an uncommitted insert is not returned by a timestamped scan, and that after the commit it is returned
+ * by scans at the commit timestamp and at later timestamps.
+ */
+ @Test
+ public void testCommit() throws Exception {
+ MvPartitionStorage pk = partitionStorage();
+
+ SortedIndexMvStorage index = createIndexStorage(INDEX1,
tableCfg.value());
+
+ TestKey key = new TestKey(1, "1");
+ TestValue val = new TestValue(10, "10");
+
+ pk.addWrite(binaryRow(key, val), UUID.randomUUID());
+
+ // Timestamp is null.
+ assertEquals(List.of(val), convert(index.scan(null, null, 0, null,
null)));
+
+ // Timestamp is not null.
+ assertEquals(List.of(), convert(index.scan(null, null, 0,
Timestamp.nextVersion(), null)));
+
+ // Commit write.
+ Timestamp commitTs = Timestamp.nextVersion();
+ pk.commitWrite(binaryKey(key), commitTs);
+
+ // Timestamp is null.
+ assertEquals(List.of(val), convert(index.scan(null, null, 0, null,
null)));
+
+ // Timestamp is not null.
+ assertEquals(List.of(val), convert(index.scan(null, null, 0, commitTs,
null)));
+ assertEquals(List.of(val), convert(index.scan(null, null, 0,
Timestamp.nextVersion(), null)));
+ }
+
+ /**
+ * Checks that after a removal is committed the row is still returned by a scan at the insert timestamp, but not by
+ * scans at the removal timestamp, at later timestamps, or without a timestamp.
+ */
+ @Test
+ public void testCommitRemove() throws Exception {
+ MvPartitionStorage pk = partitionStorage();
+
+ SortedIndexMvStorage index = createIndexStorage(INDEX1,
tableCfg.value());
+
+ TestKey key = new TestKey(1, "1");
+ TestValue val = new TestValue(10, "10");
+
+ Timestamp insertTs = Timestamp.nextVersion();
+ insert(key, val, insertTs);
+
+ // Remove.
+ pk.addWrite(binaryKey(key), UUID.randomUUID());
+
+ // Timestamp is null.
+ assertEquals(List.of(), convert(index.scan(null, null, 0, null,
null)));
+
+ // Timestamp is not null.
+ assertEquals(List.of(val), convert(index.scan(null, null, 0, insertTs,
null)));
+ assertEquals(List.of(val), convert(index.scan(null, null, 0,
Timestamp.nextVersion(), null)));
+
+ // Commit remove.
+ Timestamp removeTs = Timestamp.nextVersion();
+ pk.commitWrite(binaryKey(key), removeTs);
+
+ // Timestamp is null.
+ assertEquals(List.of(), convert(index.scan(null, null, 0, null,
null)));
+
+ // Timestamp is not null.
+ assertEquals(List.of(val), convert(index.scan(null, null, 0, insertTs,
null)));
+
+ assertEquals(List.of(), convert(index.scan(null, null, 0, removeTs,
null)));
+ assertEquals(List.of(), convert(index.scan(null, null, 0,
Timestamp.nextVersion(), null)));
+ }
+
+ /**
+ * Writes the row under a random transaction id and commits it right away.
+ *
+ * @param key Key of the row.
+ * @param value Value of the row.
+ * @param ts Commit timestamp, or {@code null} to commit with a newly generated timestamp.
+ */
+ protected void insert(TestKey key, TestValue value, Timestamp ts) {
+ MvPartitionStorage pk = partitionStorage();
+
+ BinaryRow binaryRow = binaryRow(key, value);
+
+ pk.addWrite(binaryRow, UUID.randomUUID());
+
+ pk.commitWrite(binaryRow, ts == null ? Timestamp.nextVersion() : ts);
+ }
+
+ /**
+ * Creates an index row prefix that consists of the single given column value.
+ */
+ protected IndexRowPrefix prefix(String val) {
+ return () -> new Object[]{val};
+ }
+
+ /**
+ * Unmarshals the value part of every row returned by the cursor, keeping the cursor order, and closes the cursor.
+ *
+ * @param cursor Cursor returned by an index scan.
+ * @return Values in the order in which the cursor returned them.
+ * @throws Exception If closing the cursor fails.
+ */
+ protected List<TestValue> convert(Cursor<IndexRowEx> cursor) throws
Exception {
+ try (cursor) {
+ return StreamSupport.stream(cursor.spliterator(), false)
+ .map(indexRowEx -> {
+ try {
+ return kvMarshaller.unmarshalValue(new
Row(schemaDescriptor, indexRowEx.row()));
+ } catch (MarshallerException e) {
+ throw new IgniteException(e);
+ }
+ })
+ .collect(Collectors.toList());
+ }
+ }
+}
diff --git
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
new file mode 100644
index 000000000..0cf9731f1
--- /dev/null
+++
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.util.Locale;
+import java.util.Objects;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.marshaller.KvMarshaller;
+import org.apache.ignite.internal.schema.marshaller.MarshallerException;
+import org.apache.ignite.internal.schema.marshaller.MarshallerFactory;
+import
org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshallerFactory;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.lang.IgniteException;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+
+/**
+ * Common parent of MV storage tests. Holds the test pojo classes, the schema that describes them and the marshaller
+ * that converts the pojos to binary rows and back.
+ */
+public abstract class BaseMvStoragesTest {
+    /** Default reflection marshaller factory. */
+    protected static MarshallerFactory marshallerFactory;
+
+    /** Schema descriptor for tests. */
+    protected static SchemaDescriptor schemaDescriptor;
+
+    /** Key-value marshaller for tests. */
+    protected static KvMarshaller<TestKey, TestValue> kvMarshaller;
+
+    /**
+     * Creates the marshaller factory, the schema of version 1 with (intKey, strKey) key columns and (intVal, strVal)
+     * value columns, and the marshaller for {@link TestKey}/{@link TestValue} pairs.
+     */
+    @BeforeAll
+    static void beforeAll() {
+        marshallerFactory = new ReflectionMarshallerFactory();
+
+        Column[] keyColumns = {
+                new Column("intKey".toUpperCase(Locale.ROOT), NativeTypes.INT32, false),
+                new Column("strKey".toUpperCase(Locale.ROOT), NativeTypes.STRING, false),
+        };
+
+        Column[] valueColumns = {
+                new Column("intVal".toUpperCase(Locale.ROOT), NativeTypes.INT32, false),
+                new Column("strVal".toUpperCase(Locale.ROOT), NativeTypes.STRING, false),
+        };
+
+        schemaDescriptor = new SchemaDescriptor(1, keyColumns, valueColumns);
+
+        kvMarshaller = marshallerFactory.create(schemaDescriptor, TestKey.class, TestValue.class);
+    }
+
+    /**
+     * Drops the static references created in {@link #beforeAll()}.
+     */
+    @AfterAll
+    static void afterAll() {
+        marshallerFactory = null;
+        schemaDescriptor = null;
+        kvMarshaller = null;
+    }
+
+    /**
+     * Marshals the key alone into a binary row.
+     *
+     * @throws IgniteException If marshalling fails.
+     */
+    protected static BinaryRow binaryKey(TestKey key) {
+        BinaryRow marshalled;
+
+        try {
+            marshalled = kvMarshaller.marshal(key);
+        } catch (MarshallerException e) {
+            throw new IgniteException(e);
+        }
+
+        return marshalled;
+    }
+
+    /**
+     * Marshals the key-value pair into a binary row.
+     *
+     * @throws IgniteException If marshalling fails.
+     */
+    protected static BinaryRow binaryRow(TestKey key, TestValue value) {
+        BinaryRow marshalled;
+
+        try {
+            marshalled = kvMarshaller.marshal(key, value);
+        } catch (MarshallerException e) {
+            throw new IgniteException(e);
+        }
+
+        return marshalled;
+    }
+
+    /**
+     * Unmarshals the key part of the binary row.
+     *
+     * @throws IgniteException If unmarshalling fails.
+     */
+    @Nullable
+    protected static TestKey key(BinaryRow binaryRow) {
+        Row row = new Row(schemaDescriptor, binaryRow);
+
+        try {
+            return kvMarshaller.unmarshalKey(row);
+        } catch (MarshallerException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * Unmarshals the value part of the binary row.
+     *
+     * @throws IgniteException If unmarshalling fails.
+     */
+    @Nullable
+    protected static TestValue value(BinaryRow binaryRow) {
+        Row row = new Row(schemaDescriptor, binaryRow);
+
+        try {
+            return kvMarshaller.unmarshalValue(row);
+        } catch (MarshallerException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * Test pojo key.
+     */
+    protected static class TestKey {
+        public int intKey;
+
+        public String strKey;
+
+        public TestKey() {
+        }
+
+        public TestKey(int intKey, String strKey) {
+            this.intKey = intKey;
+            this.strKey = strKey;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == this) {
+                return true;
+            }
+
+            if (o == null || o.getClass() != getClass()) {
+                return false;
+            }
+
+            TestKey that = (TestKey) o;
+
+            if (intKey != that.intKey) {
+                return false;
+            }
+
+            return Objects.equals(strKey, that.strKey);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(intKey, strKey);
+        }
+    }
+
+    /**
+     * Test pojo value. The natural order compares {@code intVal} first and {@code strVal} second.
+     */
+    protected static class TestValue implements Comparable<TestValue> {
+        public Integer intVal;
+
+        public String strVal;
+
+        public TestValue() {
+        }
+
+        public TestValue(Integer intVal, String strVal) {
+            this.intVal = intVal;
+            this.strVal = strVal;
+        }
+
+        @Override
+        public int compareTo(TestValue o) {
+            int byIntVal = intVal.compareTo(o.intVal);
+
+            if (byIntVal != 0) {
+                return byIntVal;
+            }
+
+            return strVal.compareTo(o.strVal);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == this) {
+                return true;
+            }
+
+            if (o == null || o.getClass() != getClass()) {
+                return false;
+            }
+
+            TestValue that = (TestValue) o;
+
+            if (!Objects.equals(intVal, that.intVal)) {
+                return false;
+            }
+
+            return Objects.equals(strVal, that.strVal);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(intVal, strVal);
+        }
+    }
+}
diff --git
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorage.java
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorage.java
new file mode 100644
index 000000000..9c5530e0d
--- /dev/null
+++
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorage.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.TxIdMismatchException;
+import org.apache.ignite.internal.tx.Timestamp;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Test implementation of MV partition storage.
+ *
+ * <p>Every key maps to a singly linked chain of row versions, newest first. Only the head of a chain may be
+ * uncommitted (no begin timestamp, owned by a transaction); {@link #read} and {@link #commitWrite} rely on that.
+ */
+public class TestMvPartitionStorage implements MvPartitionStorage {
+    /** Version chains by the key slice of the row. */
+    private final ConcurrentMap<ByteBuffer, VersionChain> map = new ConcurrentHashMap<>();
+
+    /** Sorted indexes that are updated together with the partition data. */
+    private final List<TestSortedIndexMvStorage> indexes;
+
+    /**
+     * Constructor.
+     *
+     * @param indexes Indexes to maintain. The list is used as is (not copied), so indexes added to it later are
+     *      picked up by subsequent writes.
+     */
+    public TestMvPartitionStorage(List<TestSortedIndexMvStorage> indexes) {
+        this.indexes = indexes;
+    }
+
+    /**
+     * Single version of a row, linked to the previous (older) version of the same key.
+     */
+    private static class VersionChain {
+        /** Row data of this version. */
+        final @Nullable BinaryRow row;
+
+        /** Commit timestamp, {@code null} while the version is uncommitted. */
+        final @Nullable Timestamp begin;
+
+        /** Id of the transaction that owns the uncommitted version, {@code null} for committed versions. */
+        final @Nullable UUID txId;
+
+        /** Previous version of the same key, {@code null} if there is none. */
+        final @Nullable VersionChain next;
+
+        VersionChain(@Nullable BinaryRow row, @Nullable Timestamp begin, @Nullable UUID txId, @Nullable VersionChain next) {
+            this.row = row;
+            this.begin = begin;
+            this.txId = txId;
+            this.next = next;
+        }
+
+        /** Creates an uncommitted version owned by the given transaction on top of {@code next}. */
+        public static VersionChain createUncommitted(BinaryRow row, UUID txId, VersionChain next) {
+            return new VersionChain(row, null, txId, next);
+        }
+
+        /** Creates a committed copy of the given uncommitted version, keeping its row and its link to older versions. */
+        public static VersionChain createCommitted(Timestamp timestamp, VersionChain uncommittedVersionChain) {
+            return new VersionChain(uncommittedVersionChain.row, timestamp, null, uncommittedVersionChain.next);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void addWrite(BinaryRow row, UUID txId) throws TxIdMismatchException {
+        map.compute(row.keySlice(), (keyBuf, versionChain) -> {
+            if (versionChain != null && versionChain.begin == null) {
+                if (!txId.equals(versionChain.txId)) {
+                    throw new TxIdMismatchException();
+                }
+
+                // The same transaction overwrites its own pending version. The pending version must be replaced rather
+                // than stacked upon: the rest of this class assumes that only the head of a chain can be uncommitted,
+                // and a second uncommitted link would make timestamped reads fail on its null begin timestamp.
+                return VersionChain.createUncommitted(row, txId, versionChain.next);
+            }
+
+            return VersionChain.createUncommitted(row, txId, versionChain);
+        });
+
+        // Rows without a value are removal markers, they are never indexed.
+        if (row.hasValue()) {
+            for (TestSortedIndexMvStorage index : indexes) {
+                index.append(row);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void abortWrite(BinaryRow key) {
+        map.computeIfPresent(key.keySlice(), (ignored, versionChain) -> {
+            assert versionChain != null;
+            assert versionChain.begin == null && versionChain.txId != null;
+
+            BinaryRow aborted = versionChain.row;
+
+            if (aborted.hasValue()) {
+                for (TestSortedIndexMvStorage index : indexes) {
+                    abortWrite(versionChain.next, aborted, index);
+                }
+            }
+
+            // Returning null (no older versions) removes the key from the map.
+            return versionChain.next;
+        });
+    }
+
+    /**
+     * Removes the aborted row from the index unless one of the remaining versions, starting from {@code head},
+     * matches it in the indexed columns.
+     */
+    private void abortWrite(VersionChain head, BinaryRow aborted, TestSortedIndexMvStorage index) {
+        for (VersionChain cur = head; cur != null; cur = cur.next) {
+            if (index.matches(aborted, cur.row)) {
+                return;
+            }
+        }
+
+        index.remove(aborted);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void commitWrite(BinaryRow key, Timestamp timestamp) {
+        map.compute(key.keySlice(), (keyBuf, versionChain) -> {
+            assert versionChain != null;
+            assert versionChain.begin == null && versionChain.txId != null;
+
+            return VersionChain.createCommitted(timestamp, versionChain);
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    @Nullable
+    public BinaryRow read(BinaryRow key, @Nullable Timestamp timestamp) {
+        VersionChain versionChain = map.get(key.keySlice());
+
+        return read(versionChain, timestamp);
+    }
+
+    /**
+     * Finds the row visible in the given chain.
+     *
+     * @param versionChain Head of the chain, {@code null} if the key is unknown.
+     * @param timestamp Timestamp to read at; {@code null} means "read the head version, committed or not".
+     * @return Visible row, or {@code null} if there is no visible version or the visible version has no value.
+     */
+    @Nullable
+    private BinaryRow read(VersionChain versionChain, @Nullable Timestamp timestamp) {
+        if (versionChain == null) {
+            return null;
+        }
+
+        if (timestamp == null) {
+            return versionChain.row.hasValue() ? versionChain.row : null;
+        }
+
+        VersionChain cur = versionChain;
+
+        // Timestamped reads never see the uncommitted head.
+        if (cur.begin == null) {
+            cur = cur.next;
+        }
+
+        // Versions are ordered newest first, the first one that began at or before the timestamp is the visible one.
+        while (cur != null) {
+            if (timestamp.compareTo(cur.begin) >= 0) {
+                BinaryRow row = cur.row;
+
+                return row.hasValue() ? row : null;
+            }
+
+            cur = cur.next;
+        }
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, @Nullable Timestamp timestamp) {
+        Iterator<BinaryRow> iterator = map.values().stream()
+                .map(versionChain -> read(versionChain, timestamp))
+                .filter(Objects::nonNull)
+                .filter(keyFilter)
+                .iterator();
+
+        return Cursor.fromIterator(iterator);
+    }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/Cursor.java
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorageTest.java
similarity index 51%
copy from modules/core/src/main/java/org/apache/ignite/internal/util/Cursor.java
copy to
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorageTest.java
index e1ddaf2b9..9c384d703 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/Cursor.java
+++
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorageTest.java
@@ -1,6 +1,6 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
@@ -15,15 +15,22 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.util;
+package org.apache.ignite.internal.storage.basic;
-import java.util.Iterator;
+import java.util.List;
+import org.apache.ignite.internal.storage.AbstractMvPartitionStorageTest;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
/**
- * Closeable cursor.
- *
- * @param <T> Type of elements.
+ * MV partition storage test implementation for {@link TestMvPartitionStorage}
class.
*/
-public interface Cursor<T> extends Iterator<T>, Iterable<T>, AutoCloseable {
+public class TestMvPartitionStorageTest extends AbstractMvPartitionStorageTest
{
+ /** Test partition storage instance. */
+ private final TestMvPartitionStorage storage = new
TestMvPartitionStorage(List.of());
+ /** {@inheritDoc} */
+ @Override
+ protected MvPartitionStorage partitionStorage() {
+ return storage;
+ }
}
diff --git
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestSortedIndexMvStorage.java
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestSortedIndexMvStorage.java
new file mode 100644
index 000000000..6a2f6f2b9
--- /dev/null
+++
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestSortedIndexMvStorage.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.function.IntPredicate;
+import java.util.function.ToIntFunction;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.configuration.schemas.table.ColumnView;
+import org.apache.ignite.configuration.schemas.table.IndexColumnView;
+import org.apache.ignite.configuration.schemas.table.SortedIndexView;
+import org.apache.ignite.configuration.schemas.table.TableIndexView;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import
org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
+import
org.apache.ignite.internal.schema.configuration.SchemaDescriptorConverter;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.storage.index.IndexRowPrefix;
+import org.apache.ignite.internal.storage.index.PrefixComparator;
+import org.apache.ignite.internal.storage.index.SortedIndexMvStorage;
+import org.apache.ignite.internal.tx.Timestamp;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Test implementation of MV sorted index storage.
+ *
+ * <p>The index is a sorted set of binary rows ordered by the indexed columns and then by the key slice. Entries are only
+ * candidates: every scan re-reads the row from the partition storage and returns it only if its indexed columns
+ * still match the entry.
+ */
+public class TestSortedIndexMvStorage implements SortedIndexMvStorage {
+    /** Index entries, ordered by the indexed columns and then by the key slice. */
+    private final NavigableSet<BinaryRow> index;
+
+    /** Schema used to read column values from binary rows. */
+    private final SchemaDescriptor descriptor;
+
+    /** Partition storages by partition number. */
+    private final Map<Integer, TestMvPartitionStorage> pk;
+
+    /** Number of partitions in the table. */
+    private final int partitions;
+
+    /** Configuration of the indexed columns, in index order. */
+    private final IndexColumnView[] indexColumns;
+
+    /** Positions of the indexed columns among the table columns, in index order. */
+    private final int[] columnIndexes;
+
+    /** Native types of the indexed columns, in index order. */
+    private final NativeType[] nativeTypes;
+
+    /**
+     * Constructor.
+     *
+     * @param name Name of the index in the table configuration; it must refer to a sorted index.
+     * @param tableCfg Table configuration.
+     * @param descriptor Schema descriptor used to read column values.
+     * @param pk Partition storages by partition number.
+     */
+    public TestSortedIndexMvStorage(
+            String name,
+            TableView tableCfg,
+            SchemaDescriptor descriptor,
+            Map<Integer, TestMvPartitionStorage> pk
+    ) {
+        this.descriptor = descriptor;
+
+        this.pk = pk;
+
+        partitions = tableCfg.partitions();
+
+        index = new ConcurrentSkipListSet<>(((Comparator<BinaryRow>) this::compareColumns).thenComparing(BinaryRow::keySlice));
+
+        // Init columns.
+        NamedListView<? extends ColumnView> tblColumns = tableCfg.columns();
+
+        TableIndexView idxCfg = tableCfg.indices().get(name);
+
+        assert idxCfg instanceof SortedIndexView;
+
+        SortedIndexView sortedIdxCfg = (SortedIndexView) idxCfg;
+
+        NamedListView<? extends IndexColumnView> columns = sortedIdxCfg.columns();
+
+        int length = columns.size();
+
+        this.indexColumns = new IndexColumnView[length];
+        this.columnIndexes = new int[length];
+        this.nativeTypes = new NativeType[length];
+
+        for (int i = 0; i < length; i++) {
+            IndexColumnView idxColumn = columns.get(i);
+
+            indexColumns[i] = idxColumn;
+
+            int columnIndex = tblColumns.namedListKeys().indexOf(idxColumn.name());
+
+            assert columnIndex >= 0 : "Index column is not a table column: " + idxColumn.name();
+
+            columnIndexes[i] = columnIndex;
+
+            nativeTypes[i] = SchemaDescriptorConverter.convert(SchemaConfigurationConverter.convert(tblColumns.get(columnIndex).type()));
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean supportsBackwardsScan() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean supportsIndexOnlyScan() {
+        return false;
+    }
+
+    /**
+     * Compares two rows by the indexed columns, honoring the ascending/descending setting of every column.
+     */
+    private int compareColumns(BinaryRow l, BinaryRow r) {
+        Row leftRow = new Row(descriptor, l);
+        Row rightRow = new Row(descriptor, r);
+
+        for (int i = 0; i < indexColumns.length; i++) {
+            int columnIndex = columnIndexes[i];
+
+            int cmp = PrefixComparator.compareColumns(leftRow, columnIndex, nativeTypes[i].spec(), rightRow.value(columnIndex));
+
+            if (cmp != 0) {
+                return indexColumns[i].asc() ? cmp : -cmp;
+            }
+        }
+
+        return 0;
+    }
+
+    /** Adds the row to the index. */
+    public void append(BinaryRow row) {
+        index.add(row);
+    }
+
+    /** Removes the row from the index. */
+    public void remove(BinaryRow row) {
+        index.remove(row);
+    }
+
+    /**
+     * Checks whether the existing row has the same values in the indexed columns as the aborted one.
+     *
+     * @return {@code false} if {@code existing} is {@code null} or differs in at least one indexed column.
+     */
+    public boolean matches(BinaryRow aborted, BinaryRow existing) {
+        return existing != null && compareColumns(aborted, existing) == 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Cursor<IndexRowEx> scan(
+            @Nullable IndexRowPrefix lowerBound,
+            @Nullable IndexRowPrefix upperBound,
+            int flags,
+            @Nullable Timestamp timestamp,
+            @Nullable IntPredicate partitionFilter
+    ) {
+        boolean includeLower = (flags & GREATER_OR_EQUAL) != 0;
+        boolean includeUpper = (flags & LESS_OR_EQUAL) != 0;
+
+        NavigableSet<BinaryRow> scanOrder = this.index;
+        int direction = 1;
+
+        // Swap bounds and flip index for backwards scan.
+        if ((flags & BACKWARDS) != 0) {
+            scanOrder = scanOrder.descendingSet();
+            direction = -1;
+
+            boolean tempBoolean = includeLower;
+            includeLower = includeUpper;
+            includeUpper = tempBoolean;
+
+            IndexRowPrefix tempBound = lowerBound;
+            lowerBound = upperBound;
+            upperBound = tempBound;
+        }
+
+        // Both comparators return a negative value for rows before the bound (in scan direction) and a positive value
+        // for rows after it. The value returned for rows equal to the bound decides whether the bound is inclusive.
+        ToIntFunction<BinaryRow> lowerCmp = lowerBound == null ? row -> 1 : boundComparator(lowerBound, direction, includeLower ? 0 : -1);
+        ToIntFunction<BinaryRow> upperCmp = upperBound == null ? row -> -1 : boundComparator(upperBound, direction, includeUpper ? 0 : 1);
+
+        Iterator<IndexRowEx> iterator = scanOrder.stream()
+                .dropWhile(binaryRow -> lowerCmp.applyAsInt(binaryRow) < 0)
+                .takeWhile(binaryRow -> upperCmp.applyAsInt(binaryRow) <= 0)
+                .map(binaryRow -> {
+                    int partition = binaryRow.hash() % partitions;
+
+                    if (partition < 0) {
+                        partition = -partition;
+                    }
+
+                    if (partitionFilter != null && !partitionFilter.test(partition)) {
+                        return null;
+                    }
+
+                    TestMvPartitionStorage partitionStorage = pk.get(partition);
+
+                    if (partitionStorage == null) {
+                        return null;
+                    }
+
+                    // The index entry may be stale: return the row that is actually visible at the timestamp,
+                    // and only if it still has the indexed column values of the entry.
+                    BinaryRow visibleRow = partitionStorage.read(binaryRow, timestamp);
+
+                    return matches(binaryRow, visibleRow) ? visibleRow : null;
+                })
+                .filter(Objects::nonNull)
+                .map(binaryRow -> {
+                    Row row = new Row(descriptor, binaryRow);
+
+                    return (IndexRowEx) new IndexRowEx() {
+                        @Override
+                        public BinaryRow row() {
+                            return binaryRow;
+                        }
+
+                        @Override
+                        public Object value(int idx) {
+                            return row.value(columnIndexes[idx]);
+                        }
+                    };
+                })
+                .iterator();
+
+        return Cursor.fromIterator(iterator);
+    }
+
+    /**
+     * Creates a function that compares a row with the bound prefix.
+     *
+     * @param bound Bound prefix; its values correspond to the leading index columns.
+     * @param direction {@code 1} for a forward scan, {@code -1} for a backwards scan; the comparison result is multiplied by it.
+     * @param equals Value to return for rows whose leading columns are equal to the prefix.
+     */
+    private ToIntFunction<BinaryRow> boundComparator(IndexRowPrefix bound, int direction, int equals) {
+        return binaryRow -> {
+            Object[] values = bound.prefixColumnValues();
+
+            Row row = new Row(descriptor, binaryRow);
+
+            for (int i = 0; i < values.length; i++) {
+                int columnIndex = columnIndexes[i];
+
+                int cmp = PrefixComparator.compareColumns(row, columnIndex, nativeTypes[i].spec(), values[i]);
+
+                if (cmp != 0) {
+                    return direction * (indexColumns[i].asc() ? cmp : -cmp);
+                }
+            }
+
+            return equals;
+        };
+    }
+}
diff --git
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestSortedIndexMvStorageTest.java
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestSortedIndexMvStorageTest.java
new file mode 100644
index 000000000..6be39d55e
--- /dev/null
+++
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestSortedIndexMvStorageTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.storage.AbstractSortedIndexMvStorageTest;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.index.SortedIndexMvStorage;
+
+/**
+ * MV sorted index storage test implementation for {@link
TestSortedIndexMvStorage} class. Supplies the partition storage and index storages requested by the abstract base test.
+ */
+public class TestSortedIndexMvStorageTest extends
AbstractSortedIndexMvStorageTest {
+ private List<TestSortedIndexMvStorage> indexes = new
CopyOnWriteArrayList<>(); // Collects every index storage created through createIndexStorage.
+
+ private TestMvPartitionStorage partitionStorage = new
TestMvPartitionStorage(indexes); // Receives the same list instance; presumably it keeps the reference so later-added indexes are seen - confirm in TestMvPartitionStorage.
+
+ @Override
+ protected MvPartitionStorage partitionStorage() {
+ return partitionStorage; // Always the single instance created with this test object.
+ }
+
+ @Override
+ protected SortedIndexMvStorage createIndexStorage(String name, TableView
tableCfg) {
+ TestSortedIndexMvStorage index = new TestSortedIndexMvStorage(name,
tableCfg, schemaDescriptor, Map.of(0, partitionStorage)); // The partition storage is registered under key 0 only; schemaDescriptor is not declared in this class (presumably inherited - confirm).
+
+ indexes.add(index); // Register the new index in the list that was passed to the partition storage.
+
+ return index;
+ }
+}
diff --git
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapPartitionStorage.java
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapPartitionStorage.java
index 18d0c4e71..ea5fffd53 100644
---
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapPartitionStorage.java
+++
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapPartitionStorage.java
@@ -46,7 +46,6 @@ import org.apache.ignite.internal.storage.basic.SimpleDataRow;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteInternalException;
-import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
@@ -208,32 +207,7 @@ public class TestConcurrentHashMapPartitionStorage
implements PartitionStorage {
.filter(filter)
.iterator();
- return new Cursor<>() {
- /** {@inheritDoc} */
- @Override
- public boolean hasNext() {
- return iter.hasNext();
- }
-
- /** {@inheritDoc} */
- @Override
- public DataRow next() {
- return iter.next();
- }
-
- /** {@inheritDoc} */
- @NotNull
- @Override
- public Iterator<DataRow> iterator() {
- return this;
- }
-
- /** {@inheritDoc} */
- @Override
- public void close() {
- // No-op.
- }
- };
+ return Cursor.fromIterator(iter);
}
/** {@inheritDoc} */
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
index fb2d6e0ac..9dca234de 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.storage.index.IndexRow;
import org.apache.ignite.internal.storage.index.IndexRowDeserializer;
import org.apache.ignite.internal.storage.index.IndexRowFactory;
import org.apache.ignite.internal.storage.index.IndexRowPrefix;
+import org.apache.ignite.internal.storage.index.PrefixComparator;
import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
import org.apache.ignite.internal.util.Cursor;
diff --git
a/modules/vault/src/main/java/org/apache/ignite/internal/vault/inmemory/InMemoryVaultService.java
b/modules/vault/src/main/java/org/apache/ignite/internal/vault/inmemory/InMemoryVaultService.java
index afd13de13..9afaa31ed 100644
---
a/modules/vault/src/main/java/org/apache/ignite/internal/vault/inmemory/InMemoryVaultService.java
+++
b/modules/vault/src/main/java/org/apache/ignite/internal/vault/inmemory/InMemoryVaultService.java
@@ -108,27 +108,7 @@ public class InMemoryVaultService implements VaultService {
}
}
- return new Cursor<>() {
- @Override
- public void close() {
- }
-
- @NotNull
- @Override
- public Iterator<VaultEntry> iterator() {
- return this;
- }
-
- @Override
- public boolean hasNext() {
- return it.hasNext();
- }
-
- @Override
- public VaultEntry next() {
- return it.next();
- }
- };
+ return Cursor.fromIterator(it);
}
/** {@inheritDoc} */