This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 39224e3dfa IGNITE-18020 Add GC support to rocksdb storage (#1619)
39224e3dfa is described below
commit 39224e3dfa464f11d7c7cacb740be5cf299be4ac
Author: Semyon Danilov <[email protected]>
AuthorDate: Mon Feb 6 19:02:41 2023 +0400
IGNITE-18020 Add GC support to rocksdb storage (#1619)
---
gradle/libs.versions.toml | 2 +-
modules/network/README.md | 6 +-
.../network/{docs => tech-notes}/network-flow.png | Bin
.../network/{docs => tech-notes}/network-flow.puml | 0
.../network/{docs => tech-notes}/threading-2.png | Bin
.../network/{docs => tech-notes}/threading-2.puml | 0
modules/network/{docs => tech-notes}/threading.png | Bin
.../network/{docs => tech-notes}/threading.puml | 0
.../internal/storage/MvPartitionStorage.java | 1 +
.../ignite/internal/storage/TableRowAndRowId.java | 2 +-
.../AbstractMvPartitionStorageConcurrencyTest.java | 36 +--
.../storage/AbstractMvPartitionStorageGcTest.java | 14 +-
.../storage/AbstractMvPartitionStorageTest.java | 9 +
.../internal/storage/BaseMvStoragesTest.java | 49 +--
.../storage/rocksdb/ColumnFamilyUtils.java | 27 +-
.../internal/storage/rocksdb/GarbageCollector.java | 320 ++++++++++++++++++++
.../storage/rocksdb/PartitionDataHelper.java | 212 +++++++++++++
.../storage/rocksdb/RocksDbMvPartitionStorage.java | 330 ++++++++++-----------
.../storage/rocksdb/RocksDbTableStorage.java | 31 +-
.../rocksdb/RocksDbMvPartitionStorageGcTest.java | 44 +++
.../tech-notes/garbage-collection.md | 95 ++++++
21 files changed, 913 insertions(+), 265 deletions(-)
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index de9d67aa59..ddc93a2355 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -57,7 +57,7 @@ jsonpath = "2.4.0"
classgraph = "4.8.110"
javassist = "3.28.0-GA"
checker = "3.10.0"
-rocksdb = "7.3.1"
+rocksdb = "7.9.2"
disruptor = "3.3.7"
metrics = "4.0.2"
jctools = "3.3.0"
diff --git a/modules/network/README.md b/modules/network/README.md
index 4aac91b3b0..12645de7b4 100644
--- a/modules/network/README.md
+++ b/modules/network/README.md
@@ -29,9 +29,9 @@ Messages are then passed on to message listeners of the ConnectionManager.
In case of ClusterService over ScaleCube (see
`ScaleCubeClusterServiceFactory`),
messages are passed down to the ClusterService via the Project Reactor's Sink
which enforces a strict order of message handling:
a new message can't be received by ClusterService until a previous message is
**handled** (see [message handling](#message-handling)).
-
+
Message handling can also be offloaded to another thread:
-
+
Note that in this case the network message would be considered **handled**
before it is processed
by another thread.
@@ -46,4 +46,4 @@ Message is considered **handled** after all the message handlers have been invok
## Message's flow example
Two nodes, Alice and Bob.
User is sending a message from Alice to Bob within any thread.
-
+
diff --git a/modules/network/docs/network-flow.png b/modules/network/tech-notes/network-flow.png
similarity index 100%
rename from modules/network/docs/network-flow.png
rename to modules/network/tech-notes/network-flow.png
diff --git a/modules/network/docs/network-flow.puml b/modules/network/tech-notes/network-flow.puml
similarity index 100%
rename from modules/network/docs/network-flow.puml
rename to modules/network/tech-notes/network-flow.puml
diff --git a/modules/network/docs/threading-2.png b/modules/network/tech-notes/threading-2.png
similarity index 100%
rename from modules/network/docs/threading-2.png
rename to modules/network/tech-notes/threading-2.png
diff --git a/modules/network/docs/threading-2.puml b/modules/network/tech-notes/threading-2.puml
similarity index 100%
rename from modules/network/docs/threading-2.puml
rename to modules/network/tech-notes/threading-2.puml
diff --git a/modules/network/docs/threading.png b/modules/network/tech-notes/threading.png
similarity index 100%
rename from modules/network/docs/threading.png
rename to modules/network/tech-notes/threading.png
diff --git a/modules/network/docs/threading.puml b/modules/network/tech-notes/threading.puml
similarity index 100%
rename from modules/network/docs/threading.puml
rename to modules/network/tech-notes/threading.puml
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index edfbd2283b..40dc2ea961 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -225,6 +225,7 @@ public interface MvPartitionStorage extends ManuallyCloseable {
     * @param lowWatermark A time threshold for the row. Rows younger than the watermark value will not be removed.
     * @return A pair of table row and row id, where a timestamp of the row is less than or equal to {@code lowWatermark}.
     *      {@code null} if there's no such value.
+     * @throws StorageException If failed to poll element for vacuum.
     */
    default @Nullable TableRowAndRowId pollForVacuum(HybridTimestamp lowWatermark) {
        throw new UnsupportedOperationException("pollForVacuum");
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/TableRowAndRowId.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/TableRowAndRowId.java
index 2dc1d5dae6..52d5b60b3b 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/TableRowAndRowId.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/TableRowAndRowId.java
@@ -24,7 +24,7 @@ import org.jetbrains.annotations.Nullable;
 * Wrapper that holds both {@link TableRow} and {@link RowId}. {@link TableRow} is null for tombstones.
*/
public class TableRowAndRowId {
- /** Table row. */
+ /** Table row. {@code null} if tombstone. */
private final @Nullable TableRow tableRow;
/** Row id. */
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
index 7e1e8d9d7c..f193c34448 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
@@ -19,11 +19,9 @@ package org.apache.ignite.internal.storage;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assumptions.assumeTrue;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.TableRow;
-import org.apache.ignite.internal.storage.impl.TestStorageEngine;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
@@ -85,9 +83,6 @@ public abstract class AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
@ParameterizedTest
@EnumSource(AddAndCommit.class)
void testRegularGcAndRead(AddAndCommit addAndCommit) {
- //TODO https://issues.apache.org/jira/browse/IGNITE-18020
- assumeTrue(engine instanceof TestStorageEngine);
-
for (int i = 0; i < REPEATS; i++) {
HybridTimestamp firstCommitTs = addAndCommit(TABLE_ROW);
@@ -108,9 +103,6 @@ public abstract class AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
@ParameterizedTest
@EnumSource(AddAndCommit.class)
void testTombstoneGcAndRead(AddAndCommit addAndCommit) {
- //TODO https://issues.apache.org/jira/browse/IGNITE-18020
- assumeTrue(engine instanceof TestStorageEngine);
-
for (int i = 0; i < REPEATS; i++) {
            HybridTimestamp firstCommitTs = addAndCommit.perform(this, TABLE_ROW);
@@ -129,9 +121,6 @@ public abstract class AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
@ParameterizedTest
@EnumSource(AddAndCommit.class)
void testTombstoneGcAndAddWrite(AddAndCommit addAndCommit) {
- //TODO https://issues.apache.org/jira/browse/IGNITE-18020
- assumeTrue(engine instanceof TestStorageEngine);
-
for (int i = 0; i < REPEATS; i++) {
addAndCommit.perform(this, TABLE_ROW);
@@ -153,9 +142,6 @@ public abstract class AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
@ParameterizedTest
@EnumSource(AddAndCommit.class)
void testTombstoneGcAndCommitWrite(AddAndCommit addAndCommit) {
- //TODO https://issues.apache.org/jira/browse/IGNITE-18020
- assumeTrue(engine instanceof TestStorageEngine);
-
for (int i = 0; i < REPEATS; i++) {
addAndCommit.perform(this, TABLE_ROW);
@@ -179,9 +165,6 @@ public abstract class AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
@ParameterizedTest
@EnumSource(AddAndCommit.class)
void testTombstoneGcAndAbortWrite(AddAndCommit addAndCommit) {
- //TODO https://issues.apache.org/jira/browse/IGNITE-18020
- assumeTrue(engine instanceof TestStorageEngine);
-
for (int i = 0; i < REPEATS; i++) {
addAndCommit.perform(this, TABLE_ROW);
@@ -198,6 +181,25 @@ public abstract class AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
}
}
+ @ParameterizedTest
+ @EnumSource(AddAndCommit.class)
+ void testConcurrentGc(AddAndCommit addAndCommit) {
+ for (int i = 0; i < REPEATS; i++) {
+ addAndCommit.perform(this, TABLE_ROW);
+
+ addAndCommit.perform(this, null);
+
+ runRace(
+ () -> pollForVacuum(HybridTimestamp.MAX_VALUE),
+ () -> pollForVacuum(HybridTimestamp.MAX_VALUE),
+ () -> pollForVacuum(HybridTimestamp.MAX_VALUE),
+ () -> pollForVacuum(HybridTimestamp.MAX_VALUE)
+ );
+
+ assertNull(storage.closestRowId(ROW_ID));
+ }
+ }
+
@SuppressWarnings("ResultOfMethodCallIgnored")
private void scanFirstEntry(HybridTimestamp firstCommitTs) {
try (var cursor = scan(firstCommitTs)) {
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageGcTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageGcTest.java
index d83fdac549..0f4a9990b4 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageGcTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageGcTest.java
@@ -29,14 +29,14 @@ import org.junit.jupiter.api.Test;
public abstract class AbstractMvPartitionStorageGcTest extends BaseMvPartitionStorageTest {
@Test
void testEmptyStorage() {
- assertNull(storage.pollForVacuum(clock.now()));
+ assertNull(pollForVacuum(clock.now()));
}
@Test
void testSingleValueStorage() {
addAndCommit(TABLE_ROW);
- assertNull(storage.pollForVacuum(clock.now()));
+ assertNull(pollForVacuum(clock.now()));
}
@Test
@@ -48,13 +48,13 @@ public abstract class AbstractMvPartitionStorageGcTest extends BaseMvPartitionSt
HybridTimestamp secondCommitTs = addAndCommit(TABLE_ROW2);
// Data is still visible for older timestamps.
- assertNull(storage.pollForVacuum(firstCommitTs));
+ assertNull(pollForVacuum(firstCommitTs));
- assertNull(storage.pollForVacuum(tsBetweenCommits));
+ assertNull(pollForVacuum(tsBetweenCommits));
        // Once a low watermark value becomes equal to second commit timestamp, previous value
// becomes completely inaccessible and should be purged.
- TableRowAndRowId gcedRow = storage.pollForVacuum(secondCommitTs);
+ TableRowAndRowId gcedRow = pollForVacuum(secondCommitTs);
assertNotNull(gcedRow);
@@ -72,7 +72,7 @@ public abstract class AbstractMvPartitionStorageGcTest extends BaseMvPartitionSt
addAndCommit(TABLE_ROW);
HybridTimestamp secondCommitTs = addAndCommit(null);
- TableRowAndRowId row = storage.pollForVacuum(secondCommitTs);
+ TableRowAndRowId row = pollForVacuum(secondCommitTs);
assertNotNull(row);
assertRowMatches(row.tableRow(), TABLE_ROW);
@@ -89,7 +89,7 @@ public abstract class AbstractMvPartitionStorageGcTest extends BaseMvPartitionSt
addAndCommit(null);
HybridTimestamp lastCommitTs = addAndCommit(null);
- TableRowAndRowId row = storage.pollForVacuum(lastCommitTs);
+ TableRowAndRowId row = pollForVacuum(lastCommitTs);
assertNotNull(row);
assertRowMatches(row.tableRow(), TABLE_ROW);
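The GC tests above exercise the `pollForVacuum` contract: a queue entry carries the *newer* version's timestamp, and a shadowed version may be vacuumed only once that timestamp is at or below the low watermark. A rough stand-alone sketch of that contract (hypothetical simplified types, not the actual Ignite storage API):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified model: when a row version is overwritten, the shadowed version is
// enqueued together with the overwriting version's timestamp. It becomes
// eligible for vacuum only when that timestamp is <= the low watermark.
class GcQueueSketch {
    static final class Entry {
        final long newerVersionTs; // timestamp of the version that shadowed this one
        final String payload;      // stand-in for the garbage-collected table row

        Entry(long newerVersionTs, String payload) {
            this.newerVersionTs = newerVersionTs;
            this.payload = payload;
        }
    }

    private final Deque<Entry> queue = new ArrayDeque<>();

    /** Called when a new version shadows an older one. */
    void onOverwrite(long newerVersionTs, String shadowedRow) {
        queue.addLast(new Entry(newerVersionTs, shadowedRow));
    }

    /** Returns the next vacuumable row, or null if nothing is below the watermark yet. */
    String pollForVacuum(long lowWatermark) {
        Entry head = queue.peekFirst();
        if (head == null || head.newerVersionTs > lowWatermark) {
            return null; // nothing is safely invisible yet
        }
        queue.pollFirst();
        return head.payload;
    }
}
```

This mirrors why `testDoubleValueStorage` above sees `null` for watermarks before the second commit timestamp and gets the shadowed row once the watermark reaches it.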
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 8d2c9e7d65..9c8164b22b 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -553,6 +553,15 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
assertThat(returnedRow, is(nullValue()));
}
+ @Test
+ void addWriteCommittedTombstone() {
+ addWriteCommitted(ROW_ID, tableRow, clock.now());
+ assertRowMatches(read(ROW_ID, HybridTimestamp.MAX_VALUE), tableRow);
+
+ addWriteCommitted(ROW_ID, null, clock.now());
+ assertNull(read(ROW_ID, HybridTimestamp.MAX_VALUE));
+ }
+
@Test
void addWriteCreatesUncommittedVersion() {
RowId rowId = insert(tableRow, txId);
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
index b7e83ce61f..d67b793f56 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
@@ -24,7 +24,6 @@ import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import java.util.List;
-import java.util.Locale;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.ignite.internal.hlc.HybridClock;
@@ -52,58 +51,36 @@ import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteException;
import org.jetbrains.annotations.Nullable;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
/**
 * Base test for MV storages, contains pojo classes, their descriptor and a marshaller instance.
 */
public abstract class BaseMvStoragesTest {
    /** Default reflection marshaller factory. */
-    protected static MarshallerFactory marshallerFactory;
+    protected static final MarshallerFactory marshallerFactory = new ReflectionMarshallerFactory();
    /** Schema descriptor for tests. */
-    protected static SchemaDescriptor schemaDescriptor;
+    protected static final SchemaDescriptor schemaDescriptor = new SchemaDescriptor(1, new Column[]{
+            new Column("INTKEY", NativeTypes.INT32, false),
+            new Column("STRKEY", NativeTypes.STRING, false),
+    }, new Column[]{
+            new Column("INTVAL", NativeTypes.INT32, false),
+            new Column("STRVAL", NativeTypes.STRING, false),
+    });
    /** Key-value marshaller for tests. */
-    protected static KvMarshaller<TestKey, TestValue> kvMarshaller;
+    protected static final KvMarshaller<TestKey, TestValue> kvMarshaller
+            = marshallerFactory.create(schemaDescriptor, TestKey.class, TestValue.class);
    /** Key-value {@link BinaryTuple} converter for tests. */
-    protected static BinaryConverter kvBinaryConverter;
+    protected static final BinaryConverter kvBinaryConverter = BinaryConverter.forRow(schemaDescriptor);
    /** Key {@link BinaryTuple} converter for tests. */
-    protected static BinaryConverter kBinaryConverter;
+    protected static final BinaryConverter kBinaryConverter = BinaryConverter.forKey(schemaDescriptor);
/** Hybrid clock to generate timestamps. */
protected final HybridClock clock = new HybridClockImpl();
-    @BeforeAll
-    static void beforeAll() {
-        marshallerFactory = new ReflectionMarshallerFactory();
-
-        schemaDescriptor = new SchemaDescriptor(1, new Column[]{
-                new Column("intKey".toUpperCase(Locale.ROOT), NativeTypes.INT32, false),
-                new Column("strKey".toUpperCase(Locale.ROOT), NativeTypes.STRING, false),
-        }, new Column[]{
-                new Column("intVal".toUpperCase(Locale.ROOT), NativeTypes.INT32, false),
-                new Column("strVal".toUpperCase(Locale.ROOT), NativeTypes.STRING, false),
-        });
-
-        kvMarshaller = marshallerFactory.create(schemaDescriptor, TestKey.class, TestValue.class);
-
-        kvBinaryConverter = BinaryConverter.forRow(schemaDescriptor);
-        kBinaryConverter = BinaryConverter.forKey(schemaDescriptor);
-    }
-
-    @AfterAll
-    static void afterAll() {
-        kvMarshaller = null;
-        schemaDescriptor = null;
-        marshallerFactory = null;
-        kvBinaryConverter = null;
-        kBinaryConverter = null;
-    }
-
    protected static TableRow tableRow(TestKey key, TestValue value) {
        return TableRowConverter.fromBinaryRow(binaryRow(key, value), kvBinaryConverter);
    }
@@ -181,7 +158,7 @@ public abstract class BaseMvStoragesTest {
}
}
-    protected final void assertRowMatches(TableRow rowUnderQuestion, TableRow expectedRow) {
+    protected final void assertRowMatches(@Nullable TableRow rowUnderQuestion, TableRow expectedRow) {
        assertThat(rowUnderQuestion, is(notNullValue()));
        assertThat(rowUnderQuestion.bytes(), is(equalTo(expectedRow.bytes())));
    }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java
index c49a1ebe49..8eb1ab722b 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java
@@ -25,31 +25,24 @@ import org.rocksdb.RocksDB;
 * Utilities for converting partition IDs and index names into Column Family names and vice versa.
*/
class ColumnFamilyUtils {
-    /**
-     * Name of the meta column family matches default columns family, meaning that it always exist when new table is created.
-     */
+    /** Name of the meta column family matches the default column family, meaning that it always exists when a new table is created. */
    static final String META_CF_NAME = new String(RocksDB.DEFAULT_COLUMN_FAMILY, StandardCharsets.UTF_8);
- /**
- * Name of the Column Family that stores partition data.
- */
+ /** Name of the Column Family that stores partition data. */
static final String PARTITION_CF_NAME = "cf-part";
- /**
- * Name of the Column Family that stores hash index data.
- */
+ /** Name of the Column Family that stores garbage collection queue. */
+ static final String GC_QUEUE_CF_NAME = "cf-gc";
+
+ /** Name of the Column Family that stores hash index data. */
static final String HASH_INDEX_CF_NAME = "cf-hash";
- /**
- * Prefix for SQL indexes column family names.
- */
+ /** Prefix for SQL indexes column family names. */
static final String SORTED_INDEX_CF_PREFIX = "cf-sorted-";
-    /**
-     * Utility enum to describe a type of the column family - meta or partition.
-     */
+    /** Utility enum to describe a type of the column family - meta or partition. */
enum ColumnFamilyType {
- META, PARTITION, HASH_INDEX, SORTED_INDEX, UNKNOWN;
+ META, PARTITION, GC_QUEUE, HASH_INDEX, SORTED_INDEX, UNKNOWN;
/**
* Determines column family type by its name.
@@ -62,6 +55,8 @@ class ColumnFamilyUtils {
return META;
} else if (PARTITION_CF_NAME.equals(cfName)) {
return PARTITION;
+ } else if (GC_QUEUE_CF_NAME.equals(cfName)) {
+ return GC_QUEUE;
} else if (HASH_INDEX_CF_NAME.equals(cfName)) {
return HASH_INDEX;
} else if (cfName.startsWith(SORTED_INDEX_CF_PREFIX)) {
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
new file mode 100644
index 0000000000..cf25f55b63
--- /dev/null
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.lang.ThreadLocal.withInitial;
+import static java.nio.ByteBuffer.allocateDirect;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.KEY_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.MAX_KEY_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.MV_KEY_BUFFER;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.PARTITION_ID_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.ROW_ID_OFFSET;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.ROW_ID_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.TABLE_ROW_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.readTimestampNatural;
+import static org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage.invalid;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.TableRowAndRowId;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteBatchWithIndex;
+
+/**
+ * Garbage collector.
+ *
+ * <p>Key:
+ * <pre>{@code
+ * | partId (2 bytes, BE) | timestamp (12 bytes, ASC) | rowId (16 bytes, BE) |
+ * }</pre>
+ * Value is an empty byte array.
+ *
+ * <p>For more information refer to the tech-notes/garbage-collection.md in this module.
+ */
+class GarbageCollector {
+    /**
+     * Empty direct byte buffer. Note that allocating memory of size 0 is UB, so Java actually allocates
+     * a 1-byte space. Be sure not to use this buffer for actual reading or writing.
+     * In this instance it is only used for RocksDB to get the size of the entry without copying the entry into the buffer.
+     */
+ private static final ByteBuffer EMPTY_DIRECT_BUFFER = allocateDirect(0);
+
+ /** Garbage collector's queue key's timestamp offset. */
+ private static final int GC_KEY_TS_OFFSET = PARTITION_ID_SIZE;
+
+ /** Garbage collector's queue key's row id offset. */
+    private static final int GC_KEY_ROW_ID_OFFSET = GC_KEY_TS_OFFSET + HYBRID_TIMESTAMP_SIZE;
+
+    /** Garbage collector's queue key's size. */
+    private static final int GC_KEY_SIZE = GC_KEY_ROW_ID_OFFSET + ROW_ID_SIZE;
+
+    /** Thread-local direct buffer instance to read keys from RocksDB. */
+    private static final ThreadLocal<ByteBuffer> GC_KEY_BUFFER = withInitial(() -> allocateDirect(GC_KEY_SIZE).order(KEY_BYTE_ORDER));
+
+ /** Helper for the rocksdb partition. */
+ private final PartitionDataHelper helper;
+
+ /** RocksDB instance. */
+ private final RocksDB db;
+
+ /** GC queue column family. */
+ private final ColumnFamilyHandle gcQueueCf;
+
+    GarbageCollector(PartitionDataHelper helper, RocksDB db, ColumnFamilyHandle gcQueueCf) {
+ this.helper = helper;
+ this.db = db;
+ this.gcQueueCf = gcQueueCf;
+ }
+
+    /**
+     * Tries adding a row to the GC queue. We put the new row's timestamp, because we can remove the previous row
+     * only if both this row's and the previous row's timestamps are below the watermark.
+     *
+     * @param writeBatch Write batch.
+     * @param rowId Row id.
+     * @param timestamp New row's timestamp.
+     * @param isNewValueTombstone If new row is a tombstone.
+     * @return {@code true} if the new value and the previous value are both tombstones.
+     * @throws RocksDBException If failed.
+     */
+    boolean tryAddToGcQueue(WriteBatchWithIndex writeBatch, RowId rowId, HybridTimestamp timestamp, boolean isNewValueTombstone)
+            throws RocksDBException {
+ ColumnFamilyHandle partCf = helper.partCf;
+
+ boolean newAndPrevTombstones = false;
+
+        // Try to find the previous value for the row id.
+ ByteBuffer keyBuffer = MV_KEY_BUFFER.get();
+ keyBuffer.clear();
+
+ helper.putDataKey(keyBuffer, rowId, timestamp);
+
+        try (RocksIterator it = db.newIterator(partCf, helper.upperBoundReadOpts)) {
+ it.seek(keyBuffer);
+
+ if (invalid(it)) {
+ return false;
+ }
+
+ keyBuffer.clear();
+
+ int keyLen = it.key(keyBuffer);
+
+ RowId readRowId = helper.getRowId(keyBuffer, ROW_ID_OFFSET);
+
+ if (readRowId.equals(rowId)) {
+ // Found previous value.
+ assert keyLen == MAX_KEY_SIZE; // Can not be write-intent.
+
+ if (isNewValueTombstone) {
+                    // If the new value is a tombstone, let's check if the previous value was also a tombstone.
+ int valueSize = it.value(EMPTY_DIRECT_BUFFER);
+
+ newAndPrevTombstones = valueSize == 0;
+ }
+
+ if (!newAndPrevTombstones) {
+ keyBuffer.clear();
+
+ helper.putGcKey(keyBuffer, rowId, timestamp);
+
+ writeBatch.put(gcQueueCf, keyBuffer, EMPTY_DIRECT_BUFFER);
+ }
+ }
+ }
+
+ return newAndPrevTombstones;
+ }
+
+    /**
+     * Polls an element for vacuum. See {@link org.apache.ignite.internal.storage.MvPartitionStorage#pollForVacuum(HybridTimestamp)}.
+     *
+     * @param batch Write batch.
+     * @param lowWatermark Low watermark.
+     * @return Garbage collected element.
+     * @throws RocksDBException If failed to collect the garbage.
+     */
+    @Nullable TableRowAndRowId pollForVacuum(WriteBatchWithIndex batch, HybridTimestamp lowWatermark) throws RocksDBException {
+ ColumnFamilyHandle partCf = helper.partCf;
+
+        // We retrieve the first element of the GC queue and seek for it in the data CF.
+        // However, the element that we need to garbage collect is the next (older) element.
+        // First we check if there's anything to garbage collect. If the element is a tombstone we remove it.
+        // If the next element exists, that should be the element that we want to garbage collect.
+        try (RocksIterator gcIt = db.newIterator(gcQueueCf, helper.upperBoundReadOpts)) {
+ gcIt.seek(helper.partitionStartPrefix());
+
+ if (invalid(gcIt)) {
+ // GC queue is empty.
+ return null;
+ }
+
+ ByteBuffer gcKeyBuffer = GC_KEY_BUFFER.get();
+ gcKeyBuffer.clear();
+
+ gcIt.key(gcKeyBuffer);
+
+            HybridTimestamp gcElementTimestamp = readTimestampNatural(gcKeyBuffer, GC_KEY_TS_OFFSET);
+
+ if (gcElementTimestamp.compareTo(lowWatermark) > 0) {
+ // No elements to garbage collect.
+ return null;
+ }
+
+            RowId gcElementRowId = helper.getRowId(gcKeyBuffer, GC_KEY_ROW_ID_OFFSET);
+
+ // Delete element from the GC queue.
+ batch.delete(gcQueueCf, gcKeyBuffer);
+
+            try (RocksIterator it = db.newIterator(partCf, helper.upperBoundReadOpts)) {
+                // Process the element in the data cf that triggered the addition to the GC queue.
+                boolean proceed = checkHasNewerRowAndRemoveTombstone(it, batch, gcElementRowId, gcElementTimestamp);
+
+ if (!proceed) {
+ // No further processing required.
+ return null;
+ }
+
+ // Find the row that should be garbage collected.
+ ByteBuffer dataKey = getRowForGcKey(it, gcElementRowId);
+
+ if (dataKey == null) {
+ // No row for GC.
+ return null;
+ }
+
+                // At this point there's definitely a value that needs to be garbage collected in the iterator.
+ byte[] valueBytes = it.value();
+
+                var row = new TableRow(ByteBuffer.wrap(valueBytes).order(TABLE_ROW_BYTE_ORDER));
+                TableRowAndRowId retVal = new TableRowAndRowId(row, gcElementRowId);
+
+ // Delete the row from the data cf.
+ batch.delete(partCf, dataKey);
+
+ return retVal;
+ }
+ }
+ }
+
+    /**
+     * Processes the entry that triggered adding the row id to the garbage collector's queue.
+     * <br>
+     * There might already be no row in the data column family, because GC can be run in parallel.
+     * If there is no row in the data column family, returns {@code false} as no further processing is required.
+     * If there is a row and this entry is a tombstone, removes the tombstone.
+     *
+     * @param it RocksDB data column family iterator.
+     * @param batch Write batch.
+     * @param gcElementRowId Row id of the element from the GC queue.
+     * @param gcElementTimestamp Timestamp of the element from the GC queue.
+     * @return {@code true} if further processing by the garbage collector is needed.
+     */
+    private boolean checkHasNewerRowAndRemoveTombstone(RocksIterator it, WriteBatchWithIndex batch, RowId gcElementRowId,
+            HybridTimestamp gcElementTimestamp) throws RocksDBException {
+ ByteBuffer dataKeyBuffer = MV_KEY_BUFFER.get();
+ dataKeyBuffer.clear();
+
+ ColumnFamilyHandle partCf = helper.partCf;
+
+ // Set up the data key.
+ helper.putDataKey(dataKeyBuffer, gcElementRowId, gcElementTimestamp);
+
+ // Seek to the row id and timestamp from the GC queue.
+        // Note that it doesn't mean that the element in this iterator has matching row id or even partition id.
+ it.seek(dataKeyBuffer);
+
+ if (invalid(it)) {
+ // There is no row for the GC queue element.
+ return false;
+ } else {
+ dataKeyBuffer.clear();
+
+ it.key(dataKeyBuffer);
+
+            if (!helper.getRowId(dataKeyBuffer, ROW_ID_OFFSET).equals(gcElementRowId)) {
+ // There is no row for the GC queue element.
+ return false;
+ }
+ }
+
+        // Check if the new element, whose insertion scheduled the GC, was a tombstone.
+ int len = it.value(EMPTY_DIRECT_BUFFER);
+
+ if (len == 0) {
+ // This is a tombstone, we need to delete it.
+ batch.delete(partCf, dataKeyBuffer);
+ }
+
+ return true;
+ }
+
+    /**
+     * Checks if there is a row for garbage collection and returns this row's key if it exists.
+     * There might already be no row in the data column family, because GC can be run in parallel.
+     *
+     * @param it RocksDB data column family iterator.
+     * @param gcElementRowId Row id of the element from the GC queue.
+     * @return Key of the row that needs to be garbage collected, or {@code null} if such a row doesn't exist.
+     */
+    private @Nullable ByteBuffer getRowForGcKey(RocksIterator it, RowId gcElementRowId) {
+ // Let's move to the element that was scheduled for GC.
+ it.next();
+
+ RowId gcRowId;
+
+ if (invalid(it)) {
+ return null;
+ }
+
+ ByteBuffer dataKeyBuffer = MV_KEY_BUFFER.get();
+ dataKeyBuffer.clear();
+
+ int keyLen = it.key(dataKeyBuffer);
+
+        // Check if we moved to another row id's write-intent, that would mean that there is no row to GC for the current row id.
+ if (keyLen == MAX_KEY_SIZE) {
+ gcRowId = helper.getRowId(dataKeyBuffer, ROW_ID_OFFSET);
+
+ // We might have moved to the next row id.
+ if (gcElementRowId.equals(gcRowId)) {
+ return dataKeyBuffer;
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Deletes garbage collector's queue.
+ *
+ * @param writeBatch Write batch.
+ * @throws RocksDBException If failed to delete the queue.
+ */
+ void deleteQueue(WriteBatch writeBatch) throws RocksDBException {
+        writeBatch.deleteRange(gcQueueCf, helper.partitionStartPrefix(), helper.partitionEndPrefix());
+ }
+}
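The class javadoc above documents the GC queue key layout as `| partId (2 bytes, BE) | timestamp (12 bytes, ASC) | rowId (16 bytes, BE) |`. A minimal stand-alone sketch of that encoding (the split of the 12-byte hybrid timestamp into an 8-byte physical part and a 4-byte logical counter is an assumption; `GcKeySketch` is illustrative, not the committed code):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.UUID;

// Sketch of the GC queue key layout. Big-endian encoding makes lexicographic
// byte order match numeric order, so RocksDB iterates the queue grouped by
// partition and, within a partition, oldest-timestamp-first.
final class GcKeySketch {
    static final int KEY_SIZE = 2 + 12 + 16; // partId + timestamp + rowId = 30 bytes

    static ByteBuffer encode(short partId, long physicalTs, int logicalTs, UUID rowId) {
        ByteBuffer key = ByteBuffer.allocate(KEY_SIZE).order(ByteOrder.BIG_ENDIAN);
        key.putShort(partId);    // 2-byte partition id, big-endian
        key.putLong(physicalTs); // 8-byte physical time (assumed timestamp split)
        key.putInt(logicalTs);   // 4-byte logical counter (8 + 4 = 12 bytes total)
        key.putLong(rowId.getMostSignificantBits());  // 16-byte row id as a UUID
        key.putLong(rowId.getLeastSignificantBits());
        key.flip();
        return key;
    }
}
```

The value stored under such a key is an empty byte array; all the information the collector needs is in the key itself.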
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
new file mode 100644
index 0000000000..00562bcd07
--- /dev/null
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.lang.ThreadLocal.withInitial;
+import static java.nio.ByteBuffer.allocateDirect;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.rocksdb.RocksUtils;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.Slice;
+
+/** Helper for the partition data. */
+class PartitionDataHelper implements ManuallyCloseable {
+ /** Commit partition id size. */
+ static final int PARTITION_ID_SIZE = Short.BYTES;
+
+ /** UUID size in bytes. */
+ static final int ROW_ID_SIZE = 2 * Long.BYTES;
+
+ /** Position of row id inside the key. */
+ static final int ROW_ID_OFFSET = Short.BYTES;
+
+ /** Size of the key without timestamp. */
+ static final int ROW_PREFIX_SIZE = ROW_ID_OFFSET + ROW_ID_SIZE;
+
+ /** Maximum size of the data key. */
+ static final int MAX_KEY_SIZE = ROW_PREFIX_SIZE + HYBRID_TIMESTAMP_SIZE;
+
+ /** Transaction id size (part of the transaction state). */
+ private static final int TX_ID_SIZE = 2 * Long.BYTES;
+
+ /** Commit table id size (part of the transaction state). */
+ private static final int TABLE_ID_SIZE = 2 * Long.BYTES;
+
+ /** Size of the value header (transaction state). */
+ static final int VALUE_HEADER_SIZE = TX_ID_SIZE + TABLE_ID_SIZE + PARTITION_ID_SIZE;
+
+ /** Transaction id offset. */
+ static final int TX_ID_OFFSET = 0;
+
+ /** Commit table id offset. */
+ static final int TABLE_ID_OFFSET = TX_ID_SIZE;
+
+ /** Commit partition id offset. */
+ static final int PARTITION_ID_OFFSET = TABLE_ID_OFFSET + TABLE_ID_SIZE;
+
+ /** Value offset (if transaction state is present). */
+ static final int VALUE_OFFSET = VALUE_HEADER_SIZE;
+
+ static final ByteOrder TABLE_ROW_BYTE_ORDER = TableRow.ORDER;
+
+ static final ByteOrder KEY_BYTE_ORDER = ByteOrder.BIG_ENDIAN;
+
+ /** Thread-local direct buffer instance to read keys from RocksDB. */
+ static final ThreadLocal<ByteBuffer> MV_KEY_BUFFER = withInitial(() -> allocateDirect(MAX_KEY_SIZE).order(KEY_BYTE_ORDER));
+
+ /** Partition id. */
+ private final int partitionId;
+
+ /** Upper bound for scans. */
+ private final Slice upperBound;
+
+ /** Partition data column family. */
+ final ColumnFamilyHandle partCf;
+
+ /** Read options for regular scans. */
+ final ReadOptions upperBoundReadOpts;
+
+ /** Read options for total order scans. */
+ final ReadOptions scanReadOpts;
+
+ PartitionDataHelper(int partitionId, ColumnFamilyHandle partCf) {
+ this.partitionId = partitionId;
+ this.partCf = partCf;
+
+ this.upperBound = new Slice(partitionEndPrefix());
this.upperBoundReadOpts = new ReadOptions().setIterateUpperBound(upperBound);
+ this.scanReadOpts = new ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
+ }
+
+ /**
+ * Creates a prefix of all keys in the given partition.
+ */
+ byte[] partitionStartPrefix() {
+ return unsignedShortAsBytes(partitionId);
+ }
+
+ /**
+ * Creates a prefix of all keys in the next partition, used as an exclusive bound.
+ */
+ byte[] partitionEndPrefix() {
+ return unsignedShortAsBytes(partitionId + 1);
+ }
+
void putDataKey(ByteBuffer dataKeyBuffer, RowId rowId, HybridTimestamp timestamp) {
+ dataKeyBuffer.putShort((short) partitionId);
+ putRowId(dataKeyBuffer, rowId);
+ putTimestampDesc(dataKeyBuffer, timestamp);
+
+ dataKeyBuffer.flip();
+ }
+
+ void putGcKey(ByteBuffer gcKeyBuffer, RowId rowId, HybridTimestamp timestamp) {
+ gcKeyBuffer.putShort((short) partitionId);
+ putTimestampNatural(gcKeyBuffer, timestamp);
+ putRowId(gcKeyBuffer, rowId);
+
+ gcKeyBuffer.flip();
+ }
+
+ void putRowId(ByteBuffer keyBuffer, RowId rowId) {
+ assert rowId.partitionId() == partitionId : rowId;
+ assert keyBuffer.order() == KEY_BYTE_ORDER;
+
+ keyBuffer.putLong(normalize(rowId.mostSignificantBits()));
+ keyBuffer.putLong(normalize(rowId.leastSignificantBits()));
+ }
+
+ RowId getRowId(ByteBuffer keyBuffer, int offset) {
+ assert partitionId == (keyBuffer.getShort(0) & 0xFFFF);
+
+ return new RowId(partitionId, normalize(keyBuffer.getLong(offset)), normalize(keyBuffer.getLong(offset + Long.BYTES)));
+ }
+
+ /**
+ * Converts signed long into a new long value, that when written in Big Endian, will preserve the comparison order if compared
+ * lexicographically as an array of unsigned bytes. For example, values {@code -1} and {@code 0}, when written in BE, will become
+ * {@code 0xFF..F} and {@code 0x00..0}, and lose their ascending order.
+ *
+ * <p>Flipping the sign bit will change the situation: {@code -1 -> 0x7F..F} and {@code 0 -> 0x80..0}.
+ */
+ static long normalize(long value) {
+ return value ^ (1L << 63);
+ }
+
+ /**
+ * Stores unsigned short (represented by int) in the byte array.
+ *
+ * @param value Unsigned short value.
+ * @return Byte array with unsigned short.
+ */
+ private static byte[] unsignedShortAsBytes(int value) {
+ return new byte[] {(byte) (value >>> 8), (byte) value};
+ }
+
+ /**
+ * Writes a timestamp into a byte buffer, in descending lexicographical bytes order.
+ */
+ static void putTimestampDesc(ByteBuffer buf, HybridTimestamp ts) {
+ assert buf.order() == KEY_BYTE_ORDER;
+
+ // "bitwise negation" turns ascending order into a descending one.
+ buf.putLong(~ts.getPhysical());
+ buf.putInt(~ts.getLogical());
+ }
+
+ static HybridTimestamp readTimestampDesc(ByteBuffer keyBuf) {
+ assert keyBuf.order() == KEY_BYTE_ORDER;
+
+ long physical = ~keyBuf.getLong(ROW_PREFIX_SIZE);
+ int logical = ~keyBuf.getInt(ROW_PREFIX_SIZE + Long.BYTES);
+
+ return new HybridTimestamp(physical, logical);
+ }
+
+ /**
+ * Writes a timestamp into a byte buffer, in ascending lexicographical bytes order.
+ */
+ static void putTimestampNatural(ByteBuffer buf, HybridTimestamp ts) {
+ assert buf.order() == KEY_BYTE_ORDER;
+
+ buf.putLong(ts.getPhysical());
+ buf.putInt(ts.getLogical());
+ }
+
+ static HybridTimestamp readTimestampNatural(ByteBuffer keyBuf, int offset) {
+ assert keyBuf.order() == KEY_BYTE_ORDER;
+
+ long physical = keyBuf.getLong(offset);
+ int logical = keyBuf.getInt(offset + Long.BYTES);
+
+ return new HybridTimestamp(physical, logical);
+ }
+
+ @Override
+ public void close() {
+ RocksUtils.closeAll(upperBoundReadOpts, upperBound);
+ }
+}
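The two key-encoding tricks in `PartitionDataHelper` — the sign-bit flip in `normalize` and the bitwise negation in `putTimestampDesc` — can be checked with a small standalone sketch (hypothetical class name, not part of the patch), comparing the resulting Big Endian bytes the way RocksDB compares keys, as unsigned byte arrays:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Standalone sketch (not part of the patch) checking the two encoding
// tricks: the sign-bit flip that keeps signed longs in natural order
// under unsigned lexicographic comparison, and the bitwise negation
// that turns ascending timestamps into descending key order.
public class KeyOrderSketch {
    // Same transform as PartitionDataHelper.normalize().
    static long normalize(long value) {
        return value ^ (1L << 63);
    }

    // Encode a long as Big Endian bytes.
    static byte[] asBe(long v) {
        return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
    }

    // Lexicographic comparison of unsigned bytes, as RocksDB compares keys.
    static int compareUnsigned(byte[] a, byte[] b) {
        return Arrays.compareUnsigned(a, b);
    }

    public static void main(String[] args) {
        // Without normalization, -1 sorts after 0 as unsigned bytes (0xFF.. > 0x00..).
        assert compareUnsigned(asBe(-1L), asBe(0L)) > 0;

        // With normalization, the natural signed order is preserved (0x7F.. < 0x80..).
        assert compareUnsigned(asBe(normalize(-1L)), asBe(normalize(0L))) < 0;

        // Bitwise negation makes the later timestamp sort first (newest-to-oldest).
        long earlier = 100L;
        long later = 200L;
        assert compareUnsigned(asBe(~later), asBe(~earlier)) < 0;
    }
}
```

The descending timestamp order is what lets a single `seek` with a timestamp land on the newest version that is not newer than the requested one.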
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 4507794523..b4c84a3a6b 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -18,10 +18,23 @@
package org.apache.ignite.internal.storage.rocksdb;
import static java.lang.ThreadLocal.withInitial;
-import static java.nio.ByteBuffer.allocateDirect;
+import static java.nio.ByteBuffer.allocate;
import static java.util.Arrays.copyOf;
import static java.util.Arrays.copyOfRange;
-import static org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.KEY_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.MAX_KEY_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.MV_KEY_BUFFER;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.PARTITION_ID_OFFSET;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.ROW_ID_OFFSET;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.ROW_PREFIX_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.TABLE_ID_OFFSET;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.TABLE_ROW_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.TX_ID_OFFSET;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.VALUE_HEADER_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.VALUE_OFFSET;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.normalize;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.putTimestampDesc;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.readTimestampDesc;
import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.partitionIdKey;
import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageState;
import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageStateOnRebalance;
@@ -34,7 +47,6 @@ import static org.apache.ignite.internal.util.ByteUtils.putUuidToBytes;
import static org.rocksdb.ReadTier.PERSISTED_TIER;
import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.NoSuchElementException;
import java.util.Objects;
@@ -53,8 +65,10 @@ import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.StorageRebalanceException;
+import org.apache.ignite.internal.storage.TableRowAndRowId;
import org.apache.ignite.internal.storage.TxIdMismatchException;
import org.apache.ignite.internal.storage.util.StorageState;
+import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.GridUnsafe;
@@ -75,79 +89,35 @@ import org.rocksdb.WriteOptions;
/**
* Multi-versioned partition storage implementation based on RocksDB. Stored data has the following format.
*
- * <p/>Key:
- * <pre><code>
+ * <p>Key:
+ * <pre>{@code
* For write-intents
* | partId (2 bytes, BE) | rowId (16 bytes, BE) |
*
* For committed rows
* | partId (2 bytes, BE) | rowId (16 bytes, BE) | timestamp (12 bytes, DESC) |
- * </code></pre>
+ * }</pre>
* Value:
- * <pre><code>
+ * <pre>{@code
* For write-intents
* | txId (16 bytes) | commitTableId (16 bytes) | commitPartitionId (2 bytes) | Row data |
*
* For committed rows
* | Row data |
- * </code></pre>
+ * }</pre>
*
- * <p/>Pending transactions (write-intents) data doesn't have a timestamp assigned, but they have transaction
+ * <p>Pending transactions (write-intents) data doesn't have a timestamp assigned, but they have transaction
* state (txId, commitTableId and commitPartitionId).
*
- * <p/>BE means Big Endian, meaning that lexicographical bytes order matches a natural order of partitions.
+ * <p>BE means Big Endian, meaning that lexicographical bytes order matches a natural order of partitions.
*
- * <p/>DESC means that timestamps are sorted from newest to oldest (N2O). Please refer to {@link #putTimestamp(ByteBuffer, HybridTimestamp)}
+ * <p>DESC means that timestamps are sorted from newest to oldest (N2O).
+ * Please refer to {@link PartitionDataHelper#putTimestampDesc(ByteBuffer, HybridTimestamp)}
* to see how it's achieved. Missing timestamp could be interpreted as a moment infinitely far away in the future.
*/
public class RocksDbMvPartitionStorage implements MvPartitionStorage {
- /** Position of row id inside the key. */
- private static final int ROW_ID_OFFSET = Short.BYTES;
-
- /** UUID size in bytes. */
- private static final int ROW_ID_SIZE = 2 * Long.BYTES;
-
- /** Size of the key without timestamp. */
- private static final int ROW_PREFIX_SIZE = ROW_ID_OFFSET + ROW_ID_SIZE;
-
- /** Transaction id size (part of the transaction state). */
- private static final int TX_ID_SIZE = 2 * Long.BYTES;
-
- /** Commit table id size (part of the transaction state). */
- private static final int TABLE_ID_SIZE = 2 * Long.BYTES;
-
- /** Commit partition id size (part of the transaction state). */
- private static final int PARTITION_ID_SIZE = Short.BYTES;
-
- /** Size of the value header (transaction state). */
- private static final int VALUE_HEADER_SIZE = TX_ID_SIZE + TABLE_ID_SIZE + PARTITION_ID_SIZE;
-
- /** Transaction id offset. */
- private static final int TX_ID_OFFSET = 0;
-
- /** Commit table id offset. */
- private static final int TABLE_ID_OFFSET = TX_ID_SIZE;
-
- /** Commit partition id offset. */
- private static final int PARTITION_ID_OFFSET = TABLE_ID_OFFSET + TABLE_ID_SIZE;
-
- /** Value offset (if transaction state is present). */
- private static final int VALUE_OFFSET = VALUE_HEADER_SIZE;
-
- /** Maximum size of the key. */
- private static final int MAX_KEY_SIZE = ROW_PREFIX_SIZE + HYBRID_TIMESTAMP_SIZE;
-
- private static final ByteOrder KEY_BYTE_ORDER = ByteOrder.BIG_ENDIAN;
-
- private static final ByteOrder TABLE_ROW_BYTE_ORDER = TableRow.ORDER;
-
- /** Thread-local direct buffer instance to read keys from RocksDB. */
- private static final ThreadLocal<ByteBuffer> MV_KEY_BUFFER = withInitial(() -> allocateDirect(MAX_KEY_SIZE).order(KEY_BYTE_ORDER));
-
/** Thread-local on-heap byte buffer instance to use for key manipulations. */
- private static final ThreadLocal<ByteBuffer> HEAP_KEY_BUFFER = withInitial(
- () -> ByteBuffer.allocate(MAX_KEY_SIZE).order(KEY_BYTE_ORDER)
- );
+ private static final ThreadLocal<ByteBuffer> HEAP_KEY_BUFFER = withInitial(() -> allocate(MAX_KEY_SIZE).order(KEY_BYTE_ORDER));
/** Thread-local write batch for {@link #runConsistently(WriteClosure)}. */
private final ThreadLocal<WriteBatchWithIndex> threadLocalWriteBatch = new ThreadLocal<>();
@@ -165,8 +135,11 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
/** RocksDb instance. */
private final RocksDB db;
- /** Partitions column family. */
- private final ColumnFamilyHandle cf;
+ /** Helper for the rocksdb partition. */
+ private final PartitionDataHelper helper;
+
+ /** Garbage collector. */
+ private final GarbageCollector gc;
/** Meta column family. */
private final ColumnFamilyHandle meta;
@@ -180,14 +153,6 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
/** Read options for reading persisted data. */
private final ReadOptions persistedTierReadOpts = new ReadOptions().setReadTier(PERSISTED_TIER);
- /** Upper bound for scans and reads. */
- private final Slice upperBound;
-
- private final ReadOptions upperBoundReadOpts;
-
- /** Read options for scan iterators. */
- private final ReadOptions scanReadOptions;
-
/** Key to store applied index value in meta. */
private final byte[] lastAppliedIndexKey;
@@ -233,13 +198,9 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
this.tableStorage = tableStorage;
this.partitionId = partitionId;
db = tableStorage.db();
- cf = tableStorage.partitionCfHandle();
meta = tableStorage.metaCfHandle();
-
- upperBound = new Slice(partitionEndPrefix());
-
- upperBoundReadOpts = new ReadOptions().setIterateUpperBound(upperBound);
- scanReadOptions = new ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
+ helper = new PartitionDataHelper(partitionId, tableStorage.partitionCfHandle());
+ gc = new GarbageCollector(helper, db, tableStorage.gcQueueHandle());
lastAppliedIndexKey = ("index" + partitionId).getBytes(StandardCharsets.UTF_8);
lastAppliedTermKey = ("term" + partitionId).getBytes(StandardCharsets.UTF_8);
@@ -377,12 +338,12 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
}
/**
- * Reads a value of {@link #lastAppliedIndex()} from the storage, avoiding memtable, and sets it as a new value of
+ * Reads a value of {@link #lastAppliedIndex()} from the storage, avoiding mem-table, and sets it as a new value of
* {@link #persistedIndex()}.
*
* @throws StorageException If failed to read index from the storage.
*/
- public void refreshPersistedIndex() throws StorageException {
+ void refreshPersistedIndex() throws StorageException {
if (!busyLock.enterBusy()) {
// Don't throw the exception, there's no point in that.
return;
@@ -433,7 +394,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
* @param readOptions Read options to be used for reading.
* @return Group configuration.
*/
- private RaftGroupConfiguration readLastGroupConfig(ReadOptions readOptions) {
+ private @Nullable RaftGroupConfiguration readLastGroupConfig(ReadOptions readOptions) {
byte[] bytes;
try {
@@ -461,7 +422,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
byte[] keyBytes = copyOf(keyBufArray, ROW_PREFIX_SIZE);
- byte[] previousValue = writeBatch.getFromBatchAndDB(db, cf, readOpts, keyBytes);
+ byte[] previousValue = writeBatch.getFromBatchAndDB(db, helper.partCf, readOpts, keyBytes);
// Previous value must belong to the same transaction.
if (previousValue != null) {
@@ -474,7 +435,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
// Write empty value as a tombstone.
if (previousValue != null) {
// Reuse old array with transaction id already written
to it.
- writeBatch.put(cf, keyBytes, copyOf(previousValue, VALUE_HEADER_SIZE));
+ writeBatch.put(helper.partCf, keyBytes, copyOf(previousValue, VALUE_HEADER_SIZE));
} else {
byte[] valueHeaderBytes = new byte[VALUE_HEADER_SIZE];
@@ -482,13 +443,13 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
putUuidToBytes(commitTableId, valueHeaderBytes, TABLE_ID_OFFSET);
putShort(valueHeaderBytes, PARTITION_ID_OFFSET, (short) commitPartitionId);
- writeBatch.put(cf, keyBytes, valueHeaderBytes);
+ writeBatch.put(helper.partCf, keyBytes, valueHeaderBytes);
}
} else {
writeUnversioned(keyBufArray, row, txId, commitTableId, commitPartitionId);
}
} catch (RocksDBException e) {
- throw new StorageException("Failed to update a row in storage", e);
+ throw new StorageException("Failed to update a row in storage: " + createStorageInfo(), e);
}
return res;
@@ -509,7 +470,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
byte[] rowBytes = rowBytes(row);
- ByteBuffer value = ByteBuffer.allocate(rowBytes.length + VALUE_HEADER_SIZE);
+ ByteBuffer value = allocate(rowBytes.length + VALUE_HEADER_SIZE);
byte[] array = value.array();
putUuidToBytes(txId, array, TX_ID_OFFSET);
@@ -519,7 +480,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
value.position(VALUE_OFFSET).put(rowBytes);
// Write table row data as a value.
- writeBatch.put(cf, copyOf(keyArray, ROW_PREFIX_SIZE), value.array());
+ writeBatch.put(helper.partCf, copyOf(keyArray, ROW_PREFIX_SIZE), value.array());
}
private static byte[] rowBytes(TableRow row) {
@@ -532,14 +493,14 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
return busy(() -> {
throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
- WriteBatchWithIndex writeBatch = requireWriteBatch();
+ @SuppressWarnings("resource") WriteBatchWithIndex writeBatch = requireWriteBatch();
ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
try {
byte[] keyBytes = copyOf(keyBuf.array(), ROW_PREFIX_SIZE);
- byte[] previousValue = writeBatch.getFromBatchAndDB(db, cf, readOpts, keyBytes);
+ byte[] previousValue = writeBatch.getFromBatchAndDB(db, helper.partCf, readOpts, keyBytes);
if (previousValue == null) {
//the chain doesn't contain an uncommitted write intent
@@ -547,7 +508,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
}
// Perform unconditional remove for the key without associated timestamp.
- writeBatch.delete(cf, keyBytes);
+ writeBatch.delete(helper.partCf, keyBytes);
return wrapValueIntoTableRow(previousValue, true);
} catch (RocksDBException e) {
@@ -567,20 +528,33 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
// Read a value associated with pending write.
byte[] uncommittedKeyBytes = copyOf(keyBuf.array(), ROW_PREFIX_SIZE);
- byte[] valueBytes = writeBatch.getFromBatchAndDB(db, cf, readOpts, uncommittedKeyBytes);
+ byte[] valueBytes = writeBatch.getFromBatchAndDB(db, helper.partCf, readOpts, uncommittedKeyBytes);
if (valueBytes == null) {
- //the chain doesn't contain an uncommitted write intent
+ // The chain doesn't contain an uncommitted write intent.
return null;
}
- // Delete pending write.
- writeBatch.delete(cf, uncommittedKeyBytes);
+ boolean isNewValueTombstone = valueBytes.length == VALUE_HEADER_SIZE;
- // Add timestamp to the key, and put the value back into the storage.
- putTimestamp(keyBuf, timestamp);
+ // Both this and previous values for the row id are tombstones.
+ boolean newAndPrevTombstones = gc.tryAddToGcQueue(writeBatch, rowId, timestamp, isNewValueTombstone);
- writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE), copyOfRange(valueBytes, VALUE_HEADER_SIZE, valueBytes.length));
+ // Delete pending write.
+ writeBatch.delete(helper.partCf, uncommittedKeyBytes);
+
+ // We only write tombstone if the previous value for the same row id was not a tombstone.
+ // So there won't be consecutive tombstones for the same row id.
+ if (!newAndPrevTombstones) {
+ // Add timestamp to the key, and put the value back into the storage.
+ putTimestampDesc(keyBuf, timestamp);
+
+ writeBatch.put(
+ helper.partCf,
+ copyOf(keyBuf.array(), MAX_KEY_SIZE),
+ copyOfRange(valueBytes, VALUE_HEADER_SIZE, valueBytes.length)
+ );
+ }
return null;
} catch (RocksDBException e) {
@@ -595,18 +569,31 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
@SuppressWarnings("resource") WriteBatchWithIndex writeBatch = requireWriteBatch();
ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
- putTimestamp(keyBuf, commitTimestamp);
+ putTimestampDesc(keyBuf, commitTimestamp);
+
+ boolean isNewValueTombstone = row == null;
//TODO IGNITE-16913 Add proper way to write row bytes into array without allocations.
- byte[] rowBytes = rowBytes(row);
+ byte[] rowBytes = row != null ? rowBytes(row) : ArrayUtils.BYTE_EMPTY_ARRAY;
+ boolean newAndPrevTombstones; // Both this and previous values for the row id are tombstones.
try {
- writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE), rowBytes);
-
- return null;
+ newAndPrevTombstones = gc.tryAddToGcQueue(writeBatch, rowId, commitTimestamp, isNewValueTombstone);
} catch (RocksDBException e) {
- throw new StorageException("Failed to update a row in storage", e);
+ throw new StorageException("Failed to add row to the GC queue: " + createStorageInfo(), e);
+ }
+
+ // We only write tombstone if the previous value for the same row id was not a tombstone.
+ // So there won't be consecutive tombstones for the same row id.
+ if (!newAndPrevTombstones) {
+ try {
+ writeBatch.put(helper.partCf, copyOf(keyBuf.array(), MAX_KEY_SIZE), rowBytes);
+ } catch (RocksDBException e) {
+ throw new StorageException("Failed to update a row in storage: " + createStorageInfo(), e);
+ }
}
+
+ return null;
});
}
@@ -625,11 +612,11 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
try (
// Set next partition as an upper bound.
- RocksIterator baseIterator = db.newIterator(cf, upperBoundReadOpts);
+ RocksIterator baseIterator = db.newIterator(helper.partCf, helper.upperBoundReadOpts);
// "count()" check is mandatory. Write batch iterator without any updates just crashes everything.
// It's not documented, but this is exactly how it should be used.
RocksIterator seekIterator = writeBatch != null && writeBatch.count() > 0
- ? writeBatch.newIteratorWithBase(cf, baseIterator)
+ ? writeBatch.newIteratorWithBase(helper.partCf, baseIterator)
: baseIterator
) {
if (lookingForLatestVersions(timestamp)) {
@@ -659,7 +646,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
return ReadResult.empty(rowId);
}
- ByteBuffer readKeyBuf = MV_KEY_BUFFER.get().position(0).limit(MAX_KEY_SIZE);
+ ByteBuffer readKeyBuf = MV_KEY_BUFFER.get().rewind().limit(MAX_KEY_SIZE);
int keyLength = seekIterator.key(readKeyBuf);
@@ -682,7 +669,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
if (!isWriteIntent) {
// There is no write-intent, return latest committed row.
- return wrapCommittedValue(rowId, valueBytes, readTimestamp(keyBuf));
+ return wrapCommittedValue(rowId, valueBytes, readTimestampDesc(keyBuf));
}
return wrapUncommittedValue(rowId, valueBytes, null);
@@ -700,7 +687,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
// Put timestamp restriction according to N2O timestamps order.
- putTimestamp(keyBuf, timestamp);
+ putTimestampDesc(keyBuf, timestamp);
// This seek will either find a key with timestamp that's less or equal than required value, or a different key whatsoever.
// It is guaranteed by descending order of timestamps.
@@ -724,7 +711,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
// There's no guarantee that required key even exists. If it doesn't, then "seek" will point to a different key.
// To avoid returning its value, we have to check that actual key matches what we need.
// Here we prepare direct buffer to read key without timestamp. Shared direct buffer is used to avoid extra memory allocations.
- ByteBuffer foundKeyBuf = MV_KEY_BUFFER.get().position(0).limit(MAX_KEY_SIZE);
+ ByteBuffer foundKeyBuf = MV_KEY_BUFFER.get().rewind().limit(MAX_KEY_SIZE);
int keyLength = 0;
@@ -743,7 +730,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
return ReadResult.empty(rowId);
}
- foundKeyBuf.position(0).limit(MAX_KEY_SIZE);
+ foundKeyBuf.rewind().limit(MAX_KEY_SIZE);
keyLength = seekIterator.key(foundKeyBuf);
if (!matches(rowId, foundKeyBuf)) {
@@ -764,7 +751,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
return wrapUncommittedValue(rowId, valueBytes, null);
}
- foundKeyBuf.position(0).limit(MAX_KEY_SIZE);
+ foundKeyBuf.rewind().limit(MAX_KEY_SIZE);
seekIterator.key(foundKeyBuf);
if (!matches(rowId, foundKeyBuf)) {
@@ -779,7 +766,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
// Should not be write-intent, as we were seeking with the timestamp.
assert keyLength == MAX_KEY_SIZE;
- HybridTimestamp rowTimestamp = readTimestamp(foundKeyBuf);
+ HybridTimestamp rowTimestamp = readTimestampDesc(foundKeyBuf);
byte[] valueBytes = seekIterator.value();
@@ -797,7 +784,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
return wrapCommittedValue(rowId, valueBytes, rowTimestamp);
}
- foundKeyBuf.position(0).limit(MAX_KEY_SIZE);
+ foundKeyBuf.rewind().limit(MAX_KEY_SIZE);
keyLength = seekIterator.key(foundKeyBuf);
if (!matches(rowId, foundKeyBuf)) {
@@ -811,7 +798,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
return wrapUncommittedValue(rowId, seekIterator.value(), rowTimestamp);
}
- return wrapCommittedValue(rowId, valueBytes, readTimestamp(foundKeyBuf));
+ return wrapCommittedValue(rowId, valueBytes, readTimestampDesc(foundKeyBuf));
}
}
@@ -844,12 +831,12 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
var options = new ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
- RocksIterator it = db.newIterator(cf, options);
+ RocksIterator it = db.newIterator(helper.partCf, options);
WriteBatchWithIndex writeBatch = threadLocalWriteBatch.get();
if (writeBatch != null && writeBatch.count() > 0) {
- it = writeBatch.newIteratorWithBase(cf, it);
+ it = writeBatch.newIteratorWithBase(helper.partCf, it);
}
it.seek(lowerBound);
@@ -908,15 +895,15 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
});
}
- private void setKeyBuffer(ByteBuffer keyBuf, RowId rowId, @Nullable HybridTimestamp timestamp) {
+ private static void setKeyBuffer(ByteBuffer keyBuf, RowId rowId, @Nullable HybridTimestamp timestamp) {
keyBuf.putLong(ROW_ID_OFFSET, normalize(rowId.mostSignificantBits()));
keyBuf.putLong(ROW_ID_OFFSET + Long.BYTES, normalize(rowId.leastSignificantBits()));
if (timestamp != null) {
- putTimestamp(keyBuf.position(ROW_PREFIX_SIZE), timestamp);
+ putTimestampDesc(keyBuf.position(ROW_PREFIX_SIZE), timestamp);
}
- keyBuf.position(0);
+ keyBuf.rewind();
}
@Override
@@ -924,9 +911,9 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
return busy(() -> {
throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
- ByteBuffer keyBuf = prepareHeapKeyBuf(lowerBound).position(0).limit(ROW_PREFIX_SIZE);
+ ByteBuffer keyBuf = prepareHeapKeyBuf(lowerBound).rewind().limit(ROW_PREFIX_SIZE);
- try (RocksIterator it = db.newIterator(cf, scanReadOptions)) {
+ try (RocksIterator it = db.newIterator(helper.partCf, helper.scanReadOpts)) {
it.seek(keyBuf);
if (!it.isValid()) {
@@ -935,7 +922,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
return null;
}
- ByteBuffer readKeyBuf = MV_KEY_BUFFER.get().position(0).limit(ROW_PREFIX_SIZE);
+ ByteBuffer readKeyBuf = MV_KEY_BUFFER.get().rewind().limit(ROW_PREFIX_SIZE);
it.key(readKeyBuf);
@@ -969,13 +956,11 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
buf.putShort(0, partitionId);
- buf.position(0);
+ buf.rewind();
}
private RowId getRowId(ByteBuffer keyBuffer) {
- keyBuffer.position(ROW_ID_OFFSET);
-
- return new RowId(partitionId, normalize(keyBuffer.getLong()), normalize(keyBuffer.getLong()));
+ return helper.getRowId(keyBuffer, ROW_ID_OFFSET);
}
@Override
@@ -984,11 +969,11 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
try (
- var upperBound = new Slice(partitionEndPrefix());
+ var upperBound = new Slice(helper.partitionEndPrefix());
var options = new ReadOptions().setIterateUpperBound(upperBound);
- RocksIterator it = db.newIterator(cf, options)
+ RocksIterator it = db.newIterator(helper.partCf, options)
) {
- it.seek(partitionStartPrefix());
+ it.seek(helper.partitionStartPrefix());
long size = 0;
@@ -1005,14 +990,29 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
/**
* Deletes partition data from the storage, using write batch to perform the operation.
*/
- public void destroyData(WriteBatch writeBatch) throws RocksDBException {
+ void destroyData(WriteBatch writeBatch) throws RocksDBException {
writeBatch.delete(meta, lastAppliedIndexKey);
writeBatch.delete(meta, lastAppliedTermKey);
writeBatch.delete(meta, lastGroupConfigKey);
writeBatch.delete(meta, partitionIdKey(partitionId));
- writeBatch.deleteRange(cf, partitionStartPrefix(), partitionEndPrefix());
+ writeBatch.deleteRange(helper.partCf, helper.partitionStartPrefix(), helper.partitionEndPrefix());
+
+ gc.deleteQueue(writeBatch);
+ }
+
+ @Override
+ public @Nullable TableRowAndRowId pollForVacuum(HybridTimestamp lowWatermark) {
+ return busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+
+ try {
+ return gc.pollForVacuum(requireWriteBatch(), lowWatermark);
+ } catch (RocksDBException e) {
+ throw new StorageException("Failed to collect garbage: " +
createStorageInfo(), e);
+ }
+ });
}
@Override
@@ -1027,7 +1027,9 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
busyLock.block();
- RocksUtils.closeAll(persistedTierReadOpts, readOpts, writeOpts,
scanReadOptions, upperBoundReadOpts, upperBound);
+ RocksUtils.closeAll(persistedTierReadOpts, readOpts, writeOpts);
+
+ helper.close();
}
private WriteBatchWithIndex requireWriteBatch() {
@@ -1046,44 +1048,13 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
private ByteBuffer prepareHeapKeyBuf(RowId rowId) {
assert rowId.partitionId() == partitionId : rowId;
- ByteBuffer keyBuf = HEAP_KEY_BUFFER.get().position(0);
+ ByteBuffer keyBuf = HEAP_KEY_BUFFER.get().rewind();
keyBuf.putShort((short) rowId.partitionId());
- keyBuf.putLong(normalize(rowId.mostSignificantBits()));
- keyBuf.putLong(normalize(rowId.leastSignificantBits()));
- return keyBuf;
- }
+ helper.putRowId(keyBuf, rowId);
- /**
- * Converts signed long into a new long value, that when written in Big
Endian, will preserve the comparison order if compared
- * lexicographically as an array of unsigned bytes. For example, values
{@code -1} and {@code 0}, when written in BE, will become
- * {@code 0xFF..F} and {@code 0x00..0}, and lose their ascending order.
- *
- * <p/>Flipping the sign bit will change the situation: {@code -1 ->
0x7F..F} and {@code 0 -> 0x80..0}.
- */
- private static long normalize(long value) {
- return value ^ (1L << 63);
- }
-
- /**
- * Writes a timestamp into a byte buffer, in descending lexicographical
bytes order.
- */
- private static void putTimestamp(ByteBuffer buf, HybridTimestamp ts) {
- assert buf.order() == KEY_BYTE_ORDER;
-
- // "bitwise negation" turns ascending order into a descending one.
- buf.putLong(~ts.getPhysical());
- buf.putInt(~ts.getLogical());
- }
-
- private static HybridTimestamp readTimestamp(ByteBuffer keyBuf) {
- assert keyBuf.order() == KEY_BYTE_ORDER;
-
- long physical = ~keyBuf.getLong(ROW_PREFIX_SIZE);
- int logical = ~keyBuf.getInt(ROW_PREFIX_SIZE + Long.BYTES);
-
- return new HybridTimestamp(physical, logical);
+ return keyBuf;
}
private static void putShort(byte[] array, int off, short value) {
@@ -1102,7 +1073,7 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
/**
* Checks iterator validity, including both finished iteration and
occurred exception.
*/
- private static boolean invalid(RocksIterator it) {
+ static boolean invalid(RocksIterator it) {
boolean invalid = !it.isValid();
if (invalid) {
@@ -1180,18 +1151,14 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
* Creates a prefix of all keys in the given partition.
*/
public byte[] partitionStartPrefix() {
- return unsignedShortAsBytes(partitionId);
+ return helper.partitionStartPrefix();
}
/**
* Creates a prefix of all keys in the next partition, used as an
exclusive bound.
*/
public byte[] partitionEndPrefix() {
- return unsignedShortAsBytes(partitionId + 1);
- }
-
- private static byte[] unsignedShortAsBytes(int value) {
- return new byte[] {(byte) (value >>> 8), (byte) value};
+ return helper.partitionEndPrefix();
}
/**
@@ -1202,7 +1169,7 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
}
private abstract class BasePartitionTimestampCursor implements
PartitionTimestampCursor {
- protected final RocksIterator it = db.newIterator(cf, scanReadOptions);
+ protected final RocksIterator it = db.newIterator(helper.partCf,
helper.scanReadOpts);
// Here's seek buffer itself. Originally it contains a valid partition
id, row id payload that's filled with zeroes, and maybe
// a timestamp value. Zero row id guarantees that it's
lexicographically less than or equal to any other row id stored in the
@@ -1211,9 +1178,9 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
// - no one guarantees that there will only be a single cursor;
// - no one guarantees that returned cursor will not be used by other
threads.
// The thing is, we need this buffer to preserve its content between
invocations of "hasNext" method.
- protected final ByteBuffer seekKeyBuf =
ByteBuffer.allocate(MAX_KEY_SIZE).order(KEY_BYTE_ORDER).putShort((short)
partitionId);
+ final ByteBuffer seekKeyBuf =
allocate(MAX_KEY_SIZE).order(KEY_BYTE_ORDER).putShort((short) partitionId);
- protected RowId currentRowId;
+ RowId currentRowId;
/** Cached value for {@link #next()} method. Also optimizes the code
of {@link #hasNext()}. */
protected ReadResult next;
@@ -1254,6 +1221,7 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
});
}
+ @SuppressWarnings("IteratorNextCanNotThrowNoSuchElementException") //
It can.
@Override
public final ReadResult next() {
return busy(() -> {
@@ -1293,10 +1261,10 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
currentRowId = null;
// Prepare direct buffer slice to read keys from the iterator.
- ByteBuffer currentKeyBuffer = MV_KEY_BUFFER.get().position(0);
+ ByteBuffer currentKeyBuffer = MV_KEY_BUFFER.get().rewind();
while (true) {
- currentKeyBuffer.position(0);
+ currentKeyBuffer.rewind();
// At this point, seekKeyBuf should contain row id that's
above the one we already scanned, but not greater than any
// other row id in partition. When we start, row id is filled
with zeroes. Value during the iteration is described later
@@ -1361,7 +1329,7 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
if (matches(rowId, key)) {
// This is a next version of current row.
- nextCommitTimestamp = readTimestamp(key);
+ nextCommitTimestamp = readTimestampDesc(key);
}
}
}
@@ -1374,7 +1342,7 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
if (!isWriteIntent) {
// There is no write-intent, return latest committed row.
- readResult = wrapCommittedValue(rowId, valueBytes,
readTimestamp(currentKeyBuffer));
+ readResult = wrapCommittedValue(rowId, valueBytes,
readTimestampDesc(currentKeyBuffer));
} else {
readResult = wrapUncommittedValue(rowId, valueBytes,
nextCommitTimestamp);
}
@@ -1392,7 +1360,7 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
private final class ScanByTimestampCursor extends
BasePartitionTimestampCursor {
private final HybridTimestamp timestamp;
- public ScanByTimestampCursor(HybridTimestamp timestamp) {
+ private ScanByTimestampCursor(HybridTimestamp timestamp) {
this.timestamp = timestamp;
}
@@ -1411,7 +1379,7 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
currentRowId = null;
// Prepare direct buffer slice to read keys from the iterator.
- ByteBuffer directBuffer = MV_KEY_BUFFER.get().position(0);
+ ByteBuffer directBuffer = MV_KEY_BUFFER.get().rewind();
while (true) {
//TODO IGNITE-18201 Remove copying.
@@ -1422,7 +1390,7 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
}
// We need to figure out what current row id is.
- it.key(directBuffer.position(0));
+ it.key(directBuffer.rewind());
RowId rowId = getRowId(directBuffer);
@@ -1494,7 +1462,7 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
*
* @throws StorageRebalanceException If there was an error when aborting
the rebalance.
*/
- void abortReblance(WriteBatch writeBatch) {
+ void abortRebalance(WriteBatch writeBatch) {
if (!state.compareAndSet(StorageState.REBALANCE,
StorageState.RUNNABLE)) {
throwExceptionDependingOnStorageStateOnRebalance(state.get(),
createStorageInfo());
}
@@ -1533,7 +1501,9 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
writeBatch.delete(meta, lastGroupConfigKey);
writeBatch.delete(meta, partitionIdKey(partitionId));
- writeBatch.deleteRange(cf, partitionStartPrefix(),
partitionEndPrefix());
+ writeBatch.deleteRange(helper.partCf, helper.partitionStartPrefix(),
helper.partitionEndPrefix());
+
+ gc.deleteQueue(writeBatch);
}
private void saveLastApplied(WriteBatch writeBatch, long lastAppliedIndex,
long lastAppliedTerm) throws RocksDBException {
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index 39987f5153..4767d1314d 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -21,6 +21,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.stream.Collectors.toList;
+import static
org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.GC_QUEUE_CF_NAME;
import static
org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.HASH_INDEX_CF_NAME;
import static
org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.META_CF_NAME;
import static
org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.PARTITION_CF_NAME;
@@ -114,6 +115,9 @@ public class RocksDbTableStorage implements MvTableStorage {
/** Column Family handle for partition data. */
private volatile ColumnFamily partitionCf;
+ /** Column Family handle for GC queue. */
+ private volatile ColumnFamily gcQueueCf;
+
/** Column Family handle for Hash Index data. */
private volatile ColumnFamily hashIndexCf;
@@ -186,6 +190,13 @@ public class RocksDbTableStorage implements MvTableStorage
{
return meta.columnFamily().handle();
}
+ /**
+ * Returns a column family handle for GC queue.
+ */
+ public ColumnFamilyHandle gcQueueHandle() {
+ return gcQueueCf.handle();
+ }
+
@Override
public TableConfiguration configuration() {
return tableCfg;
@@ -243,6 +254,11 @@ public class RocksDbTableStorage implements MvTableStorage
{
break;
+ case GC_QUEUE:
+ gcQueueCf = cf;
+
+ break;
+
case HASH_INDEX:
hashIndexCf = cf;
@@ -333,6 +349,7 @@ public class RocksDbTableStorage implements MvTableStorage {
resources.add(meta.columnFamily().handle());
resources.add(partitionCf.handle());
+ resources.add(gcQueueCf.handle());
resources.add(hashIndexCf.handle());
resources.addAll(
sortedIndices.values().stream()
@@ -579,7 +596,7 @@ public class RocksDbTableStorage implements MvTableStorage {
/**
* Returns a list of Column Families' names that belong to a RocksDB
instance in the given path.
*
- * @return Map with column families names.
+ * @return List with column families names.
* @throws StorageException If something went wrong.
*/
private List<String> getExistingCfNames() {
@@ -593,7 +610,7 @@ public class RocksDbTableStorage implements MvTableStorage {
// even if the database is new (no existing Column Families), we
return the names of mandatory column families, that
// will be created automatically.
- return existingNames.isEmpty() ? List.of(META_CF_NAME,
PARTITION_CF_NAME, HASH_INDEX_CF_NAME) : existingNames;
+ return existingNames.isEmpty() ? List.of(META_CF_NAME,
PARTITION_CF_NAME, GC_QUEUE_CF_NAME, HASH_INDEX_CF_NAME) : existingNames;
} catch (RocksDBException e) {
throw new StorageException(
"Failed to read list of column families names for the
RocksDB instance located at path " + absolutePathStr, e
@@ -618,12 +635,18 @@ public class RocksDbTableStorage implements
MvTableStorage {
private ColumnFamilyDescriptor cfDescriptorFromName(String cfName) {
switch (ColumnFamilyType.fromCfName(cfName)) {
case META:
- case PARTITION:
+ case GC_QUEUE:
return new ColumnFamilyDescriptor(
cfName.getBytes(UTF_8),
new ColumnFamilyOptions()
);
+ case PARTITION:
+ return new ColumnFamilyDescriptor(
+ cfName.getBytes(UTF_8),
+ new
ColumnFamilyOptions().useFixedLengthPrefixExtractor(PartitionDataHelper.ROW_PREFIX_SIZE)
+ );
+
case HASH_INDEX:
return new ColumnFamilyDescriptor(
cfName.getBytes(UTF_8),
@@ -703,7 +726,7 @@ public class RocksDbTableStorage implements MvTableStorage {
}
try (WriteBatch writeBatch = new WriteBatch()) {
- mvPartitionStorage.abortReblance(writeBatch);
+ mvPartitionStorage.abortRebalance(writeBatch);
getHashIndexStorages(partitionId).forEach(index ->
index.abortReblance(writeBatch));
getSortedIndexStorages(partitionId).forEach(index ->
index.abortReblance(writeBatch));
diff --git
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageGcTest.java
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageGcTest.java
new file mode 100644
index 0000000000..0c0cb220a7
--- /dev/null
+++
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageGcTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.file.Path;
+import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.storage.AbstractMvPartitionStorageGcTest;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * GC test implementation for {@link RocksDbStorageEngine}.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public class RocksDbMvPartitionStorageGcTest extends
AbstractMvPartitionStorageGcTest {
+ @InjectConfiguration
+ private RocksDbStorageEngineConfiguration engineConfig;
+
+ @WorkDirectory
+ private Path workDir;
+
+ @Override
+ protected StorageEngine createEngine() {
+ return new RocksDbStorageEngine(engineConfig, workDir);
+ }
+}
diff --git a/modules/storage-rocksdb/tech-notes/garbage-collection.md
b/modules/storage-rocksdb/tech-notes/garbage-collection.md
new file mode 100644
index 0000000000..bed34edf42
--- /dev/null
+++ b/modules/storage-rocksdb/tech-notes/garbage-collection.md
@@ -0,0 +1,95 @@
+# Garbage Collection in the RocksDB partition storage
+
+This document describes the process of Garbage Collection in the RocksDB-based
storage.
+The goal of this process is to remove stale data based on a given timestamp.
+
+## Garbage Collection algorithm
+
+It's important to understand when we actually need to perform garbage collection.
+Older versions of a row can be garbage collected if and only if there is a newer version
+of the row and that newer version's timestamp does not exceed the low watermark.
+The low watermark is the minimal timestamp that a running transaction might still have.
+Consider the following example:
+*Note that **Record number** is a hypothetical value used to refer to specific entries;
+there is no such value in the storage.*
+
+| Record number | Row id | Timestamp |
+|---------------|--------|-----------|
+| 1 | Foo | 1 |
+| 2 | Foo | 10 |
+
+In this case, we can only remove record 1 if the low watermark is 10 or higher.
+If the watermark is at 9, a transaction with timestamp 9 may still occur, and that
+transaction still needs record 1.
+This is why we only add an entry to the GC queue when a previous version exists, and
+why the timestamp of the queue entry is the timestamp of the newer version.
+
+Let's review another example:
+*Note that **Is tombstone** is, like **Record number**, a hypothetical column;
+there is no such explicit value in the storage.*
+
+| Record number | Row id | Timestamp | Is tombstone |
+|---------------|--------|-----------|--------------|
+| 1 | Foo | 1 | False |
+| 2 | Foo | 10 | True |
+| 3 | Foo | 20 | False |
+
+Everything said above holds for this example as well, but here we can additionally
+remove record 2, because it is a tombstone. If the watermark is greater than or equal
+to 10, a transaction with a timestamp above 10 either sees an empty value (if its
+timestamp is less than 20) or sees the newer version.
+
+So to sum up, the algorithm looks like this:
+
+1. Get an element from the GC queue; exit if the queue is empty.
+2. If the element's timestamp does not exceed the low watermark, add it to the batch for
+   removal from RocksDB; otherwise, exit.
+3. Find the element in the data column family that corresponds to the GC queue element.
+   If no such value exists, exit.
+4. If it is a tombstone, add it to the batch for removal.
+5. Seek the previous version. If it doesn't exist, exit.
+6. Add that previous version to the batch for removal.
+
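The steps above can be sketched with a small in-memory model. All names here are hypothetical; the real storage keeps both structures in RocksDB column families and collects removals into a `WriteBatch` rather than mutating maps directly:

```java
import java.util.ArrayDeque;
import java.util.Map;
import java.util.TreeMap;

class GcVacuumSketch {
    // GC queue entry: timestamp of the *newer* version that made an older one stale.
    record QueueEntry(long timestamp, String rowId) {}

    // Data "column family": rowId -> (timestamp -> value); a null value models a tombstone.
    final Map<String, TreeMap<Long, String>> data = new TreeMap<>();
    final ArrayDeque<QueueEntry> gcQueue = new ArrayDeque<>();

    /** Runs GC rounds until an exit condition is hit; returns the number of removed versions. */
    int vacuum(long lowWatermark) {
        int removed = 0;
        while (true) {
            QueueEntry head = gcQueue.peek();
            // Steps 1-2: stop on an empty queue or an entry above the low watermark.
            if (head == null || head.timestamp() > lowWatermark) {
                return removed;
            }
            gcQueue.poll();

            TreeMap<Long, String> versions = data.get(head.rowId());
            if (versions == null) {
                continue; // Step 3: a parallel GC round already cleaned this row up.
            }

            // Step 4: if the version the queue entry points at is a tombstone, remove it too.
            Map.Entry<Long, String> current = versions.floorEntry(head.timestamp());
            if (current == null) {
                continue;
            }
            if (current.getValue() == null) {
                versions.remove(current.getKey());
                removed++;
            }

            // Steps 5-6: remove the previous (older) version, if any.
            Long prev = versions.lowerKey(current.getKey());
            if (prev != null) {
                versions.remove(prev);
                removed++;
            }
        }
    }
}
```

Replaying the three-record example above through this sketch removes the value at timestamp 1 and the tombstone at timestamp 10 once the watermark reaches 20, leaving only the newest version.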
+Apart from the queue being empty, there are two cases in which we can exit prematurely:
+the value that triggered the addition to the GC queue and/or the value that needs to be
+garbage collected may be missing, because GC can run in parallel. If two parallel threads
+get the same element from the queue, one of them may have already finished the GC and
+removed the elements.
+
+## Garbage Collection queue
+
+We store the garbage collector's queue in a dedicated RocksDB column family.
+The key has the following format:
+
+| Partition id | Timestamp | Row id |
+|--------------|-------------------------------------------|----------------|
+| 2-byte | 12-byte (8-byte physical, 4-byte logical) | 16-byte (uuid) |
+
+The value is not stored, as we only need the key. We could make the row id the value,
+because for ascending-order processing of the queue we only need the timestamp. However,
+multiple row ids can share the same timestamp, so making the row id a value would require
+storing a list of row ids, which would make commits more sophisticated and, probably,
+less performant.
+
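A minimal sketch of composing such a 30-byte key is shown below. Names are illustrative, and the sign-bit normalization that the real implementation applies (so that lexicographic byte order survives negative components) is elided here:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.UUID;

class GcQueueKeySketch {
    static final int KEY_SIZE = Short.BYTES + Long.BYTES + Integer.BYTES + 2 * Long.BYTES; // 30

    static byte[] gcQueueKey(short partitionId, long physical, int logical, UUID rowId) {
        // ByteBuffer is big-endian by default, so for non-negative components
        // lexicographic byte order matches numeric (ascending) order.
        return ByteBuffer.allocate(KEY_SIZE)
                .putShort(partitionId)                   // 2-byte partition id
                .putLong(physical)                       // 8-byte physical timestamp
                .putInt(logical)                         // 4-byte logical timestamp
                .putLong(rowId.getMostSignificantBits()) // 16-byte row id (uuid)
                .putLong(rowId.getLeastSignificantBits())
                .array();
    }
}
```

With this layout, iterating the column family in key order within one partition yields queue entries in ascending timestamp order.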
+Each time a row is committed to the storage, we check whether there is already a value
+for this row. If there is one, and it and the new version are not both tombstones, we put
+the new commit's timestamp and row id into the GC queue. To understand why we put only
+the new value's timestamp, refer to the Garbage Collection
+[algorithm](#garbage-collection-algorithm).
+The queue is updated along with the data column family in a single batch and is destroyed
+when the storage is cleared or destroyed.
+
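Under that reading, the commit-time decision can be sketched as follows (a hypothetical helper, not the storage's actual API; `null` models a missing previous version):

```java
class GcEnqueueCheck {
    /**
     * @param prevIsTombstone whether the existing version is a tombstone,
     *        or {@code null} if there is no previous version for the row.
     * @param newIsTombstone whether the committed version is a tombstone.
     */
    static boolean shouldEnqueue(Boolean prevIsTombstone, boolean newIsTombstone) {
        if (prevIsTombstone == null) {
            return false; // No previous version: nothing becomes stale.
        }
        // Consecutive tombstones are never stored (see "Storage implications"),
        // so the "both are tombstones" case cannot produce a queue entry.
        return !(prevIsTombstone && newIsTombstone);
    }
}
```

Note that a tombstone written over a value does enqueue an entry: that entry later lets GC drop both the old value and the tombstone itself.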
+## Storage implications
+
+To save space, we don't store consecutive tombstones.
+For example, if a user removes a certain row twice:
+
+```
+storage.put(key, value); // Time 1
+storage.delete(key); // Time 10
+storage.delete(key); // Time 20
+```
+
+There will be one row with a value and one row with a tombstone, the stored tombstone
+being the older of the two (the one written at time 10). This also simplifies processing
+of the garbage collection queue.
+So in the storage we will see something like:
+
+| Key | Value | Timestamp |
+|-----|-------------|-----------|
+| Foo | Bar | 1 |
+| Foo | <tombstone> | 10 |
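A minimal sketch of this write-path de-duplication, using the same illustrative in-memory model (`null` models a tombstone):

```java
import java.util.Map;
import java.util.TreeMap;

class TombstoneDedupSketch {
    /** Writes a tombstone unless the latest stored version is already a tombstone. */
    static void delete(TreeMap<Long, String> versions, long timestamp) {
        Map.Entry<Long, String> latest = versions.lastEntry();
        if (latest != null && latest.getValue() == null) {
            return; // The latest version is already a tombstone: skip the consecutive one.
        }
        versions.put(timestamp, null); // null models a tombstone
    }
}
```

Applied to the example above, the delete at time 10 stores a tombstone, while the delete at time 20 is skipped entirely.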