This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 28afe43525 IGNITE-19422 Fixed "get" method in index storages. (#2161)
28afe43525 is described below
commit 28afe435258f1c1a1f4055f18b136ba0a1573f2b
Author: Ivan Bessonov <[email protected]>
AuthorDate: Thu Jun 8 12:54:08 2023 +0300
IGNITE-19422 Fixed "get" method in index storages. (#2161)
---
.../storage/index/AbstractIndexStorageTest.java | 3 -
.../index/AbstractPageMemoryIndexStorage.java | 127 +++++++++++++-
.../IndexRowKey.java} | 22 +--
.../pagememory/index/hash/HashIndexRowKey.java | 9 +-
.../index/hash/PageMemoryHashIndexStorage.java | 47 ++----
.../index/sorted/PageMemorySortedIndexStorage.java | 184 ++++-----------------
.../pagememory/index/sorted/SortedIndexRowKey.java | 9 +-
.../rocksdb/index/AbstractRocksDbIndexStorage.java | 143 ++++++++++++++++
.../rocksdb/index/RocksDbHashIndexStorage.java | 47 +-----
.../rocksdb/index/RocksDbSortedIndexStorage.java | 127 +-------------
10 files changed, 334 insertions(+), 384 deletions(-)
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractIndexStorageTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractIndexStorageTest.java
index 6f6090f8e4..0148c839d6 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractIndexStorageTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractIndexStorageTest.java
@@ -60,7 +60,6 @@ import
org.apache.ignite.internal.storage.index.impl.BinaryTupleRowSerializer;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
@@ -204,7 +203,6 @@ public abstract class AbstractIndexStorageTest<S extends
IndexStorage, D extends
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-19422")
public void testGetConcurrentPut() {
S index = createIndexStorage(INDEX_NAME, ColumnType.INT32,
ColumnType.string());
var serializer = new BinaryTupleRowSerializer(indexDescriptor(index));
@@ -230,7 +228,6 @@ public abstract class AbstractIndexStorageTest<S extends
IndexStorage, D extends
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-19422")
public void testGetConcurrentReplace() {
S index = createIndexStorage(INDEX_NAME, ColumnType.INT32,
ColumnType.string());
var serializer = new BinaryTupleRowSerializer(indexDescriptor(index));
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemoryIndexStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemoryIndexStorage.java
index cb7c502935..a3fcfcee45 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemoryIndexStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemoryIndexStorage.java
@@ -22,13 +22,17 @@ import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptio
import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageStateOnRebalance;
import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
+import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.StorageRebalanceException;
import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.storage.index.PeekCursor;
+import org.apache.ignite.internal.storage.pagememory.index.common.IndexRowKey;
import
org.apache.ignite.internal.storage.pagememory.index.freelist.IndexColumnsFreeList;
import org.apache.ignite.internal.storage.pagememory.index.meta.IndexMeta;
import org.apache.ignite.internal.storage.pagememory.index.meta.IndexMetaKey;
@@ -43,7 +47,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Abstract index storage based on Page Memory.
*/
-public abstract class AbstractPageMemoryIndexStorage implements IndexStorage {
+public abstract class AbstractPageMemoryIndexStorage<K extends IndexRowKey, V
extends K> implements IndexStorage {
/** Index ID. */
private final int indexId;
@@ -192,6 +196,127 @@ public abstract class AbstractPageMemoryIndexStorage
implements IndexStorage {
state.compareAndSet(StorageState.CLEANUP, StorageState.RUNNABLE);
}
+ /** Constant that represents the absence of value in {@link ScanCursor}.
Not equivalent to {@code null} value. */
+ private static final IndexRowKey NO_INDEX_ROW = () -> null;
+
+ /**
+ * Cursor that always returns up-to-date next element.
+ *
+ * @param <R> Type of the returned value.
+ */
+ protected abstract class ScanCursor<R> implements PeekCursor<R> {
+ private final BplusTree<K, V> indexTree;
+
+ private final @Nullable K lower;
+
+ private @Nullable Boolean hasNext;
+
+ /**
+ * Last row used in mapping in the {@link #next()} call.
+ * {@code null} upon cursor creation or after {@link #hasNext()}
returned {@code null}.
+ */
+ private @Nullable V treeRow;
+
+ /**
+ * Row used in the mapping of the latest {@link #peek()} call, that
was performed after the last {@link #next()} call.
+ * {@link #NO_INDEX_ROW} if there was no such call.
+ */
+ private @Nullable V peekedRow = (V) NO_INDEX_ROW;
+
+ protected ScanCursor(@Nullable K lower, BplusTree<K, V> indexTree) {
+ this.lower = lower;
+ this.indexTree = indexTree;
+ }
+
+ /**
+ * Maps value from the index tree into the required result.
+ */
+ protected abstract R map(V value);
+
+ /**
+ * Check whether the passed value exceeds the upper bound for the scan.
+ */
+ protected abstract boolean exceedsUpperBound(V value);
+
+ @Override
+ public void close() {
+ // No-op.
+ }
+
+ @Override
+ public boolean hasNext() {
+ return busy(() -> {
+ try {
+ return advanceIfNeededBusy();
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Error while advancing the
cursor", e);
+ }
+ });
+ }
+
+ @Override
+ public R next() {
+ return busy(() -> {
+ try {
+ if (!advanceIfNeededBusy()) {
+ throw new NoSuchElementException();
+ }
+
+ this.hasNext = null;
+
+ return map(treeRow);
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Error while advancing the
cursor", e);
+ }
+ });
+ }
+
+ @Override
+ public @Nullable R peek() {
+ return busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(),
AbstractPageMemoryIndexStorage.this::createStorageInfo);
+
+ try {
+ return map(peekBusy());
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Error when peeking next
element", e);
+ }
+ });
+ }
+
+ private @Nullable V peekBusy() throws IgniteInternalCheckedException {
+ if (hasNext != null) {
+ return treeRow;
+ }
+
+ if (treeRow == null) {
+ peekedRow = lower == null ? indexTree.findFirst() :
indexTree.findNext(lower, true);
+ } else {
+ peekedRow = indexTree.findNext(treeRow, false);
+ }
+
+ if (peekedRow != null && exceedsUpperBound(peekedRow)) {
+ peekedRow = null;
+ }
+
+ return peekedRow;
+ }
+
+ private boolean advanceIfNeededBusy() throws
IgniteInternalCheckedException {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(),
AbstractPageMemoryIndexStorage.this::createStorageInfo);
+
+ if (hasNext != null) {
+ return hasNext;
+ }
+
+ treeRow = (peekedRow == NO_INDEX_ROW) ? peekBusy() : peekedRow;
+ peekedRow = (V) NO_INDEX_ROW;
+
+ hasNext = treeRow != null;
+ return hasNext;
+ }
+ }
+
protected <V> V busy(Supplier<V> supplier) {
if (!busyLock.enterBusy()) {
throwExceptionDependingOnStorageState(state.get(),
createStorageInfo());
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/SortedIndexRowKey.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/common/IndexRowKey.java
similarity index 65%
copy from
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/SortedIndexRowKey.java
copy to
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/common/IndexRowKey.java
index efecee91e5..37f2b41c5e 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/SortedIndexRowKey.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/common/IndexRowKey.java
@@ -15,29 +15,17 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.storage.pagememory.index.sorted;
+package org.apache.ignite.internal.storage.pagememory.index.common;
import
org.apache.ignite.internal.storage.pagememory.index.freelist.IndexColumns;
/**
- * Key to search for a {@link SortedIndexRow} in the {@link SortedIndexTree}.
+ * Common interface for search keys in index trees.
*/
-public class SortedIndexRowKey {
- private final IndexColumns indexColumns;
-
- /**
- * Constructor.
- *
- * @param indexColumns Index columns.
- */
- public SortedIndexRowKey(IndexColumns indexColumns) {
- this.indexColumns = indexColumns;
- }
-
+@FunctionalInterface
+public interface IndexRowKey {
/**
* Returns an indexed columns value.
*/
- public IndexColumns indexColumns() {
- return indexColumns;
- }
+ IndexColumns indexColumns();
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/HashIndexRowKey.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/HashIndexRowKey.java
index 74dab15fb4..ed1e0068d7 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/HashIndexRowKey.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/HashIndexRowKey.java
@@ -17,12 +17,13 @@
package org.apache.ignite.internal.storage.pagememory.index.hash;
+import org.apache.ignite.internal.storage.pagememory.index.common.IndexRowKey;
import
org.apache.ignite.internal.storage.pagememory.index.freelist.IndexColumns;
/**
* Key to search for a {@link HashIndexRow} in the {@link HashIndexTree}.
*/
-public class HashIndexRowKey {
+public class HashIndexRowKey implements IndexRowKey {
private final int indexColumnsHash;
private final IndexColumns indexColumns;
@@ -33,7 +34,7 @@ public class HashIndexRowKey {
* @param indexColumnsHash Hash of the index columns.
* @param indexColumns Index columns.
*/
- public HashIndexRowKey(int indexColumnsHash, IndexColumns indexColumns) {
+ HashIndexRowKey(int indexColumnsHash, IndexColumns indexColumns) {
this.indexColumnsHash = indexColumnsHash;
this.indexColumns = indexColumns;
@@ -46,9 +47,7 @@ public class HashIndexRowKey {
return indexColumnsHash;
}
- /**
- * Returns an indexed columns value.
- */
+ @Override
public IndexColumns indexColumns() {
return indexColumns;
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java
index 26feeb34cb..1d70bdf1d4 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java
@@ -20,6 +20,7 @@ package
org.apache.ignite.internal.storage.pagememory.index.hash;
import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInCleanupOrRebalancedState;
+import java.util.Objects;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.pagememory.util.GradualTaskExecutor;
@@ -41,7 +42,7 @@ import org.apache.ignite.lang.IgniteInternalCheckedException;
/**
* Implementation of Hash index storage using Page Memory.
*/
-public class PageMemoryHashIndexStorage extends AbstractPageMemoryIndexStorage
implements HashIndexStorage {
+public class PageMemoryHashIndexStorage extends
AbstractPageMemoryIndexStorage<HashIndexRowKey, HashIndexRow> implements
HashIndexStorage {
private static final IgniteLogger LOG =
Loggers.forClass(PageMemoryHashIndexStorage.class);
/** Index descriptor. */
@@ -82,41 +83,21 @@ public class PageMemoryHashIndexStorage extends
AbstractPageMemoryIndexStorage i
return busy(() -> {
throwExceptionIfStorageInProgressOfRebalance(state.get(),
this::createStorageInfo);
- try {
- IndexColumns indexColumns = new IndexColumns(partitionId,
key.byteBuffer());
-
- HashIndexRow lowerBound = new HashIndexRow(indexColumns,
lowestRowId);
- HashIndexRow upperBound = new HashIndexRow(indexColumns,
highestRowId);
-
- Cursor<HashIndexRow> cursor = hashIndexTree.find(lowerBound,
upperBound);
-
- return new Cursor<RowId>() {
- @Override
- public void close() {
- cursor.close();
- }
-
- @Override
- public boolean hasNext() {
- return busy(() -> {
-
throwExceptionIfStorageInProgressOfRebalance(state.get(),
PageMemoryHashIndexStorage.this::createStorageInfo);
+ IndexColumns indexColumns = new IndexColumns(partitionId,
key.byteBuffer());
- return cursor.hasNext();
- });
- }
+ HashIndexRow lowerBound = new HashIndexRow(indexColumns,
lowestRowId);
- @Override
- public RowId next() {
- return busy(() -> {
-
throwExceptionIfStorageInProgressOfRebalance(state.get(),
PageMemoryHashIndexStorage.this::createStorageInfo);
+ return new ScanCursor<RowId>(lowerBound, hashIndexTree) {
+ @Override
+ protected RowId map(HashIndexRow value) {
+ return value.rowId();
+ }
- return cursor.next().rowId();
- });
- }
- };
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException("Failed to create scan cursor", e);
- }
+ @Override
+ protected boolean exceedsUpperBound(HashIndexRow value) {
+ return !Objects.equals(value.indexColumns().valueBuffer(),
key.byteBuffer());
+ }
+ };
});
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
index 31c4ae8135..fb0079fa9e 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
@@ -21,8 +21,7 @@ import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptio
import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInCleanupOrRebalancedState;
import java.nio.ByteBuffer;
-import java.util.NoSuchElementException;
-import java.util.function.Function;
+import java.util.Objects;
import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -31,7 +30,6 @@ import org.apache.ignite.internal.pagememory.util.PageIdUtils;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.storage.RowId;
-import org.apache.ignite.internal.storage.StorageClosedException;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.index.IndexRow;
import org.apache.ignite.internal.storage.index.IndexRowImpl;
@@ -50,7 +48,8 @@ import org.jetbrains.annotations.Nullable;
/**
* Implementation of Sorted index storage using Page Memory.
*/
-public class PageMemorySortedIndexStorage extends
AbstractPageMemoryIndexStorage implements SortedIndexStorage {
+public class PageMemorySortedIndexStorage extends
AbstractPageMemoryIndexStorage<SortedIndexRowKey, SortedIndexRow>
+ implements SortedIndexStorage {
private static final IgniteLogger LOG =
Loggers.forClass(PageMemorySortedIndexStorage.class);
/** Index descriptor. */
@@ -91,15 +90,19 @@ public class PageMemorySortedIndexStorage extends
AbstractPageMemoryIndexStorage
return busy(() -> {
throwExceptionIfStorageInProgressOfRebalance(state.get(),
this::createStorageInfo);
- try {
- SortedIndexRowKey lowerBound = toSortedIndexRow(key,
lowestRowId);
+ SortedIndexRowKey lowerBound = toSortedIndexRow(key, lowestRowId);
- SortedIndexRowKey upperBound = toSortedIndexRow(key,
highestRowId);
+ return new ScanCursor<RowId>(lowerBound, sortedIndexTree) {
+ @Override
+ protected RowId map(SortedIndexRow value) {
+ return value.rowId();
+ }
- return convertCursor(sortedIndexTree.find(lowerBound,
upperBound), SortedIndexRow::rowId);
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException("Failed to create scan cursor", e);
- }
+ @Override
+ protected boolean exceedsUpperBound(SortedIndexRow value) {
+ return !Objects.equals(value.indexColumns().valueBuffer(),
key.byteBuffer());
+ }
+ };
});
}
@@ -154,7 +157,20 @@ public class PageMemorySortedIndexStorage extends
AbstractPageMemoryIndexStorage
SortedIndexRowKey upper = createBound(upperBound, includeUpper);
- return new ScanCursor(lower, upper);
+ return new ScanCursor<IndexRow>(lower, sortedIndexTree) {
+ @Override
+ public IndexRow map(SortedIndexRow value) {
+ return toIndexRowImpl(value);
+ }
+
+ @Override
+ protected boolean exceedsUpperBound(SortedIndexRow value) {
+ return upper != null && 0 <=
sortedIndexTree.getBinaryTupleComparator().compare(
+ value.indexColumns().valueBuffer(),
+ upper.indexColumns().valueBuffer()
+ );
+ }
+ };
});
}
@@ -190,150 +206,6 @@ public class PageMemorySortedIndexStorage extends
AbstractPageMemoryIndexStorage
sortedIndexTree.close();
}
- /**
- * Returns a new cursor that converts elements to another type, and also
throws {@link StorageClosedException} on
- * {@link Cursor#hasNext()} and {@link Cursor#next()} when the sorted
index storage is {@link #close()}.
- *
- * @param cursor Cursor.
- * @param mapper Conversion function.
- */
- private <T, R> Cursor<R> convertCursor(Cursor<T> cursor, Function<T, R>
mapper) {
- return new Cursor<>() {
- @Override
- public void close() {
- cursor.close();
- }
-
- @Override
- public boolean hasNext() {
- return busy(() -> {
- throwExceptionIfStorageInProgressOfRebalance(state.get(),
PageMemorySortedIndexStorage.this::createStorageInfo);
-
- return cursor.hasNext();
- });
- }
-
- @Override
- public R next() {
- return busy(() -> {
- throwExceptionIfStorageInProgressOfRebalance(state.get(),
PageMemorySortedIndexStorage.this::createStorageInfo);
-
- return mapper.apply(cursor.next());
- });
- }
- };
- }
-
- /** Constant that represents the absence of value in {@link ScanCursor}. */
- private static final SortedIndexRow NO_INDEX_ROW = new
SortedIndexRow(null, null);
-
- private class ScanCursor implements PeekCursor<IndexRow> {
- @Nullable
- private Boolean hasNext;
-
- @Nullable
- private final SortedIndexRowKey lower;
-
- @Nullable
- private final SortedIndexRowKey upper;
-
- @Nullable
- private SortedIndexRow treeRow;
-
- @Nullable
- private SortedIndexRow peekedRow = NO_INDEX_ROW;
-
- private ScanCursor(@Nullable SortedIndexRowKey lower, @Nullable
SortedIndexRowKey upper) {
- this.lower = lower;
- this.upper = upper;
- }
-
- @Override
- public void close() {
- // No-op.
- }
-
- @Override
- public boolean hasNext() {
- return busy(() -> {
- try {
- return advanceIfNeeded();
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException("Error while advancing the
cursor", e);
- }
- });
- }
-
- @Override
- public IndexRow next() {
- return busy(() -> {
- try {
- if (!advanceIfNeeded()) {
- throw new NoSuchElementException();
- }
-
- this.hasNext = null;
-
- return toIndexRowImpl(treeRow);
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException("Error while advancing the
cursor", e);
- }
- });
- }
-
- @Override
- public @Nullable IndexRow peek() {
- return busy(() -> {
- throwExceptionIfStorageInProgressOfRebalance(state.get(),
PageMemorySortedIndexStorage.this::createStorageInfo);
-
- try {
- return toIndexRowImpl(peekBusy());
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException("Error when peeking next
element", e);
- }
- });
- }
-
- private @Nullable SortedIndexRow peekBusy() throws
IgniteInternalCheckedException {
- if (hasNext != null) {
- return treeRow;
- }
-
- if (treeRow == null) {
- peekedRow = lower == null ? sortedIndexTree.findFirst() :
sortedIndexTree.findNext(lower, true);
- } else {
- peekedRow = sortedIndexTree.findNext(treeRow, false);
- }
-
- if (peekedRow == null || (upper != null && compareRows(peekedRow,
upper) >= 0)) {
- peekedRow = null;
- }
-
- return peekedRow;
- }
-
- private boolean advanceIfNeeded() throws
IgniteInternalCheckedException {
- throwExceptionIfStorageInProgressOfRebalance(state.get(),
PageMemorySortedIndexStorage.this::createStorageInfo);
-
- if (hasNext != null) {
- return hasNext;
- }
-
- treeRow = (peekedRow == NO_INDEX_ROW) ? peekBusy() : peekedRow;
- peekedRow = NO_INDEX_ROW;
-
- hasNext = treeRow != null;
- return hasNext;
- }
-
- private int compareRows(SortedIndexRowKey key1, SortedIndexRowKey
key2) {
- return sortedIndexTree.getBinaryTupleComparator().compare(
- key1.indexColumns().valueBuffer(),
- key2.indexColumns().valueBuffer()
- );
- }
- }
-
/**
* Updates the internal data structures of the storage on rebalance or
cleanup.
*
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/SortedIndexRowKey.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/SortedIndexRowKey.java
index efecee91e5..e2be01651e 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/SortedIndexRowKey.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/SortedIndexRowKey.java
@@ -17,12 +17,13 @@
package org.apache.ignite.internal.storage.pagememory.index.sorted;
+import org.apache.ignite.internal.storage.pagememory.index.common.IndexRowKey;
import
org.apache.ignite.internal.storage.pagememory.index.freelist.IndexColumns;
/**
* Key to search for a {@link SortedIndexRow} in the {@link SortedIndexTree}.
*/
-public class SortedIndexRowKey {
+public class SortedIndexRowKey implements IndexRowKey {
private final IndexColumns indexColumns;
/**
@@ -30,13 +31,11 @@ public class SortedIndexRowKey {
*
* @param indexColumns Index columns.
*/
- public SortedIndexRowKey(IndexColumns indexColumns) {
+ SortedIndexRowKey(IndexColumns indexColumns) {
this.indexColumns = indexColumns;
}
- /**
- * Returns an indexed columns value.
- */
+ @Override
public IndexColumns indexColumns() {
return indexColumns;
}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java
index f51148e7ce..157432fd58 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java
@@ -17,22 +17,35 @@
package org.apache.ignite.internal.storage.rocksdb.index;
+import static
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.KEY_BYTE_ORDER;
import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageState;
import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageStateOnRebalance;
import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+import java.nio.ByteBuffer;
+import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.rocksdb.RocksUtils;
import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.StorageRebalanceException;
import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.storage.index.PeekCursor;
import org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper;
import org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage;
import org.apache.ignite.internal.storage.util.StorageState;
+import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.lang.IgniteStringFormatter;
import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteBatchWithIndex;
@@ -201,4 +214,134 @@ abstract class AbstractRocksDbIndexStorage implements
IndexStorage {
* @throws RocksDBException If failed to delete data.
*/
abstract void destroyData(WriteBatch writeBatch) throws RocksDBException;
+
+ /**
+ * Cursor that always returns up-to-date next element.
+ */
+ protected abstract class UpToDatePeekCursor<T> implements PeekCursor<T> {
+ private final Slice upperBoundSlice;
+ private final byte[] lowerBound;
+
+ private final ReadOptions options;
+ private final RocksIterator it;
+
+ private @Nullable Boolean hasNext;
+
+ /**
+ * Last key used in mapping in the {@link #next()} call.
+ * {@code null} upon cursor creation or after {@link #hasNext()}
returned {@code null}.
+ */
+ private byte @Nullable [] key;
+
+ /**
+ * Row used in the mapping of the latest {@link #peek()} call, that
was performed after the last {@link #next()} call.
+ * {@link ArrayUtils#BYTE_EMPTY_ARRAY} if there was no such call.
+ */
+ private byte @Nullable [] peekedKey = BYTE_EMPTY_ARRAY;
+
+ UpToDatePeekCursor(byte[] upperBound, ColumnFamily indexCf, byte[]
lowerBound) {
+ this.lowerBound = lowerBound;
+ upperBoundSlice = new Slice(upperBound);
+ options = new ReadOptions().setIterateUpperBound(upperBoundSlice);
+ it = indexCf.newIterator(options);
+ }
+
+ /**
+ * Maps the key from the index into the required result.
+ */
+ protected abstract T map(ByteBuffer byteBuffer);
+
+ @Override
+ public void close() {
+ try {
+ closeAll(it, options, upperBoundSlice);
+ } catch (Exception e) {
+ throw new StorageException("Error closing cursor", e);
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return busy(this::advanceIfNeededBusy);
+ }
+
+ @Override
+ public T next() {
+ return busy(() -> {
+ if (!advanceIfNeededBusy()) {
+ throw new NoSuchElementException();
+ }
+
+ this.hasNext = null;
+
+ return map(ByteBuffer.wrap(key).order(KEY_BYTE_ORDER));
+ });
+ }
+
+ @Override
+ public @Nullable T peek() {
+ return busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(),
AbstractRocksDbIndexStorage.this::createStorageInfo);
+
+ byte[] res = peekBusy();
+
+ if (res == null) {
+ return null;
+ } else {
+ return map(ByteBuffer.wrap(res).order(KEY_BYTE_ORDER));
+ }
+ });
+ }
+
+ private byte @Nullable [] peekBusy() {
+ if (hasNext != null) {
+ return key;
+ }
+
+ refreshAndPrepareRocksIteratorBusy();
+
+ if (!it.isValid()) {
+ RocksUtils.checkIterator(it);
+
+ peekedKey = null;
+ } else {
+ peekedKey = it.key();
+ }
+
+ return peekedKey;
+ }
+
+ private boolean advanceIfNeededBusy() throws StorageException {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(),
AbstractRocksDbIndexStorage.this::createStorageInfo);
+
+ //noinspection ArrayEquality
+ key = (peekedKey == BYTE_EMPTY_ARRAY) ? peekBusy() : peekedKey;
+ peekedKey = BYTE_EMPTY_ARRAY;
+
+ hasNext = key != null;
+ return hasNext;
+ }
+
+ private void refreshAndPrepareRocksIteratorBusy() {
+ try {
+ it.refresh();
+ } catch (RocksDBException e) {
+ throw new StorageException("Error refreshing an iterator", e);
+ }
+
+ if (key == null) {
+ it.seek(lowerBound);
+ } else {
+ it.seekForPrev(key);
+
+ if (it.isValid()) {
+ it.next();
+ } else {
+ RocksUtils.checkIterator(it);
+
+ it.seek(lowerBound);
+ }
+ }
+ }
+ }
}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
index 8324f1779a..701c51ecb0 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
@@ -24,12 +24,9 @@ import static
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.PAR
import static
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.ROW_ID_SIZE;
import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
-import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
import java.nio.ByteBuffer;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
-import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
-import org.apache.ignite.internal.rocksdb.RocksUtils;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
@@ -40,10 +37,7 @@ import
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper;
import org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.HashUtils;
-import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
-import org.rocksdb.Slice;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteBatchWithIndex;
import org.rocksdb.WriteOptions;
@@ -113,48 +107,15 @@ public class RocksDbHashIndexStorage extends
AbstractRocksDbIndexStorage impleme
byte[] rangeEnd = incrementPrefix(rangeStart);
- Slice upperBound = rangeEnd == null ? null : new Slice(rangeEnd);
-
- ReadOptions options = new
ReadOptions().setIterateUpperBound(upperBound);
-
- RocksIterator it = indexCf.newIterator(options);
-
- it.seek(rangeStart);
-
- return new RocksIteratorAdapter<RowId>(it) {
+ return new UpToDatePeekCursor<RowId>(rangeEnd, indexCf,
rangeStart) {
@Override
- protected RowId decodeEntry(byte[] key, byte[] value) {
+ protected RowId map(ByteBuffer byteBuffer) {
// RowId UUID is located at the last 16 bytes of the key
- long mostSignificantBits = bytesToLong(key, key.length -
Long.BYTES * 2);
- long leastSignificantBits = bytesToLong(key, key.length -
Long.BYTES);
+ long mostSignificantBits =
byteBuffer.getLong(rangeStart.length);
+ long leastSignificantBits =
byteBuffer.getLong(rangeStart.length + Long.BYTES);
return new RowId(helper.partitionId(),
mostSignificantBits, leastSignificantBits);
}
-
- @Override
- public boolean hasNext() {
- return busy(() -> {
-
throwExceptionIfStorageInProgressOfRebalance(state.get(),
RocksDbHashIndexStorage.this::createStorageInfo);
-
- return super.hasNext();
- });
- }
-
- @Override
- public RowId next() {
- return busy(() -> {
-
throwExceptionIfStorageInProgressOfRebalance(state.get(),
RocksDbHashIndexStorage.this::createStorageInfo);
-
- return super.next();
- });
- }
-
- @Override
- public void close() {
- super.close();
-
- RocksUtils.closeAll(options, upperBound);
- }
};
});
}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
index 418036e6c0..6544fc270f 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
@@ -23,15 +23,12 @@ import static
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.PAR
import static
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.ROW_ID_SIZE;
import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
-import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
-import java.util.NoSuchElementException;
import java.util.function.Function;
import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
-import org.apache.ignite.internal.rocksdb.RocksUtils;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.storage.RowId;
@@ -45,10 +42,7 @@ import
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper;
import org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
-import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
-import org.rocksdb.Slice;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteBatchWithIndex;
@@ -149,7 +143,7 @@ public class RocksDbSortedIndexStorage extends
AbstractRocksDbIndexStorage imple
});
}
- private <T> PeekCursor<T> scan(
+ protected <T> PeekCursor<T> scan(
@Nullable BinaryTuplePrefix lowerBound,
@Nullable BinaryTuplePrefix upperBound,
boolean includeLower,
@@ -159,7 +153,7 @@ public class RocksDbSortedIndexStorage extends
AbstractRocksDbIndexStorage imple
byte[] lowerBoundBytes;
if (lowerBound == null) {
- lowerBoundBytes = null;
+ lowerBoundBytes = helper.partitionStartPrefix();
} else {
lowerBoundBytes = rocksPrefix(lowerBound);
@@ -172,7 +166,7 @@ public class RocksDbSortedIndexStorage extends
AbstractRocksDbIndexStorage imple
byte[] upperBoundBytes;
if (upperBound == null) {
- upperBoundBytes = null;
+ upperBoundBytes = helper.partitionEndPrefix();
} else {
upperBoundBytes = rocksPrefix(upperBound);
@@ -182,119 +176,10 @@ public class RocksDbSortedIndexStorage extends
AbstractRocksDbIndexStorage imple
}
}
- return createScanCursor(lowerBoundBytes, upperBoundBytes, mapper);
- }
-
- private <T> PeekCursor<T> createScanCursor(
- byte @Nullable [] lowerBound,
- byte @Nullable [] upperBound,
- Function<ByteBuffer, T> mapper
- ) {
- Slice upperBoundSlice = upperBound == null ? new
Slice(helper.partitionEndPrefix()) : new Slice(upperBound);
-
- ReadOptions options = new
ReadOptions().setIterateUpperBound(upperBoundSlice);
-
- RocksIterator it = indexCf.newIterator(options);
-
- return new PeekCursor<>() {
- @Nullable
- private Boolean hasNext;
-
- private byte @Nullable [] key;
-
- private byte @Nullable [] peekedKey = BYTE_EMPTY_ARRAY;
-
- @Override
- public void close() {
- try {
- closeAll(it, options, upperBoundSlice);
- } catch (Exception e) {
- throw new StorageException("Error closing cursor", e);
- }
- }
-
+ return new UpToDatePeekCursor<>(upperBoundBytes, indexCf,
lowerBoundBytes) {
@Override
- public boolean hasNext() {
- return busy(this::advanceIfNeeded);
- }
-
- @Override
- public T next() {
- return busy(() -> {
- if (!advanceIfNeeded()) {
- throw new NoSuchElementException();
- }
-
- this.hasNext = null;
-
- return
mapper.apply(ByteBuffer.wrap(key).order(KEY_BYTE_ORDER));
- });
- }
-
- @Override
- public @Nullable T peek() {
- return busy(() -> {
- throwExceptionIfStorageInProgressOfRebalance(state.get(),
RocksDbSortedIndexStorage.this::createStorageInfo);
-
- byte[] res = peek0();
-
- if (res == null) {
- return null;
- } else {
- return
mapper.apply(ByteBuffer.wrap(res).order(KEY_BYTE_ORDER));
- }
- });
- }
-
- private byte @Nullable [] peek0() {
- if (hasNext != null) {
- return key;
- }
-
- refreshAndPrepareRocksIterator();
-
- if (!it.isValid()) {
- RocksUtils.checkIterator(it);
-
- peekedKey = null;
- } else {
- peekedKey = it.key();
- }
-
- return peekedKey;
- }
-
- private boolean advanceIfNeeded() throws StorageException {
- throwExceptionIfStorageInProgressOfRebalance(state.get(),
RocksDbSortedIndexStorage.this::createStorageInfo);
-
- //noinspection ArrayEquality
- key = (peekedKey == BYTE_EMPTY_ARRAY) ? peek0() : peekedKey;
- peekedKey = BYTE_EMPTY_ARRAY;
-
- hasNext = key != null;
- return hasNext;
- }
-
- private void refreshAndPrepareRocksIterator() {
- try {
- it.refresh();
- } catch (RocksDBException e) {
- throw new StorageException("Error refreshing an iterator",
e);
- }
-
- if (key == null) {
- it.seek(lowerBound == null ? helper.partitionStartPrefix()
: lowerBound);
- } else {
- it.seekForPrev(key);
-
- if (it.isValid()) {
- it.next();
- } else {
- RocksUtils.checkIterator(it);
-
- it.seek(lowerBound == null ?
helper.partitionStartPrefix() : lowerBound);
- }
- }
+ protected T map(ByteBuffer byteBuffer) {
+ return mapper.apply(byteBuffer);
}
};
}