This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-18020
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit d1d78ae0fc00e2d407c1662df5241d68cd8cc82d
Author: Semyon Danilov <[email protected]>
AuthorDate: Thu Feb 2 11:51:28 2023 +0400

    IGNITE-18020 add GC support to rocksdb
---
 gradle/libs.versions.toml                          |   2 +-
 .../AbstractMvPartitionStorageConcurrencyTest.java |  17 --
 .../internal/storage/BaseMvStoragesTest.java       |  46 +---
 .../storage/rocksdb/ColumnFamilyUtils.java         |   9 +-
 .../storage/rocksdb/RocksDbMvPartitionStorage.java | 266 +++++++++++++++++++--
 .../storage/rocksdb/RocksDbTableStorage.java       |  21 +-
 .../rocksdb/RocksDbMvPartitionStorageGcTest.java   |  45 ++++
 7 files changed, 327 insertions(+), 79 deletions(-)

diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index de9d67aa59..ddc93a2355 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -57,7 +57,7 @@ jsonpath = "2.4.0"
 classgraph = "4.8.110"
 javassist = "3.28.0-GA"
 checker = "3.10.0"
-rocksdb = "7.3.1"
+rocksdb = "7.9.2"
 disruptor = "3.3.7"
 metrics = "4.0.2"
 jctools = "3.3.0"
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
index 7e1e8d9d7c..b3a4742c4c 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
@@ -19,11 +19,9 @@ package org.apache.ignite.internal.storage;
 
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
 import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assumptions.assumeTrue;
 
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.TableRow;
-import org.apache.ignite.internal.storage.impl.TestStorageEngine;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
@@ -85,9 +83,6 @@ public abstract class 
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
     @ParameterizedTest
     @EnumSource(AddAndCommit.class)
     void testRegularGcAndRead(AddAndCommit addAndCommit) {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-18020
-        assumeTrue(engine instanceof TestStorageEngine);
-
         for (int i = 0; i < REPEATS; i++) {
             HybridTimestamp firstCommitTs = addAndCommit(TABLE_ROW);
 
@@ -108,9 +103,6 @@ public abstract class 
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
     @ParameterizedTest
     @EnumSource(AddAndCommit.class)
     void testTombstoneGcAndRead(AddAndCommit addAndCommit) {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-18020
-        assumeTrue(engine instanceof TestStorageEngine);
-
         for (int i = 0; i < REPEATS; i++) {
             HybridTimestamp firstCommitTs = addAndCommit.perform(this, 
TABLE_ROW);
 
@@ -129,9 +121,6 @@ public abstract class 
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
     @ParameterizedTest
     @EnumSource(AddAndCommit.class)
     void testTombstoneGcAndAddWrite(AddAndCommit addAndCommit) {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-18020
-        assumeTrue(engine instanceof TestStorageEngine);
-
         for (int i = 0; i < REPEATS; i++) {
             addAndCommit.perform(this, TABLE_ROW);
 
@@ -153,9 +142,6 @@ public abstract class 
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
     @ParameterizedTest
     @EnumSource(AddAndCommit.class)
     void testTombstoneGcAndCommitWrite(AddAndCommit addAndCommit) {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-18020
-        assumeTrue(engine instanceof TestStorageEngine);
-
         for (int i = 0; i < REPEATS; i++) {
             addAndCommit.perform(this, TABLE_ROW);
 
@@ -179,9 +165,6 @@ public abstract class 
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
     @ParameterizedTest
     @EnumSource(AddAndCommit.class)
     void testTombstoneGcAndAbortWrite(AddAndCommit addAndCommit) {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-18020
-        assumeTrue(engine instanceof TestStorageEngine);
-
         for (int i = 0; i < REPEATS; i++) {
             addAndCommit.perform(this, TABLE_ROW);
 
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
index b7e83ce61f..2ff77b10e4 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
@@ -52,58 +52,36 @@ import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteException;
 import org.jetbrains.annotations.Nullable;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
 
 /**
  * Base test for MV storages, contains pojo classes, their descriptor and a 
marshaller instance.
  */
 public abstract class BaseMvStoragesTest {
     /** Default reflection marshaller factory. */
-    protected static MarshallerFactory marshallerFactory;
+    protected static final MarshallerFactory marshallerFactory = new 
ReflectionMarshallerFactory();
 
     /** Schema descriptor for tests. */
-    protected static SchemaDescriptor schemaDescriptor;
+    protected static final SchemaDescriptor schemaDescriptor = new 
SchemaDescriptor(1, new Column[]{
+            new Column("intKey".toUpperCase(Locale.ROOT), NativeTypes.INT32, 
false),
+            new Column("strKey".toUpperCase(Locale.ROOT), NativeTypes.STRING, 
false),
+    }, new Column[]{
+            new Column("intVal".toUpperCase(Locale.ROOT), NativeTypes.INT32, 
false),
+            new Column("strVal".toUpperCase(Locale.ROOT), NativeTypes.STRING, 
false),
+    });
 
     /** Key-value marshaller for tests. */
-    protected static KvMarshaller<TestKey, TestValue> kvMarshaller;
+    protected static final KvMarshaller<TestKey, TestValue> kvMarshaller
+            = marshallerFactory.create(schemaDescriptor, TestKey.class, 
TestValue.class);
 
     /** Key-value {@link BinaryTuple} converter for tests. */
-    protected static BinaryConverter kvBinaryConverter;
+    protected static final BinaryConverter kvBinaryConverter = 
BinaryConverter.forRow(schemaDescriptor);
 
     /** Key {@link BinaryTuple} converter for tests. */
-    protected static BinaryConverter kBinaryConverter;
+    protected static final BinaryConverter kBinaryConverter = 
BinaryConverter.forKey(schemaDescriptor);
 
     /** Hybrid clock to generate timestamps. */
     protected final HybridClock clock = new HybridClockImpl();
 
-    @BeforeAll
-    static void beforeAll() {
-        marshallerFactory = new ReflectionMarshallerFactory();
-
-        schemaDescriptor = new SchemaDescriptor(1, new Column[]{
-                new Column("intKey".toUpperCase(Locale.ROOT), 
NativeTypes.INT32, false),
-                new Column("strKey".toUpperCase(Locale.ROOT), 
NativeTypes.STRING, false),
-        }, new Column[]{
-                new Column("intVal".toUpperCase(Locale.ROOT), 
NativeTypes.INT32, false),
-                new Column("strVal".toUpperCase(Locale.ROOT), 
NativeTypes.STRING, false),
-        });
-
-        kvMarshaller = marshallerFactory.create(schemaDescriptor, 
TestKey.class, TestValue.class);
-
-        kvBinaryConverter = BinaryConverter.forRow(schemaDescriptor);
-        kBinaryConverter = BinaryConverter.forKey(schemaDescriptor);
-    }
-
-    @AfterAll
-    static void afterAll() {
-        kvMarshaller = null;
-        schemaDescriptor = null;
-        marshallerFactory = null;
-        kvBinaryConverter = null;
-        kBinaryConverter = null;
-    }
-
     protected static TableRow tableRow(TestKey key, TestValue value) {
         return TableRowConverter.fromBinaryRow(binaryRow(key, value), 
kvBinaryConverter);
     }
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java
index c49a1ebe49..eb8bb97947 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java
@@ -35,6 +35,11 @@ class ColumnFamilyUtils {
      */
     static final String PARTITION_CF_NAME = "cf-part";
 
+    /**
+     * Name of the Column Family that stores garbage collection queue.
+     */
+    static final String GC_QUEUE_CF_NAME = "cf-gc";
+
     /**
      * Name of the Column Family that stores hash index data.
      */
@@ -49,7 +54,7 @@ class ColumnFamilyUtils {
      * Utility enum to describe a type of the column family - meta or 
partition.
      */
     enum ColumnFamilyType {
-        META, PARTITION, HASH_INDEX, SORTED_INDEX, UNKNOWN;
+        META, PARTITION, GC_QUEUE, HASH_INDEX, SORTED_INDEX, UNKNOWN;
 
         /**
          * Determines column family type by its name.
@@ -62,6 +67,8 @@ class ColumnFamilyUtils {
                 return META;
             } else if (PARTITION_CF_NAME.equals(cfName)) {
                 return PARTITION;
+            } else if (GC_QUEUE_CF_NAME.equals(cfName)) {
+                return GC_QUEUE;
             } else if (HASH_INDEX_CF_NAME.equals(cfName)) {
                 return HASH_INDEX;
             } else if (cfName.startsWith(SORTED_INDEX_CF_PREFIX)) {
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 4507794523..c141b3910f 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.storage.rocksdb;
 
 import static java.lang.ThreadLocal.withInitial;
+import static java.nio.ByteBuffer.allocate;
 import static java.nio.ByteBuffer.allocateDirect;
 import static java.util.Arrays.copyOf;
 import static java.util.Arrays.copyOfRange;
@@ -53,6 +54,7 @@ import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.StorageRebalanceException;
+import org.apache.ignite.internal.storage.TableRowAndRowId;
 import org.apache.ignite.internal.storage.TxIdMismatchException;
 import org.apache.ignite.internal.storage.util.StorageState;
 import org.apache.ignite.internal.util.ByteUtils;
@@ -97,7 +99,8 @@ import org.rocksdb.WriteOptions;
  *
  * <p/>BE means Big Endian, meaning that lexicographical bytes order matches a 
natural order of partitions.
  *
- * <p/>DESC means that timestamps are sorted from newest to oldest (N2O). 
Please refer to {@link #putTimestamp(ByteBuffer, HybridTimestamp)}
+ * <p/>DESC means that timestamps are sorted from newest to oldest (N2O).
+ * Please refer to {@link #putTimestampDesc(ByteBuffer, HybridTimestamp)}
  * to see how it's achieved. Missing timestamp could be interpreted as a 
moment infinitely far away in the future.
  */
 public class RocksDbMvPartitionStorage implements MvPartitionStorage {
@@ -171,6 +174,9 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
     /** Meta column family. */
     private final ColumnFamilyHandle meta;
 
+    /** GC queue column family. */
+    private final ColumnFamilyHandle gc;
+
     /** Write options. */
     private final WriteOptions writeOpts = new 
WriteOptions().setDisableWAL(true);
 
@@ -235,6 +241,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
         db = tableStorage.db();
         cf = tableStorage.partitionCfHandle();
         meta = tableStorage.metaCfHandle();
+        gc = tableStorage.gcQueueHandle();
 
         upperBound = new Slice(partitionEndPrefix());
 
@@ -570,17 +577,23 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
                 byte[] valueBytes = writeBatch.getFromBatchAndDB(db, cf, 
readOpts, uncommittedKeyBytes);
 
                 if (valueBytes == null) {
-                    //the chain doesn't contain an uncommitted write intent
+                    // The chain doesn't contain an uncommitted write intent.
                     return null;
                 }
 
+                boolean isNewValueTombstone = valueBytes.length == 
VALUE_HEADER_SIZE;
+
+                boolean isPreviousValueTombstone = 
maybeAddToGcQueue(writeBatch, rowId, timestamp, isNewValueTombstone);
+
                 // Delete pending write.
                 writeBatch.delete(cf, uncommittedKeyBytes);
 
-                // Add timestamp to the key, and put the value back into the 
storage.
-                putTimestamp(keyBuf, timestamp);
+                if (!(isNewValueTombstone && isPreviousValueTombstone)) {
+                    // Add timestamp to the key, and put the value back into 
the storage.
+                    putTimestampDesc(keyBuf, timestamp);
 
-                writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE), 
copyOfRange(valueBytes, VALUE_HEADER_SIZE, valueBytes.length));
+                    writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE), 
copyOfRange(valueBytes, VALUE_HEADER_SIZE, valueBytes.length));
+                }
 
                 return null;
             } catch (RocksDBException e) {
@@ -595,21 +608,82 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
             @SuppressWarnings("resource") WriteBatchWithIndex writeBatch = 
requireWriteBatch();
 
             ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
-            putTimestamp(keyBuf, commitTimestamp);
+            putTimestampDesc(keyBuf, commitTimestamp);
+
+            boolean isNewValueTombstone = row == null;
 
             //TODO IGNITE-16913 Add proper way to write row bytes into array 
without allocations.
-            byte[] rowBytes = rowBytes(row);
+            byte[] rowBytes = row != null ? rowBytes(row) : new byte[0];
 
+            boolean isPreviousValueTombstone;
             try {
-                writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE), 
rowBytes);
-
-                return null;
+                isPreviousValueTombstone = maybeAddToGcQueue(writeBatch, 
rowId, commitTimestamp, isNewValueTombstone);
             } catch (RocksDBException e) {
-                throw new StorageException("Failed to update a row in 
storage", e);
+                throw new StorageException("Failed to add row to the GC 
queue", e);
+            }
+
+            if (!(isNewValueTombstone && isPreviousValueTombstone)) {
+                try {
+                    writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE), 
rowBytes);
+                } catch (RocksDBException e) {
+                    throw new StorageException("Failed to update a row in 
storage", e);
+                }
             }
+
+            return null;
         });
     }
 
+    private boolean maybeAddToGcQueue(WriteBatchWithIndex writeBatch, RowId 
rowId, HybridTimestamp timestamp, boolean isNewValueTombstone)
+            throws RocksDBException {
+        boolean isPreviousValueTombstone = false;
+
+        ByteBuffer key = allocateDirect(MAX_KEY_SIZE);
+        key.putShort((short) partitionId);
+        putRowId(key, rowId);
+        putTimestampDesc(key, timestamp);
+
+        try (
+                Slice upperBound = new Slice(partitionEndPrefix());
+                ReadOptions readOpts = new 
ReadOptions().setTotalOrderSeek(true).setIterateUpperBound(upperBound);
+                RocksIterator it = db.newIterator(cf, readOpts)
+        ) {
+            key.flip();
+            it.seek(key);
+
+            if (!invalid(it)) {
+                key.position(0);
+                int keyLen = it.key(key);
+
+                RowId readRowId = getRowId(key);
+
+                if (readRowId.equals(rowId)) {
+                    assert keyLen == MAX_KEY_SIZE; // Cannot be a write-intent, as we were seeking with the timestamp.
+
+                    if (isNewValueTombstone) {
+                        ByteBuffer checkBuffer = allocateDirect(0);
+
+                        int valueSize = it.value(checkBuffer);
+
+                        isPreviousValueTombstone = valueSize == 0;
+                    }
+
+                    if (!(isNewValueTombstone && isPreviousValueTombstone)) {
+                        ByteBuffer gcKey = allocate(MAX_KEY_SIZE);
+
+                        gcKey.putShort((short) partitionId);
+                        putTimestampNatural(gcKey, timestamp);
+                        putRowId(gcKey, rowId);
+
+                        writeBatch.put(gc, copyOf(gcKey.array(), 
MAX_KEY_SIZE), new byte[0]);
+                    }
+                }
+            }
+        }
+
+        return isPreviousValueTombstone;
+    }
+
     @Override
     public ReadResult read(RowId rowId, HybridTimestamp timestamp) throws 
StorageException {
         return busy(() -> {
@@ -682,7 +756,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
 
         if (!isWriteIntent) {
             // There is no write-intent, return latest committed row.
-            return wrapCommittedValue(rowId, valueBytes, 
readTimestamp(keyBuf));
+            return wrapCommittedValue(rowId, valueBytes, 
readTimestampDesc(keyBuf));
         }
 
         return wrapUncommittedValue(rowId, valueBytes, null);
@@ -700,7 +774,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
         ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
 
         // Put timestamp restriction according to N2O timestamps order.
-        putTimestamp(keyBuf, timestamp);
+        putTimestampDesc(keyBuf, timestamp);
 
         // This seek will either find a key with timestamp that's less or 
equal than required value, or a different key whatsoever.
         // It is guaranteed by descending order of timestamps.
@@ -779,7 +853,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
             // Should not be write-intent, as we were seeking with the 
timestamp.
             assert keyLength == MAX_KEY_SIZE;
 
-            HybridTimestamp rowTimestamp = readTimestamp(foundKeyBuf);
+            HybridTimestamp rowTimestamp = readTimestampDesc(foundKeyBuf);
 
             byte[] valueBytes = seekIterator.value();
 
@@ -811,7 +885,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
                 return wrapUncommittedValue(rowId, seekIterator.value(), 
rowTimestamp);
             }
 
-            return wrapCommittedValue(rowId, valueBytes, 
readTimestamp(foundKeyBuf));
+            return wrapCommittedValue(rowId, valueBytes, 
readTimestampDesc(foundKeyBuf));
         }
     }
 
@@ -913,7 +987,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
         keyBuf.putLong(ROW_ID_OFFSET + Long.BYTES, 
normalize(rowId.leastSignificantBits()));
 
         if (timestamp != null) {
-            putTimestamp(keyBuf.position(ROW_PREFIX_SIZE), timestamp);
+            putTimestampDesc(keyBuf.position(ROW_PREFIX_SIZE), timestamp);
         }
 
         keyBuf.position(0);
@@ -973,9 +1047,11 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
     }
 
     private RowId getRowId(ByteBuffer keyBuffer) {
-        keyBuffer.position(ROW_ID_OFFSET);
+        return getRowId(keyBuffer, ROW_ID_OFFSET);
+    }
 
-        return new RowId(partitionId, normalize(keyBuffer.getLong()), 
normalize(keyBuffer.getLong()));
+    private RowId getRowId(ByteBuffer keyBuffer, int offset) {
+        return new RowId(partitionId, normalize(keyBuffer.getLong(offset)), 
normalize(keyBuffer.getLong(offset + Long.BYTES)));
     }
 
     @Override
@@ -1015,6 +1091,117 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
         writeBatch.deleteRange(cf, partitionStartPrefix(), 
partitionEndPrefix());
     }
 
+    @Override
+    public @Nullable TableRowAndRowId pollForVacuum(HybridTimestamp 
lowWatermark) {
+        return busy(() -> {
+            ByteBuffer seekKey = MV_KEY_BUFFER.get();
+            seekKey.clear();
+
+            try (
+                    WriteBatch batch = new WriteBatch();
+                    var gcUpperBound = new Slice(partitionEndPrefix());
+                    ReadOptions gcOpts = new 
ReadOptions().setIterateUpperBound(gcUpperBound).setTotalOrderSeek(true);
+                    RocksIterator gcIt = db.newIterator(gc, gcOpts);
+            ) {
+                gcIt.seek(partitionStartPrefix());
+
+                if (invalid(gcIt)) {
+                    return null;
+                }
+
+                while (true) {
+                    if (invalid(gcIt)) {
+                        return null;
+                    }
+
+                    seekKey.position(0);
+                    gcIt.key(seekKey);
+
+                    HybridTimestamp timestamp = readTimestampNatural(seekKey, 
2);
+                    RowId readRowId = getRowId(seekKey, 14);
+
+                    if (timestamp.compareTo(lowWatermark) > 0) {
+                        return null;
+                    }
+
+                    try (
+                            var upperBound = new Slice(partitionEndPrefix());
+                            ReadOptions opts = new 
ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
+                            RocksIterator it = db.newIterator(cf, opts);
+                    ) {
+                        ByteBuffer byteBuffer = allocateDirect(MAX_KEY_SIZE);
+                        byteBuffer.putShort((short) partitionId);
+                        putRowId(byteBuffer, readRowId);
+                        putTimestampDesc(byteBuffer, timestamp);
+                        byteBuffer.flip();
+
+                        it.seek(byteBuffer);
+
+                        if (invalid(it)) {
+                            return null;
+                        }
+
+                        {
+                            ByteBuffer checkBuffer = allocateDirect(0);
+
+                            int len = it.value(checkBuffer);
+
+                            if (len == 0) {
+                                // This is a tombstone, we need to delete it.
+                                byteBuffer.flip();
+
+                                try {
+                                    batch.delete(cf, byteBuffer);
+                                } catch (RocksDBException e) {
+                                    throw new RuntimeException(e);
+                                }
+                            }
+                        }
+                        it.next();
+
+                        if (invalid(it)) {
+                            return null;
+                        }
+
+                        byteBuffer.flip();
+                        int keyLen = it.key(byteBuffer);
+
+                        if (keyLen != MAX_KEY_SIZE) {
+                            return null;
+                        }
+
+                        RowId gcRowId = getRowId(byteBuffer);
+
+                        if (!readRowId.equals(gcRowId)) {
+                            return null;
+                        }
+
+                        HybridTimestamp nextTs = readTimestampDesc(byteBuffer);
+
+                        byte[] valueBytes = it.value();
+
+                        var row = new 
TableRow(ByteBuffer.wrap(valueBytes).order(TABLE_ROW_BYTE_ORDER));
+                        TableRowAndRowId retVal = new TableRowAndRowId(row, 
gcRowId);
+
+                        try {
+                            byteBuffer.position(0);
+                            seekKey.position(0);
+                            batch.delete(cf, byteBuffer);
+                            batch.delete(gc, seekKey);
+                            db.write(writeOpts, batch);
+                        } catch (RocksDBException e) {
+                            // The write failure is only printed, not propagated; the row is still returned.
+                            e.printStackTrace();
+                        }
+
+                        return retVal;
+                    }
+
+                }
+            }
+        });
+    }
+
     @Override
     public void close() {
         if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED)) {
@@ -1049,12 +1236,20 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
         ByteBuffer keyBuf = HEAP_KEY_BUFFER.get().position(0);
 
         keyBuf.putShort((short) rowId.partitionId());
-        keyBuf.putLong(normalize(rowId.mostSignificantBits()));
-        keyBuf.putLong(normalize(rowId.leastSignificantBits()));
+
+        putRowId(keyBuf, rowId);
 
         return keyBuf;
     }
 
+    private void putRowId(ByteBuffer keyBuffer, RowId rowId) {
+        assert rowId.partitionId() == partitionId : rowId;
+        assert keyBuffer.order() == KEY_BYTE_ORDER;
+
+        keyBuffer.putLong(normalize(rowId.mostSignificantBits()));
+        keyBuffer.putLong(normalize(rowId.leastSignificantBits()));
+    }
+
     /**
      * Converts signed long into a new long value, that when written in Big 
Endian, will preserve the comparison order if compared
      * lexicographically as an array of unsigned bytes. For example, values 
{@code -1} and {@code 0}, when written in BE, will become
@@ -1069,7 +1264,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
     /**
      * Writes a timestamp into a byte buffer, in descending lexicographical 
bytes order.
      */
-    private static void putTimestamp(ByteBuffer buf, HybridTimestamp ts) {
+    private static void putTimestampDesc(ByteBuffer buf, HybridTimestamp ts) {
         assert buf.order() == KEY_BYTE_ORDER;
 
         // "bitwise negation" turns ascending order into a descending one.
@@ -1077,7 +1272,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
         buf.putInt(~ts.getLogical());
     }
 
-    private static HybridTimestamp readTimestamp(ByteBuffer keyBuf) {
+    private static HybridTimestamp readTimestampDesc(ByteBuffer keyBuf) {
         assert keyBuf.order() == KEY_BYTE_ORDER;
 
         long physical = ~keyBuf.getLong(ROW_PREFIX_SIZE);
@@ -1086,6 +1281,29 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
         return new HybridTimestamp(physical, logical);
     }
 
+    /**
+     * Writes a timestamp into a byte buffer, in ascending lexicographical 
bytes order.
+     */
+    private static void putTimestampNatural(ByteBuffer buf, HybridTimestamp 
ts) {
+        assert buf.order() == KEY_BYTE_ORDER;
+
+        buf.putLong(ts.getPhysical());
+        buf.putInt(ts.getLogical());
+    }
+
+    private static HybridTimestamp readTimestampNatural(ByteBuffer keyBuf) {
+        return readTimestampNatural(keyBuf, ROW_PREFIX_SIZE);
+    }
+
+    private static HybridTimestamp readTimestampNatural(ByteBuffer keyBuf, int 
offset) {
+        assert keyBuf.order() == KEY_BYTE_ORDER;
+
+        long physical = keyBuf.getLong(offset);
+        int logical = keyBuf.getInt(offset + Long.BYTES);
+
+        return new HybridTimestamp(physical, logical);
+    }
+
     private static void putShort(byte[] array, int off, short value) {
         GridUnsafe.putShort(array, GridUnsafe.BYTE_ARR_OFF + off, value);
     }
@@ -1361,7 +1579,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
 
                         if (matches(rowId, key)) {
                             // This is a next version of current row.
-                            nextCommitTimestamp = readTimestamp(key);
+                            nextCommitTimestamp = readTimestampDesc(key);
                         }
                     }
                 }
@@ -1374,7 +1592,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
 
                 if (!isWriteIntent) {
                     // There is no write-intent, return latest committed row.
-                    readResult = wrapCommittedValue(rowId, valueBytes, 
readTimestamp(currentKeyBuffer));
+                    readResult = wrapCommittedValue(rowId, valueBytes, 
readTimestampDesc(currentKeyBuffer));
                 } else {
                     readResult = wrapUncommittedValue(rowId, valueBytes, 
nextCommitTimestamp);
                 }
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index f6ebf8c7e5..3e755c3b11 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -21,6 +21,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.GC_QUEUE_CF_NAME;
 import static 
org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.HASH_INDEX_CF_NAME;
 import static 
org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.META_CF_NAME;
 import static 
org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.PARTITION_CF_NAME;
@@ -115,6 +116,9 @@ public class RocksDbTableStorage implements MvTableStorage {
     /** Column Family handle for partition data. */
     private volatile ColumnFamily partitionCf;
 
+    /** Column Family handle for GC queue. */
+    private volatile ColumnFamily gcQueueCf;
+
     /** Column Family handle for Hash Index data. */
     private volatile ColumnFamily hashIndexCf;
 
@@ -187,6 +191,13 @@ public class RocksDbTableStorage implements MvTableStorage 
{
         return meta.columnFamily().handle();
     }
 
+    /**
+     * Returns a column family handle for GC queue.
+     */
+    public ColumnFamilyHandle gcQueueHandle() {
+        return gcQueueCf.handle();
+    }
+
     @Override
     public TableConfiguration configuration() {
         return tableCfg;
@@ -244,6 +255,11 @@ public class RocksDbTableStorage implements MvTableStorage 
{
 
                             break;
 
+                        case GC_QUEUE:
+                            gcQueueCf = cf;
+
+                            break;
+
                         case HASH_INDEX:
                             hashIndexCf = cf;
 
@@ -572,7 +588,7 @@ public class RocksDbTableStorage implements MvTableStorage {
     /**
      * Returns a list of Column Families' names that belong to a RocksDB 
instance in the given path.
      *
-     * @return Map with column families names.
+     * @return List with column families names.
      * @throws StorageException If something went wrong.
      */
     private List<String> getExistingCfNames() {
@@ -586,7 +602,7 @@ public class RocksDbTableStorage implements MvTableStorage {
 
             // even if the database is new (no existing Column Families), we 
return the names of mandatory column families, that
             // will be created automatically.
-            return existingNames.isEmpty() ? List.of(META_CF_NAME, 
PARTITION_CF_NAME, HASH_INDEX_CF_NAME) : existingNames;
+            return existingNames.isEmpty() ? List.of(META_CF_NAME, 
PARTITION_CF_NAME, GC_QUEUE_CF_NAME, HASH_INDEX_CF_NAME) : existingNames;
         } catch (RocksDBException e) {
             throw new StorageException(
                     "Failed to read list of column families names for the 
RocksDB instance located at path " + absolutePathStr, e
@@ -612,6 +628,7 @@ public class RocksDbTableStorage implements MvTableStorage {
         switch (ColumnFamilyType.fromCfName(cfName)) {
             case META:
             case PARTITION:
+            case GC_QUEUE:
                 return new ColumnFamilyDescriptor(
                         cfName.getBytes(UTF_8),
                         new ColumnFamilyOptions()
diff --git 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageGcTest.java
 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageGcTest.java
new file mode 100644
index 0000000000..28b14d107e
--- /dev/null
+++ 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageGcTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.file.Path;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.storage.AbstractMvPartitionStorageGcTest;
+import org.apache.ignite.internal.storage.AbstractMvPartitionStorageTest;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import 
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Garbage collection tests for the MV partition storage created by {@link RocksDbStorageEngine}.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public class RocksDbMvPartitionStorageGcTest extends 
AbstractMvPartitionStorageGcTest {
+    @InjectConfiguration("mock {flushDelayMillis = 0, defaultRegion {size = 
16777216, writeBufferSize = 16777216}}")
+    private RocksDbStorageEngineConfiguration engineConfig;
+
+    @WorkDirectory
+    private Path workDir;
+
+    @Override
+    protected StorageEngine createEngine() {
+        return new RocksDbStorageEngine(engineConfig, workDir);
+    }
+}


Reply via email to