This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new c3f3b6a4ce IGNITE-23417 Fix race between compaction and
KeyValueStorage#range (#4550)
c3f3b6a4ce is described below
commit c3f3b6a4ce6b6a7e11da44fc98ccd7a8e6e57876
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Mon Oct 14 16:38:19 2024 +0300
IGNITE-23417 Fix race between compaction and KeyValueStorage#range (#4550)
---
.../metastorage/server/KeyValueStorage.java | 6 +-
.../server/persistence/RocksDbKeyValueStorage.java | 21 +++++--
.../AbstractCompactionKeyValueStorageTest.java | 69 ++++++++++++++++++++++
.../server/SimpleInMemoryKeyValueStorage.java | 23 +++++---
4 files changed, 106 insertions(+), 13 deletions(-)
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index ccbaa1d580..92432367e0 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -287,7 +287,8 @@ public interface KeyValueStorage extends ManuallyCloseable {
/**
* Returns cursor by latest entries which correspond to the given keys
range.
*
- * <p>Cursor will iterate over a snapshot of keys and their revisions at
the time the method was invoked.</p>
+ * <p>Cursor will iterate over a snapshot of keys and their revisions at
the time the method was invoked. Also, each key will be
+ * represented by a single entry, the one with its most recent revision.</p>
*
* <p>Never throws {@link CompactedException} as well as cursor
methods.</p>
*
@@ -299,7 +300,8 @@ public interface KeyValueStorage extends ManuallyCloseable {
/**
* Returns cursor by entries which correspond to the given keys range and
bounded by revision number.
*
- * <p>Cursor will iterate over a snapshot of keys and their revisions at
the time the method was invoked.</p>
+ * <p>Cursor will iterate over a snapshot of keys and their revisions at
the time the method was invoked. Also, each key will be
+ * represented by a single entry with a revision less than or equal to {@code
revUpperBound}.</p>
*
* <p>Cursor methods never throw {@link CompactedException}.</p>
*
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index 5e792e495f..bf743489de 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -111,6 +111,7 @@ import
org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
import org.apache.ignite.internal.rocksdb.RocksUtils;
import org.apache.ignite.internal.rocksdb.snapshot.RocksSnapshotManager;
import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -1621,12 +1622,20 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
}
private Value getValueForOperation(byte[] key, long revision) {
+ Value value = getValueForOperationNullable(key, revision);
+
+ assert value != null : "key=" + toUtf8String(key) + ", revision=" +
revision;
+
+ return value;
+ }
+
+ private @Nullable Value getValueForOperationNullable(byte[] key, long
revision) {
try {
byte[] valueBytes = data.get(keyToRocksKey(revision, key));
assert valueBytes != null && valueBytes.length != 0 : "key=" +
toUtf8String(key) + ", revision=" + revision;
- return bytesToValue(valueBytes);
+ return ArrayUtils.nullOrEmpty(valueBytes) ? null :
bytesToValue(valueBytes);
} catch (RocksDBException e) {
throw new MetaStorageException(
OP_EXECUTION_ERR,
@@ -1651,6 +1660,8 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
iterator.seek(keyFrom);
+ long compactionRevisionBeforeCreateCursor = compactionRevision;
+
return new RocksIteratorAdapter<>(iterator) {
/** Cached entry used to filter "empty" values. */
private @Nullable Entry next;
@@ -1702,10 +1713,12 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
}
long revision = keyRevisions[maxRevisionIndex];
+ Value value = getValueForOperationNullable(key, revision);
- // According to the compaction algorithm, we will start it
locally on a new compaction revision only when all cursors are
- // completed strictly before it. Therefore, during normal
operation, we should not get an error here.
- Value value = getValueForOperation(key, revision);
+ // Value may be null if the compaction has removed it in
parallel.
+ if (value == null || (revision <=
compactionRevisionBeforeCreateCursor && value.tombstone())) {
+ return EntryImpl.empty(key);
+ }
return EntryImpl.toEntry(key, revision, value);
}
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
index 66324d84ce..956382ca5b 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.metastorage.server;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.stream.Collectors.joining;
+import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
@@ -722,6 +723,70 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
}
}
+ /**
+ * Tests {@link KeyValueStorage#range(byte[], byte[])} and {@link
KeyValueStorage#range(byte[], byte[], long)} for the case when
+ * cursors should or should not return the last element after compaction.
For {@link #FOO_KEY} and {@link #BAR_KEY}, the last revision
+ * is 5: a regular entry and a tombstone, respectively. Keys with their revisions are
added in {@link #setUp()}.
+ *
+ * <p>Consider the situation:</p>
+ * <ul>
+ * <li>Made {@link KeyValueStorage#setCompactionRevision(long)
KeyValueStorage.setCompactionRevision(5)}.</li>
+ * <li>Waited for all cursors to end with revision {@code 5} or
less.</li>
+ * <li>Made {@link KeyValueStorage#compact(long)
KeyValueStorage.compact(5)}.</li>
+ * <li>Invoke {@link KeyValueStorage#range} for last revision and
revision {@code 6} for {@link #FOO_KEY} and {@link #BAR_KEY}.
+ * <ul>
+ * <li>For {@link #FOO_KEY}, we need to return an entry with
revision {@code 5}, since it will not be removed from the storage
+ * after compaction.</li>
+ * <li>For {@link #BAR_KEY}, we should not return anything, since
the key will be deleted after compaction.</li>
+ * </ul>
+ * </li>
+ * </ul>
+ */
+ @Test
+ void testRangeAndCompactionForCaseReadLastEntries() {
+ storage.setCompactionRevision(5);
+
+ try (
+ Cursor<Entry> rangeFooKeyCursorLatest = storage.range(FOO_KEY,
storage.nextKey(FOO_KEY));
+ Cursor<Entry> rangeFooKeyCursorBounded =
storage.range(FOO_KEY, storage.nextKey(FOO_KEY), 6);
+ Cursor<Entry> rangeBarKeyCursorLatest = storage.range(BAR_KEY,
storage.nextKey(BAR_KEY));
+ Cursor<Entry> rangeBarKeyCursorBounded =
storage.range(BAR_KEY, storage.nextKey(BAR_KEY), 6)
+ ) {
+ // Must see the latest revision of the FOO_KEY as it will not be
removed from storage by the compaction.
+ assertEquals(List.of(5L),
collectRevisions(rangeFooKeyCursorLatest));
+ assertEquals(List.of(5L),
collectRevisions(rangeFooKeyCursorBounded));
+
+ // Must not see the latest revision of the BAR_KEY, as it will
have to be removed from the storage by the compaction.
+ assertEquals(List.of(), collectRevisions(rangeBarKeyCursorLatest));
+ assertEquals(List.of(),
collectRevisions(rangeBarKeyCursorBounded));
+ }
+ }
+
+ /**
+ * Tests {@link KeyValueStorage#range(byte[], byte[])} and {@link
KeyValueStorage#range(byte[], byte[], long)} for the case when they
+ * were invoked on a revision (for example, on revision {@code 5}) before
invoking
+ * {@link KeyValueStorage#setCompactionRevision(long)
KeyValueStorage.setCompactionRevision(5)} and before invoking
+ * {@link KeyValueStorage#compact(long) KeyValueStorage.compact(5)}. Such
cursors should return entries since nothing should be
+ * removed yet until they are completed. Keys are chosen for convenience.
Keys with their revisions are added in {@link #setUp()}.
+ */
+ @Test
+ void testRangeAfterSetCompactionRevisionButBeforeStartCompaction() {
+ try (
+ Cursor<Entry> rangeFooKeyCursorLatest = storage.range(FOO_KEY,
storage.nextKey(FOO_KEY));
+ Cursor<Entry> rangeFooKeyCursorBounded =
storage.range(FOO_KEY, storage.nextKey(FOO_KEY), 5);
+ Cursor<Entry> rangeBarKeyCursorLatest = storage.range(BAR_KEY,
storage.nextKey(BAR_KEY));
+ Cursor<Entry> rangeBarKeyCursorBounded =
storage.range(BAR_KEY, storage.nextKey(BAR_KEY), 5)
+ ) {
+ storage.setCompactionRevision(5);
+
+ assertEquals(List.of(5L),
collectRevisions(rangeFooKeyCursorLatest));
+ assertEquals(List.of(5L),
collectRevisions(rangeFooKeyCursorBounded));
+
+ assertEquals(List.of(5L),
collectRevisions(rangeBarKeyCursorLatest));
+ assertEquals(List.of(5L),
collectRevisions(rangeBarKeyCursorBounded));
+ }
+ }
+
private List<Integer> collectRevisions(byte[] key) {
var revisions = new ArrayList<Integer>();
@@ -736,6 +801,10 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
return revisions;
}
+ private List<Long> collectRevisions(Cursor<Entry> cursor) {
+ return cursor.stream().map(Entry::revision).collect(toList());
+ }
+
private static byte[] fromString(String s) {
return s.getBytes(UTF_8);
}
diff --git
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 7d379dedb6..fad0672cb5 100644
---
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -918,17 +918,21 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
}
private Value getValue(byte[] key, long revision) {
- NavigableMap<byte[], Value> valueByKey = revsIdx.get(revision);
-
- assert valueByKey != null : "key=" + toUtf8String(key) + ", revision="
+ revision;
-
- Value value = valueByKey.get(key);
+ Value value = getValueNullable(key, revision);
assert value != null : "key=" + toUtf8String(key) + ", revision=" +
revision;
return value;
}
+ private @Nullable Value getValueNullable(byte[] key, long revision) {
+ NavigableMap<byte[], Value> valueByKey = revsIdx.get(revision);
+
+ assert valueByKey != null : "key=" + toUtf8String(key) + ", revision="
+ revision;
+
+ return valueByKey.get(key);
+ }
+
private Cursor<Entry> doRange(byte[] keyFrom, byte @Nullable [] keyTo,
long revUpperBound) {
assert revUpperBound >= 0 : revUpperBound;
@@ -943,14 +947,19 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
byte[] key = e.getKey();
long[] keyRevisions = toLongArray(e.getValue());
- int maxRevisionIndex =
KeyValueStorageUtils.maxRevisionIndex(keyRevisions, revUpperBound);
+ int maxRevisionIndex = maxRevisionIndex(keyRevisions,
revUpperBound);
if (maxRevisionIndex == NOT_FOUND) {
return EntryImpl.empty(key);
}
long revision = keyRevisions[maxRevisionIndex];
- Value value = getValue(key, revision);
+ Value value = getValueNullable(key, revision);
+
+ // Value may be null if the compaction has removed it in
parallel.
+ if (value == null || (revision <= compactionRevision &&
value.tombstone())) {
+ return EntryImpl.empty(key);
+ }
return EntryImpl.toEntry(key, revision, value);
})