This is an automated email from the ASF dual-hosted git repository. sdanilov pushed a commit to branch ignite-18020 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit d1d78ae0fc00e2d407c1662df5241d68cd8cc82d Author: Semyon Danilov <[email protected]> AuthorDate: Thu Feb 2 11:51:28 2023 +0400 IGNITE-18020 add GC support to rocksdb --- gradle/libs.versions.toml | 2 +- .../AbstractMvPartitionStorageConcurrencyTest.java | 17 -- .../internal/storage/BaseMvStoragesTest.java | 46 +--- .../storage/rocksdb/ColumnFamilyUtils.java | 9 +- .../storage/rocksdb/RocksDbMvPartitionStorage.java | 266 +++++++++++++++++++-- .../storage/rocksdb/RocksDbTableStorage.java | 21 +- .../rocksdb/RocksDbMvPartitionStorageGcTest.java | 45 ++++ 7 files changed, 327 insertions(+), 79 deletions(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index de9d67aa59..ddc93a2355 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -57,7 +57,7 @@ jsonpath = "2.4.0" classgraph = "4.8.110" javassist = "3.28.0-GA" checker = "3.10.0" -rocksdb = "7.3.1" +rocksdb = "7.9.2" disruptor = "3.3.7" metrics = "4.0.2" jctools = "3.3.0" diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java index 7e1e8d9d7c..b3a4742c4c 100644 --- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java +++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java @@ -19,11 +19,9 @@ package org.apache.ignite.internal.storage; import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace; import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assumptions.assumeTrue; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.schema.TableRow; -import org.apache.ignite.internal.storage.impl.TestStorageEngine; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; @@ -85,9 +83,6 @@ public abstract class AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa @ParameterizedTest @EnumSource(AddAndCommit.class) void testRegularGcAndRead(AddAndCommit addAndCommit) { - //TODO https://issues.apache.org/jira/browse/IGNITE-18020 - assumeTrue(engine instanceof TestStorageEngine); - for (int i = 0; i < REPEATS; i++) { HybridTimestamp firstCommitTs = addAndCommit(TABLE_ROW); @@ -108,9 +103,6 @@ public abstract class AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa @ParameterizedTest @EnumSource(AddAndCommit.class) void testTombstoneGcAndRead(AddAndCommit addAndCommit) { - //TODO https://issues.apache.org/jira/browse/IGNITE-18020 - assumeTrue(engine instanceof TestStorageEngine); - for (int i = 0; i < REPEATS; i++) { HybridTimestamp firstCommitTs = addAndCommit.perform(this, TABLE_ROW); @@ -129,9 +121,6 @@ public abstract class AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa @ParameterizedTest @EnumSource(AddAndCommit.class) void testTombstoneGcAndAddWrite(AddAndCommit addAndCommit) { - //TODO https://issues.apache.org/jira/browse/IGNITE-18020 - assumeTrue(engine instanceof TestStorageEngine); - for (int i = 0; i < REPEATS; i++) { addAndCommit.perform(this, TABLE_ROW); @@ -153,9 +142,6 @@ public abstract class AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa @ParameterizedTest @EnumSource(AddAndCommit.class) void testTombstoneGcAndCommitWrite(AddAndCommit addAndCommit) { - //TODO https://issues.apache.org/jira/browse/IGNITE-18020 - assumeTrue(engine instanceof TestStorageEngine); - for (int i = 0; i < REPEATS; i++) { addAndCommit.perform(this, TABLE_ROW); @@ -179,9 +165,6 @@ public abstract class AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa @ParameterizedTest @EnumSource(AddAndCommit.class) void testTombstoneGcAndAbortWrite(AddAndCommit addAndCommit) { - //TODO https://issues.apache.org/jira/browse/IGNITE-18020 - assumeTrue(engine instanceof TestStorageEngine); - for (int i = 0; i < REPEATS; i++) { addAndCommit.perform(this, TABLE_ROW); diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java index b7e83ce61f..2ff77b10e4 100644 --- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java +++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java @@ -52,58 +52,36 @@ import org.apache.ignite.internal.util.Cursor; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteException; import org.jetbrains.annotations.Nullable; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; /** * Base test for MV storages, contains pojo classes, their descriptor and a marshaller instance. */ public abstract class BaseMvStoragesTest { /** Default reflection marshaller factory. */ - protected static MarshallerFactory marshallerFactory; + protected static final MarshallerFactory marshallerFactory = new ReflectionMarshallerFactory(); /** Schema descriptor for tests. */ - protected static SchemaDescriptor schemaDescriptor; + protected static final SchemaDescriptor schemaDescriptor = new SchemaDescriptor(1, new Column[]{ + new Column("intKey".toUpperCase(Locale.ROOT), NativeTypes.INT32, false), + new Column("strKey".toUpperCase(Locale.ROOT), NativeTypes.STRING, false), + }, new Column[]{ + new Column("intVal".toUpperCase(Locale.ROOT), NativeTypes.INT32, false), + new Column("strVal".toUpperCase(Locale.ROOT), NativeTypes.STRING, false), + }); /** Key-value marshaller for tests. */ - protected static KvMarshaller<TestKey, TestValue> kvMarshaller; + protected static final KvMarshaller<TestKey, TestValue> kvMarshaller + = marshallerFactory.create(schemaDescriptor, TestKey.class, TestValue.class); /** Key-value {@link BinaryTuple} converter for tests. */ - protected static BinaryConverter kvBinaryConverter; + protected static final BinaryConverter kvBinaryConverter = BinaryConverter.forRow(schemaDescriptor); /** Key {@link BinaryTuple} converter for tests. */ - protected static BinaryConverter kBinaryConverter; + protected static final BinaryConverter kBinaryConverter = BinaryConverter.forKey(schemaDescriptor); /** Hybrid clock to generate timestamps. */ protected final HybridClock clock = new HybridClockImpl(); - @BeforeAll - static void beforeAll() { - marshallerFactory = new ReflectionMarshallerFactory(); - - schemaDescriptor = new SchemaDescriptor(1, new Column[]{ - new Column("intKey".toUpperCase(Locale.ROOT), NativeTypes.INT32, false), - new Column("strKey".toUpperCase(Locale.ROOT), NativeTypes.STRING, false), - }, new Column[]{ - new Column("intVal".toUpperCase(Locale.ROOT), NativeTypes.INT32, false), - new Column("strVal".toUpperCase(Locale.ROOT), NativeTypes.STRING, false), - }); - - kvMarshaller = marshallerFactory.create(schemaDescriptor, TestKey.class, TestValue.class); - - kvBinaryConverter = BinaryConverter.forRow(schemaDescriptor); - kBinaryConverter = BinaryConverter.forKey(schemaDescriptor); - } - - @AfterAll - static void afterAll() { - kvMarshaller = null; - schemaDescriptor = null; - marshallerFactory = null; - kvBinaryConverter = null; - kBinaryConverter = null; - } - protected static TableRow tableRow(TestKey key, TestValue value) { return TableRowConverter.fromBinaryRow(binaryRow(key, value), kvBinaryConverter); } diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java index c49a1ebe49..eb8bb97947 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java @@ -35,6 +35,11 @@ class ColumnFamilyUtils { */ static final String PARTITION_CF_NAME = "cf-part"; + /** + * Name of the Column Family that stores garbage collection queue. + */ + static final String GC_QUEUE_CF_NAME = "cf-gc"; + /** * Name of the Column Family that stores hash index data. */ @@ -49,7 +54,7 @@ class ColumnFamilyUtils { * Utility enum to describe a type of the column family - meta or partition. */ enum ColumnFamilyType { - META, PARTITION, HASH_INDEX, SORTED_INDEX, UNKNOWN; + META, PARTITION, GC_QUEUE, HASH_INDEX, SORTED_INDEX, UNKNOWN; /** * Determines column family type by its name. @@ -62,6 +67,8 @@ class ColumnFamilyUtils { return META; } else if (PARTITION_CF_NAME.equals(cfName)) { return PARTITION; + } else if (GC_QUEUE_CF_NAME.equals(cfName)) { + return GC_QUEUE; } else if (HASH_INDEX_CF_NAME.equals(cfName)) { return HASH_INDEX; } else if (cfName.startsWith(SORTED_INDEX_CF_PREFIX)) { diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java index 4507794523..c141b3910f 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.storage.rocksdb; import static java.lang.ThreadLocal.withInitial; +import static java.nio.ByteBuffer.allocate; import static java.nio.ByteBuffer.allocateDirect; import static java.util.Arrays.copyOf; import static java.util.Arrays.copyOfRange; @@ -53,6 +54,7 @@ import org.apache.ignite.internal.storage.ReadResult; import org.apache.ignite.internal.storage.RowId; import org.apache.ignite.internal.storage.StorageException; import org.apache.ignite.internal.storage.StorageRebalanceException; +import org.apache.ignite.internal.storage.TableRowAndRowId; import org.apache.ignite.internal.storage.TxIdMismatchException; import org.apache.ignite.internal.storage.util.StorageState; import org.apache.ignite.internal.util.ByteUtils; @@ -97,7 +99,8 @@ import org.rocksdb.WriteOptions; * * <p/>BE means Big Endian, meaning that lexicographical bytes order matches a natural order of partitions. * - * <p/>DESC means that timestamps are sorted from newest to oldest (N2O). Please refer to {@link #putTimestamp(ByteBuffer, HybridTimestamp)} + * <p/>DESC means that timestamps are sorted from newest to oldest (N2O). + * Please refer to {@link #putTimestampDesc(ByteBuffer, HybridTimestamp)} * to see how it's achieved. Missing timestamp could be interpreted as a moment infinitely far away in the future. */ public class RocksDbMvPartitionStorage implements MvPartitionStorage { @@ -171,6 +174,9 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { /** Meta column family. */ private final ColumnFamilyHandle meta; + /** GC queue column family. */ + private final ColumnFamilyHandle gc; + /** Write options. */ private final WriteOptions writeOpts = new WriteOptions().setDisableWAL(true); @@ -235,6 +241,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { db = tableStorage.db(); cf = tableStorage.partitionCfHandle(); meta = tableStorage.metaCfHandle(); + gc = tableStorage.gcQueueHandle(); upperBound = new Slice(partitionEndPrefix()); @@ -570,17 +577,23 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { byte[] valueBytes = writeBatch.getFromBatchAndDB(db, cf, readOpts, uncommittedKeyBytes); if (valueBytes == null) { - //the chain doesn't contain an uncommitted write intent + // The chain doesn't contain an uncommitted write intent. return null; } + boolean isNewValueTombstone = valueBytes.length == VALUE_HEADER_SIZE; + + boolean isPreviousValueTombstone = maybeAddToGcQueue(writeBatch, rowId, timestamp, isNewValueTombstone); + // Delete pending write. writeBatch.delete(cf, uncommittedKeyBytes); - // Add timestamp to the key, and put the value back into the storage. - putTimestamp(keyBuf, timestamp); + if (!(isNewValueTombstone && isPreviousValueTombstone)) { + // Add timestamp to the key, and put the value back into the storage. + putTimestampDesc(keyBuf, timestamp); - writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE), copyOfRange(valueBytes, VALUE_HEADER_SIZE, valueBytes.length)); + writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE), copyOfRange(valueBytes, VALUE_HEADER_SIZE, valueBytes.length)); + } return null; } catch (RocksDBException e) { @@ -595,21 +608,82 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { @SuppressWarnings("resource") WriteBatchWithIndex writeBatch = requireWriteBatch(); ByteBuffer keyBuf = prepareHeapKeyBuf(rowId); - putTimestamp(keyBuf, commitTimestamp); + putTimestampDesc(keyBuf, commitTimestamp); + + boolean isNewValueTombstone = row == null; //TODO IGNITE-16913 Add proper way to write row bytes into array without allocations. - byte[] rowBytes = rowBytes(row); + byte[] rowBytes = row != null ? rowBytes(row) : new byte[0]; + boolean isPreviousValueTombstone; try { - writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE), rowBytes); - - return null; + isPreviousValueTombstone = maybeAddToGcQueue(writeBatch, rowId, commitTimestamp, isNewValueTombstone); } catch (RocksDBException e) { - throw new StorageException("Failed to update a row in storage", e); + throw new StorageException("Failed to add row to the GC queue", e); + } + + if (!(isNewValueTombstone && isPreviousValueTombstone)) { + try { + writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE), rowBytes); + } catch (RocksDBException e) { + throw new StorageException("Failed to update a row in storage", e); + } } + + return null; }); } + private boolean maybeAddToGcQueue(WriteBatchWithIndex writeBatch, RowId rowId, HybridTimestamp timestamp, boolean isNewValueTombstone) + throws RocksDBException { + boolean isPreviousValueTombstone = false; + + ByteBuffer key = allocateDirect(MAX_KEY_SIZE); + key.putShort((short) partitionId); + putRowId(key, rowId); + putTimestampDesc(key, timestamp); + + try ( + Slice upperBound = new Slice(partitionEndPrefix()); + ReadOptions readOpts = new ReadOptions().setTotalOrderSeek(true).setIterateUpperBound(upperBound); + RocksIterator it = db.newIterator(cf, readOpts) + ) { + key.flip(); + it.seek(key); + + if (!invalid(it)) { + key.position(0); + int keyLen = it.key(key); + + RowId readRowId = getRowId(key); + + if (readRowId.equals(rowId)) { + assert keyLen == MAX_KEY_SIZE; // Can not be write-intent. + + if (isNewValueTombstone) { + ByteBuffer checkBuffer = allocateDirect(0); + + int valueSize = it.value(checkBuffer); + + isPreviousValueTombstone = valueSize == 0; + } + + if (!(isNewValueTombstone && isPreviousValueTombstone)) { + ByteBuffer gcKey = allocate(MAX_KEY_SIZE); + + gcKey.putShort((short) partitionId); + putTimestampNatural(gcKey, timestamp); + putRowId(gcKey, rowId); + + writeBatch.put(gc, copyOf(gcKey.array(), MAX_KEY_SIZE), new byte[0]); + } + } + } + } + + return isPreviousValueTombstone; + } + @Override public ReadResult read(RowId rowId, HybridTimestamp timestamp) throws StorageException { return busy(() -> { @@ -682,7 +756,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { if (!isWriteIntent) { // There is no write-intent, return latest committed row. - return wrapCommittedValue(rowId, valueBytes, readTimestamp(keyBuf)); + return wrapCommittedValue(rowId, valueBytes, readTimestampDesc(keyBuf)); } return wrapUncommittedValue(rowId, valueBytes, null); @@ -700,7 +774,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { ByteBuffer keyBuf = prepareHeapKeyBuf(rowId); // Put timestamp restriction according to N2O timestamps order. - putTimestamp(keyBuf, timestamp); + putTimestampDesc(keyBuf, timestamp); // This seek will either find a key with timestamp that's less or equal than required value, or a different key whatsoever. // It is guaranteed by descending order of timestamps. @@ -779,7 +853,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { // Should not be write-intent, as we were seeking with the timestamp. assert keyLength == MAX_KEY_SIZE; - HybridTimestamp rowTimestamp = readTimestamp(foundKeyBuf); + HybridTimestamp rowTimestamp = readTimestampDesc(foundKeyBuf); byte[] valueBytes = seekIterator.value(); @@ -811,7 +885,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { return wrapUncommittedValue(rowId, seekIterator.value(), rowTimestamp); } - return wrapCommittedValue(rowId, valueBytes, readTimestamp(foundKeyBuf)); + return wrapCommittedValue(rowId, valueBytes, readTimestampDesc(foundKeyBuf)); } } @@ -913,7 +987,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { keyBuf.putLong(ROW_ID_OFFSET + Long.BYTES, normalize(rowId.leastSignificantBits())); if (timestamp != null) { - putTimestamp(keyBuf.position(ROW_PREFIX_SIZE), timestamp); + putTimestampDesc(keyBuf.position(ROW_PREFIX_SIZE), timestamp); } keyBuf.position(0); @@ -973,9 +1047,11 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { } private RowId getRowId(ByteBuffer keyBuffer) { - keyBuffer.position(ROW_ID_OFFSET); + return getRowId(keyBuffer, ROW_ID_OFFSET); + } - return new RowId(partitionId, normalize(keyBuffer.getLong()), normalize(keyBuffer.getLong())); + private RowId getRowId(ByteBuffer keyBuffer, int offset) { + return new RowId(partitionId, normalize(keyBuffer.getLong(offset)), normalize(keyBuffer.getLong(offset + Long.BYTES))); } @Override @@ -1015,6 +1091,117 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { writeBatch.deleteRange(cf, partitionStartPrefix(), partitionEndPrefix()); } + @Override + public @Nullable TableRowAndRowId pollForVacuum(HybridTimestamp lowWatermark) { + return busy(() -> { + ByteBuffer seekKey = MV_KEY_BUFFER.get(); + seekKey.clear(); + + try ( + WriteBatch batch = new WriteBatch(); + var gcUpperBound = new Slice(partitionEndPrefix()); + ReadOptions gcOpts = new ReadOptions().setIterateUpperBound(gcUpperBound).setTotalOrderSeek(true); + RocksIterator gcIt = db.newIterator(gc, gcOpts); + ) { + gcIt.seek(partitionStartPrefix()); + + if (invalid(gcIt)) { + return null; + } + + while (true) { + if (invalid(gcIt)) { + return null; + } + + seekKey.position(0); + gcIt.key(seekKey); + + HybridTimestamp timestamp = readTimestampNatural(seekKey, 2); + RowId readRowId = getRowId(seekKey, 14); + + if (timestamp.compareTo(lowWatermark) > 0) { + return null; + } + + try ( + var upperBound = new Slice(partitionEndPrefix()); + ReadOptions opts = new ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true); + RocksIterator it = db.newIterator(cf, opts); + ) { + ByteBuffer byteBuffer = allocateDirect(MAX_KEY_SIZE); + byteBuffer.putShort((short) partitionId); + putRowId(byteBuffer, readRowId); + putTimestampDesc(byteBuffer, timestamp); + byteBuffer.flip(); + + it.seek(byteBuffer); + + if (invalid(it)) { + return null; + } + + { + ByteBuffer checkBuffer = allocateDirect(0); + + int len = it.value(checkBuffer); + + if (len == 0) { + // This is a tombstone, we need to delete it. + byteBuffer.flip(); + + try { + batch.delete(cf, byteBuffer); + } catch (RocksDBException e) { + throw new RuntimeException(e); + } + } + } + it.next(); + + if (invalid(it)) { + return null; + } + + byteBuffer.flip(); + int keyLen = it.key(byteBuffer); + + if (keyLen != MAX_KEY_SIZE) { + return null; + } + + RowId gcRowId = getRowId(byteBuffer); + + if (!readRowId.equals(gcRowId)) { + return null; + } + + HybridTimestamp nextTs = readTimestampDesc(byteBuffer); + + byte[] valueBytes = it.value(); + + var row = new TableRow(ByteBuffer.wrap(valueBytes).order(TABLE_ROW_BYTE_ORDER)); + TableRowAndRowId retVal = new TableRowAndRowId(row, gcRowId); + + try { + byteBuffer.position(0); + seekKey.position(0); + batch.delete(cf, byteBuffer); + batch.delete(gc, seekKey); + db.write(writeOpts, batch); + } catch (RocksDBException e) { + // No-op. + e.printStackTrace(); + } + + return retVal; + } + + } + } + }); + } + @Override public void close() { if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED)) { @@ -1049,12 +1236,20 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { ByteBuffer keyBuf = HEAP_KEY_BUFFER.get().position(0); keyBuf.putShort((short) rowId.partitionId()); - keyBuf.putLong(normalize(rowId.mostSignificantBits())); - keyBuf.putLong(normalize(rowId.leastSignificantBits())); + + putRowId(keyBuf, rowId); return keyBuf; } + private void putRowId(ByteBuffer keyBuffer, RowId rowId) { + assert rowId.partitionId() == partitionId : rowId; + assert keyBuffer.order() == KEY_BYTE_ORDER; + + keyBuffer.putLong(normalize(rowId.mostSignificantBits())); + keyBuffer.putLong(normalize(rowId.leastSignificantBits())); + } + /** * Converts signed long into a new long value, that when written in Big Endian, will preserve the comparison order if compared * lexicographically as an array of unsigned bytes. For example, values {@code -1} and {@code 0}, when written in BE, will become @@ -1069,7 +1264,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { /** * Writes a timestamp into a byte buffer, in descending lexicographical bytes order. */ - private static void putTimestamp(ByteBuffer buf, HybridTimestamp ts) { + private static void putTimestampDesc(ByteBuffer buf, HybridTimestamp ts) { assert buf.order() == KEY_BYTE_ORDER; // "bitwise negation" turns ascending order into a descending one. @@ -1077,7 +1272,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { buf.putInt(~ts.getLogical()); } - private static HybridTimestamp readTimestamp(ByteBuffer keyBuf) { + private static HybridTimestamp readTimestampDesc(ByteBuffer keyBuf) { assert keyBuf.order() == KEY_BYTE_ORDER; long physical = ~keyBuf.getLong(ROW_PREFIX_SIZE); @@ -1086,6 +1281,29 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { return new HybridTimestamp(physical, logical); } + /** + * Writes a timestamp into a byte buffer, in ascending lexicographical bytes order. + */ + private static void putTimestampNatural(ByteBuffer buf, HybridTimestamp ts) { + assert buf.order() == KEY_BYTE_ORDER; + + buf.putLong(ts.getPhysical()); + buf.putInt(ts.getLogical()); + } + + private static HybridTimestamp readTimestampNatural(ByteBuffer keyBuf) { + return readTimestampNatural(keyBuf, ROW_PREFIX_SIZE); + } + + private static HybridTimestamp readTimestampNatural(ByteBuffer keyBuf, int offset) { + assert keyBuf.order() == KEY_BYTE_ORDER; + + long physical = keyBuf.getLong(offset); + int logical = keyBuf.getInt(offset + Long.BYTES); + + return new HybridTimestamp(physical, logical); + } + private static void putShort(byte[] array, int off, short value) { GridUnsafe.putShort(array, GridUnsafe.BYTE_ARR_OFF + off, value); } @@ -1361,7 +1579,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { if (matches(rowId, key)) { // This is a next version of current row. - nextCommitTimestamp = readTimestamp(key); + nextCommitTimestamp = readTimestampDesc(key); } } } @@ -1374,7 +1592,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { if (!isWriteIntent) { // There is no write-intent, return latest committed row. - readResult = wrapCommittedValue(rowId, valueBytes, readTimestamp(currentKeyBuffer)); + readResult = wrapCommittedValue(rowId, valueBytes, readTimestampDesc(currentKeyBuffer)); } else { readResult = wrapUncommittedValue(rowId, valueBytes, nextCommitTimestamp); } diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java index f6ebf8c7e5..3e755c3b11 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java @@ -21,6 +21,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; import static java.util.stream.Collectors.toList; +import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.GC_QUEUE_CF_NAME; import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.HASH_INDEX_CF_NAME; import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.META_CF_NAME; import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.PARTITION_CF_NAME; @@ -115,6 +116,9 @@ public class RocksDbTableStorage implements MvTableStorage { /** Column Family handle for partition data. */ private volatile ColumnFamily partitionCf; + /** Column Family handle for GC queue. */ + private volatile ColumnFamily gcQueueCf; + /** Column Family handle for Hash Index data. */ private volatile ColumnFamily hashIndexCf; @@ -187,6 +191,13 @@ public class RocksDbTableStorage implements MvTableStorage { return meta.columnFamily().handle(); } + /** + * Returns a column family handle for GC queue. + */ + public ColumnFamilyHandle gcQueueHandle() { + return gcQueueCf.handle(); + } + @Override public TableConfiguration configuration() { return tableCfg; @@ -244,6 +255,11 @@ public class RocksDbTableStorage implements MvTableStorage { break; + case GC_QUEUE: + gcQueueCf = cf; + + break; + case HASH_INDEX: hashIndexCf = cf; @@ -572,7 +588,7 @@ public class RocksDbTableStorage implements MvTableStorage { /** * Returns a list of Column Families' names that belong to a RocksDB instance in the given path. * - * @return Map with column families names. + * @return List with column families names. * @throws StorageException If something went wrong. */ private List<String> getExistingCfNames() { @@ -586,7 +602,7 @@ public class RocksDbTableStorage implements MvTableStorage { // even if the database is new (no existing Column Families), we return the names of mandatory column families, that // will be created automatically. - return existingNames.isEmpty() ? List.of(META_CF_NAME, PARTITION_CF_NAME, HASH_INDEX_CF_NAME) : existingNames; + return existingNames.isEmpty() ? List.of(META_CF_NAME, PARTITION_CF_NAME, GC_QUEUE_CF_NAME, HASH_INDEX_CF_NAME) : existingNames; } catch (RocksDBException e) { throw new StorageException( "Failed to read list of column families names for the RocksDB instance located at path " + absolutePathStr, e @@ -612,6 +628,7 @@ public class RocksDbTableStorage implements MvTableStorage { switch (ColumnFamilyType.fromCfName(cfName)) { case META: case PARTITION: + case GC_QUEUE: return new ColumnFamilyDescriptor( cfName.getBytes(UTF_8), new ColumnFamilyOptions() diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageGcTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageGcTest.java new file mode 100644 index 0000000000..28b14d107e --- /dev/null +++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageGcTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.storage.rocksdb; + +import java.nio.file.Path; +import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; +import org.apache.ignite.internal.storage.AbstractMvPartitionStorageGcTest; +import org.apache.ignite.internal.storage.AbstractMvPartitionStorageTest; +import org.apache.ignite.internal.storage.engine.StorageEngine; +import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration; +import org.apache.ignite.internal.testframework.WorkDirectory; +import org.apache.ignite.internal.testframework.WorkDirectoryExtension; +import org.junit.jupiter.api.extension.ExtendWith; + +/** + * Test implementation for {@link RocksDbStorageEngine}. + */ +@ExtendWith(WorkDirectoryExtension.class) +public class RocksDbMvPartitionStorageGcTest extends AbstractMvPartitionStorageGcTest { + @InjectConfiguration("mock {flushDelayMillis = 0, defaultRegion {size = 16777216, writeBufferSize = 16777216}}") + private RocksDbStorageEngineConfiguration engineConfig; + + @WorkDirectory + private Path workDir; + + @Override + protected StorageEngine createEngine() { + return new RocksDbStorageEngine(engineConfig, workDir); + } +}
