This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 11eaf8f587 IGNITE-19395 Use indirect addressing when storing rows in RocksDb (#4200)
11eaf8f587 is described below
commit 11eaf8f58760c4913c0ebd83d6ea1d3a435b31e2
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Tue Aug 20 21:03:53 2024 +0300
IGNITE-19395 Use indirect addressing when storing rows in RocksDb (#4200)
---
modules/storage-rocksdb/build.gradle | 3 +
.../storage/rocksdb/ColumnFamilyUtils.java | 10 +-
.../internal/storage/rocksdb/GarbageCollector.java | 159 +++----
.../storage/rocksdb/PartitionDataHelper.java | 148 +++++--
.../storage/rocksdb/RocksDbMvPartitionStorage.java | 490 +++++++++++----------
.../storage/rocksdb/RocksDbTableStorage.java | 7 +
.../rocksdb/instance/SharedRocksDbInstance.java | 8 +-
.../instance/SharedRocksDbInstanceCreator.java | 8 +
.../storage/rocksdb/PartitionDataHelperTest.java | 63 +++
.../benchmarks/CommitManyWritesBenchmark.java | 268 +++++++++++
.../gc/AbstractGcUpdateHandlerTest.java | 5 +-
11 files changed, 830 insertions(+), 339 deletions(-)
diff --git a/modules/storage-rocksdb/build.gradle b/modules/storage-rocksdb/build.gradle
index 3b4803eef2..69a16af17f 100644
--- a/modules/storage-rocksdb/build.gradle
+++ b/modules/storage-rocksdb/build.gradle
@@ -37,6 +37,8 @@ dependencies {
implementation libs.auto.service.annotations
testAnnotationProcessor project(':ignite-configuration-annotation-processor')
+ testAnnotationProcessor libs.jmh.annotation.processor
+
testImplementation project(':ignite-core')
testImplementation(testFixtures(project(':ignite-core')))
testImplementation project(':ignite-configuration')
@@ -47,6 +49,7 @@ dependencies {
testImplementation(testFixtures(project(':ignite-schema')))
testImplementation libs.hamcrest.core
testImplementation libs.mockito.core
+ testImplementation libs.jmh.core
}
description = 'ignite-storage-rocksdb'
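
The JMH dependencies added above back the new CommitManyWritesBenchmark listed in the stat section. For readers unfamiliar with the wiring, a minimal JMH harness set up the same way looks roughly like the sketch below; it is illustrative only, and the class and method names are made up, not taken from the benchmark in this commit:

    import java.util.concurrent.TimeUnit;

    import org.openjdk.jmh.annotations.Benchmark;
    import org.openjdk.jmh.annotations.BenchmarkMode;
    import org.openjdk.jmh.annotations.Mode;
    import org.openjdk.jmh.annotations.OutputTimeUnit;
    import org.openjdk.jmh.annotations.Scope;
    import org.openjdk.jmh.annotations.Setup;
    import org.openjdk.jmh.annotations.State;

    @State(Scope.Benchmark)
    @BenchmarkMode(Mode.AverageTime)
    @OutputTimeUnit(TimeUnit.MICROSECONDS)
    public class WriteBenchmarkSketch {
        private long counter;

        @Setup
        public void setUp() {
            // Open the partition storage under test here.
        }

        @Benchmark
        public long commitOneWrite() {
            // Stand-in for addWrite(...) + commitWrite(...) against the storage.
            return counter++;
        }
    }
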
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java
index 15ed6ff84d..5d8716bc95 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java
@@ -42,9 +42,12 @@ public class ColumnFamilyUtils {
/** Name of the meta column family matches the default column family, meaning that it always exists when a new table is created. */
private static final String META_CF_NAME = "default";
- /** Name of the Column Family that stores partition data. */
+ /** Name of the Column Family that stores partition data with references to row data. */
private static final String PARTITION_CF_NAME = "cf-part";
+ /** Name of the Column Family that stores row data. */
+ private static final String DATA_CF_NAME = "cf-data";
+
/** Name of the Column Family that stores garbage collection queue. */
private static final String GC_QUEUE_CF_NAME = "cf-gc";
@@ -58,6 +61,7 @@ public class ColumnFamilyUtils {
public static final List<byte[]> DEFAULT_CF_NAMES = List.of(
META_CF_NAME.getBytes(UTF_8),
PARTITION_CF_NAME.getBytes(UTF_8),
+ DATA_CF_NAME.getBytes(UTF_8),
GC_QUEUE_CF_NAME.getBytes(UTF_8),
HASH_INDEX_CF_NAME.getBytes(UTF_8)
);
@@ -70,7 +74,7 @@ public class ColumnFamilyUtils {
/** Utility enum to describe a type of the column family - meta or partition. */
public enum ColumnFamilyType {
- META, PARTITION, GC_QUEUE, HASH_INDEX, SORTED_INDEX, UNKNOWN;
+ META, PARTITION, GC_QUEUE, DATA, HASH_INDEX, SORTED_INDEX, UNKNOWN;
/**
* Determines column family type by its name.
@@ -85,6 +89,8 @@ public class ColumnFamilyUtils {
return PARTITION;
} else if (GC_QUEUE_CF_NAME.equals(cfName)) {
return GC_QUEUE;
+ } else if (DATA_CF_NAME.equals(cfName)) {
+ return DATA;
} else if (HASH_INDEX_CF_NAME.equals(cfName)) {
return HASH_INDEX;
} else if (cfName.startsWith(SORTED_INDEX_CF_PREFIX)) {
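
The new "cf-data" family is the heart of this change: version entries in "cf-part" now hold a fixed-size Data ID instead of row bytes, and the Data ID is resolved against "cf-data". A toy model of that two-level lookup, with plain maps standing in for RocksDB column families (all names here are illustrative, not from the patch):

    import java.util.HashMap;
    import java.util.Map;

    class IndirectAddressingSketch {
        // Stand-in for the Partition CF: version key -> Data ID.
        final Map<String, String> partitionCf = new HashMap<>();
        // Stand-in for the Data CF: Data ID -> row payload.
        final Map<String, byte[]> dataCf = new HashMap<>();

        void put(String versionKey, String dataId, byte[] rowBytes) {
            partitionCf.put(versionKey, dataId);
            dataCf.put(dataId, rowBytes);
        }

        byte[] read(String versionKey) {
            String dataId = partitionCf.get(versionKey);
            // Reading becomes a two-step lookup: small index entry first, payload second.
            return dataId == null ? null : dataCf.get(dataId);
        }
    }

The payoff is visible in commitWrite() further down: committing rewrites only the small Data ID entry under a new timestamped key, while the row payload stays where it is.
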
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
index e9f61d268e..d6eeac6906 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
@@ -20,11 +20,14 @@ package org.apache.ignite.internal.storage.rocksdb;
import static java.lang.ThreadLocal.withInitial;
import static java.nio.ByteBuffer.allocateDirect;
import static org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.DATA_ID_SIZE;
import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.MAX_KEY_SIZE;
-import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.MV_KEY_BUFFER;
import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.ROW_ID_OFFSET;
import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.deserializeRow;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.getFromBatchAndDb;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.isTombstone;
import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.readTimestampNatural;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.wrapIterator;
import static org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage.invalid;
import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.KEY_BYTE_ORDER;
import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.PARTITION_ID_SIZE;
@@ -73,8 +76,17 @@ class GarbageCollector {
/** Garbage collector's queue key's size. */
private static final int GC_KEY_SIZE = GC_KEY_ROW_ID_OFFSET + ROW_ID_SIZE;
- /** Thread-local direct buffer instance to read keys from RocksDB. */
- private static final ThreadLocal<ByteBuffer> GC_KEY_BUFFER = withInitial(() -> allocateDirect(GC_KEY_SIZE).order(KEY_BYTE_ORDER));
+ /** Thread-local direct buffer able to incorporate keys that correspond to a Data ID. */
+ private static final ThreadLocal<ByteBuffer> DIRECT_DATA_ID_KEY_BUFFER =
+ withInitial(() -> allocateDirect(MAX_KEY_SIZE).order(KEY_BYTE_ORDER));
+
+ /** Thread-local direct buffer able to incorporate a Data ID. */
+ private static final ThreadLocal<ByteBuffer> DIRECT_DATA_ID_BUFFER =
+ withInitial(() -> allocateDirect(DATA_ID_SIZE).order(KEY_BYTE_ORDER));
+
+ /** Thread-local direct buffer able to incorporate a key from the GC queue. */
+ private static final ThreadLocal<ByteBuffer> DIRECT_GC_KEY_BUFFER =
+ withInitial(() -> allocateDirect(GC_KEY_SIZE).order(KEY_BYTE_ORDER));
/** Helper for the rocksdb partition. */
private final PartitionDataHelper helper;
@@ -85,14 +97,18 @@ class GarbageCollector {
/** GC queue column family. */
private final ColumnFamilyHandle gcQueueCf;
+ /** Read options for regular reads. */
+ private final ReadOptions readOpts;
+
enum AddResult {
WAS_TOMBSTONE, WAS_VALUE, WAS_EMPTY
}
- GarbageCollector(PartitionDataHelper helper, RocksDB db, ColumnFamilyHandle gcQueueCf) {
+ GarbageCollector(PartitionDataHelper helper, RocksDB db, ReadOptions readOpts, ColumnFamilyHandle gcQueueCf) {
this.helper = helper;
this.db = db;
this.gcQueueCf = gcQueueCf;
+ this.readOpts = readOpts;
}
/**
@@ -108,26 +124,23 @@ class GarbageCollector {
*/
AddResult tryAddToGcQueue(WriteBatchWithIndex writeBatch, RowId rowId, HybridTimestamp timestamp, boolean isNewValueTombstone)
throws RocksDBException {
- ColumnFamilyHandle partCf = helper.partCf;
-
// Try find previous value for the row id.
- ByteBuffer keyBuffer = MV_KEY_BUFFER.get();
- keyBuffer.clear();
+ ByteBuffer dataIdKeyBuffer = DIRECT_DATA_ID_KEY_BUFFER.get().clear();
- helper.putDataKey(keyBuffer, rowId, timestamp);
+ helper.putCommittedDataIdKey(dataIdKeyBuffer, rowId, timestamp);
- try (RocksIterator it = db.newIterator(partCf, helper.upperBoundReadOpts)) {
- it.seek(keyBuffer);
+ try (RocksIterator it = db.newIterator(helper.partCf, helper.upperBoundReadOpts)) {
+ it.seek(dataIdKeyBuffer);
if (invalid(it)) {
return AddResult.WAS_EMPTY;
}
- keyBuffer.clear();
+ dataIdKeyBuffer.clear();
- int keyLen = it.key(keyBuffer);
+ int keyLen = it.key(dataIdKeyBuffer);
- RowId readRowId = helper.getRowId(keyBuffer, ROW_ID_OFFSET);
+ RowId readRowId = helper.getRowId(dataIdKeyBuffer, ROW_ID_OFFSET);
if (!readRowId.equals(rowId)) {
return AddResult.WAS_EMPTY;
@@ -136,11 +149,9 @@ class GarbageCollector {
// Found previous value.
assert keyLen == MAX_KEY_SIZE; // Can not be write-intent.
- int valueSize = it.value(EMPTY_DIRECT_BUFFER);
-
AddResult result;
- if (valueSize == 0) {
+ if (isCurrentValueTombstone(it)) {
// Do not add a new tombstone if the existing value is also a tombstone.
if (isNewValueTombstone) {
return AddResult.WAS_TOMBSTONE;
@@ -151,11 +162,11 @@ class GarbageCollector {
result = AddResult.WAS_VALUE;
}
- keyBuffer.clear();
+ ByteBuffer gcKeyBuffer = DIRECT_GC_KEY_BUFFER.get().clear();
- helper.putGcKey(keyBuffer, rowId, timestamp);
+ helper.putGcKey(gcKeyBuffer, rowId, timestamp);
- writeBatch.put(gcQueueCf, keyBuffer, EMPTY_DIRECT_BUFFER);
+ writeBatch.put(gcQueueCf, gcKeyBuffer, EMPTY_DIRECT_BUFFER);
return result;
}
@@ -164,15 +175,16 @@ class GarbageCollector {
/**
* Polls an element for vacuum. See {@link org.apache.ignite.internal.storage.MvPartitionStorage#peek(HybridTimestamp)}.
*
+ * @param writeBatch Current Write Batch.
* @param lowWatermark Low watermark.
* @return Garbage collected element descriptor.
*/
- @Nullable GcEntry peek(HybridTimestamp lowWatermark) {
+ @Nullable GcEntry peek(WriteBatchWithIndex writeBatch, HybridTimestamp lowWatermark) {
// We retrieve the first element of the GC queue and seek for it in the data CF.
// However, the element that we need to garbage collect is the next (older one) element.
// First we check if there's anything to garbage collect. If the element is a tombstone we remove it.
// If the next element exists, that should be the element that we want to garbage collect.
- try (RocksIterator gcIt = newWrappedIterator(gcQueueCf, helper.upperBoundReadOpts)) {
+ try (RocksIterator gcIt = newWrappedIterator(writeBatch, gcQueueCf, helper.upperBoundReadOpts)) {
gcIt.seek(helper.partitionStartPrefix());
if (invalid(gcIt)) {
@@ -198,7 +210,7 @@ class GarbageCollector {
* Polls an element for vacuum. See {@link org.apache.ignite.internal.storage.MvPartitionStorage#vacuum(GcEntry)}.
*
* @param batch Write batch.
- * @param entry Entry, previously returned by {@link #peek(HybridTimestamp)}.
+ * @param entry Entry, previously returned by {@link #peek}.
* @return Garbage collected element.
* @throws RocksDBException If failed to collect the garbage.
*/
@@ -211,7 +223,7 @@ class GarbageCollector {
// However, the element that we need to garbage collect is the next (older one) element.
// First we check if there's anything to garbage collect. If the element is a tombstone we remove it.
// If the next element exists, that should be the element that we want to garbage collect.
- try (RocksIterator gcIt = newWrappedIterator(gcQueueCf, helper.upperBoundReadOpts)) {
+ try (RocksIterator gcIt = newWrappedIterator(batch, gcQueueCf, helper.upperBoundReadOpts)) {
gcIt.seek(helper.partitionStartPrefix());
if (invalid(gcIt)) {
@@ -231,7 +243,7 @@ class GarbageCollector {
// Delete element from the GC queue.
batch.delete(gcQueueCf, gcKeyBuffer);
- try (RocksIterator partIt = newWrappedIterator(partCf, helper.upperBoundReadOpts)) {
+ try (RocksIterator partIt = newWrappedIterator(batch, partCf, helper.upperBoundReadOpts)) {
// Process the element in data cf that triggered the addition to the GC queue.
boolean proceed = checkHasNewerRowAndRemoveTombstone(partIt, batch, gcRowVersion);
@@ -241,24 +253,29 @@ class GarbageCollector {
}
// Find the row that should be garbage collected.
- ByteBuffer dataKey = getRowForGcKey(partIt, gcRowVersion.getRowId());
+ ByteBuffer dataIdKey = getDataIdKeyForGc(partIt, gcRowVersion.getRowId());
- if (dataKey == null) {
+ if (dataIdKey == null) {
// No row for GC.
return null;
}
// At this point there's definitely a value that needs to be garbage collected in the iterator.
- byte[] valueBytes = partIt.value();
+ ByteBuffer dataId = readDataId(partIt);
+
+ assert !isTombstone(dataId);
- assert valueBytes.length > 0; // Can't be a tombstone.
+ byte[] payloadKey = helper.createPayloadKey(dataId);
- var row = deserializeRow(ByteBuffer.wrap(valueBytes));
+ byte[] rowBytes = getFromBatchAndDb(db, batch, helper.dataCf, readOpts, payloadKey);
+
+ assert rowBytes != null && rowBytes.length > 0;
// Delete the row from the data cf.
- batch.delete(partCf, dataKey);
+ batch.delete(partCf, dataIdKey);
+ batch.delete(helper.dataCf, payloadKey);
- return row;
+ return deserializeRow(rowBytes);
}
}
}
@@ -280,73 +297,65 @@ class GarbageCollector {
WriteBatchWithIndex batch,
GcRowVersion gcRowVersion
) throws RocksDBException {
- ByteBuffer dataKeyBuffer = MV_KEY_BUFFER.get();
- dataKeyBuffer.clear();
-
- ColumnFamilyHandle partCf = helper.partCf;
+ ByteBuffer dataIdKeyBuffer = DIRECT_DATA_ID_KEY_BUFFER.get().clear();
// Set up the data key.
- helper.putDataKey(dataKeyBuffer, gcRowVersion.getRowId(), gcRowVersion.getTimestamp());
+ helper.putCommittedDataIdKey(dataIdKeyBuffer, gcRowVersion.getRowId(), gcRowVersion.getTimestamp());
// Seek to the row id and timestamp from the GC queue.
// Note that it doesn't mean that the element in this iterator has matching row id or even partition id.
- it.seek(dataKeyBuffer);
+ it.seek(dataIdKeyBuffer);
if (invalid(it)) {
// There is no row for the GC queue element.
return false;
- } else {
- dataKeyBuffer.clear();
+ }
- it.key(dataKeyBuffer);
+ dataIdKeyBuffer.clear();
- if (!helper.getRowId(dataKeyBuffer, ROW_ID_OFFSET).equals(gcRowVersion.getRowId())) {
- // There is no row for the GC queue element.
- return false;
- }
+ it.key(dataIdKeyBuffer);
+
+ if (!helper.getRowId(dataIdKeyBuffer, ROW_ID_OFFSET).equals(gcRowVersion.getRowId())) {
+ // There is no row for the GC queue element.
+ return false;
}
// Check if the new element, whose insertion scheduled the GC, was a tombstone.
- int len = it.value(EMPTY_DIRECT_BUFFER);
-
- if (len == 0) {
+ if (isCurrentValueTombstone(it)) {
// This is a tombstone, we need to delete it.
- batch.delete(partCf, dataKeyBuffer);
+ batch.delete(helper.partCf, dataIdKeyBuffer);
}
return true;
}
/**
- * Checks if there is a row for garbage collection and returns this row's key if it exists.
+ * Checks if there is a row ID for garbage collection and returns this row's data ID key if it exists.
* There might already be no row in the data column family, because GC can be run in parallel.
*
* @param it RocksDB data column family iterator.
- * @param gcElementRowId Row id of the element from the GC queue/
- * @return Key of the row that needs to be garbage collected, or {@code null} if such row doesn't exist.
+ * @param gcElementRowId Row id of the element from the GC queue.
+ * @return Key for the row's data ID that needs to be garbage collected, or {@code null} if such row doesn't exist.
*/
- private @Nullable ByteBuffer getRowForGcKey(RocksIterator it, RowId gcElementRowId) {
+ private @Nullable ByteBuffer getDataIdKeyForGc(RocksIterator it, RowId gcElementRowId) {
// Let's move to the element that was scheduled for GC.
it.next();
- RowId gcRowId;
-
if (invalid(it)) {
return null;
}
- ByteBuffer dataKeyBuffer = MV_KEY_BUFFER.get();
- dataKeyBuffer.clear();
+ ByteBuffer dataIdKeyBuffer = DIRECT_DATA_ID_KEY_BUFFER.get().clear();
- int keyLen = it.key(dataKeyBuffer);
+ int keyLen = it.key(dataIdKeyBuffer);
// Check if we moved to another row id's write-intent, that would mean that there is no row to GC for the current row id.
if (keyLen == MAX_KEY_SIZE) {
- gcRowId = helper.getRowId(dataKeyBuffer, ROW_ID_OFFSET);
+ RowId rowId = helper.getRowId(dataIdKeyBuffer, ROW_ID_OFFSET);
// We might have moved to the next row id.
- if (gcElementRowId.equals(gcRowId)) {
- return dataKeyBuffer;
+ if (gcElementRowId.equals(rowId)) {
+ return dataIdKeyBuffer;
}
}
@@ -363,9 +372,8 @@ class GarbageCollector {
writeBatch.deleteRange(gcQueueCf, helper.partitionStartPrefix(), helper.partitionEndPrefix());
}
- private ByteBuffer readGcKey(RocksIterator gcIt) {
- ByteBuffer gcKeyBuffer = GC_KEY_BUFFER.get();
- gcKeyBuffer.clear();
+ private static ByteBuffer readGcKey(RocksIterator gcIt) {
+ ByteBuffer gcKeyBuffer = DIRECT_GC_KEY_BUFFER.get().clear();
gcIt.key(gcKeyBuffer);
@@ -379,20 +387,21 @@ class GarbageCollector {
);
}
- private void refreshGcIterator(RocksIterator gcIt, ByteBuffer gcKeyBuffer) throws RocksDBException {
- gcIt.refresh();
-
- gcIt.seekForPrev(gcKeyBuffer);
+ private RocksIterator newWrappedIterator(WriteBatchWithIndex writeBatch, ColumnFamilyHandle cf, ReadOptions readOptions) {
+ RocksIterator it = db.newIterator(cf, readOptions);
- // Row version was removed from the gc queue by someone, back to the head of gc queue.
- if (invalid(gcIt)) {
- gcIt.seek(helper.partitionStartPrefix());
- }
+ return wrapIterator(it, writeBatch, cf);
}
- private RocksIterator newWrappedIterator(ColumnFamilyHandle cf, ReadOptions readOptions) {
- RocksIterator it = db.newIterator(cf, readOptions);
+ private static ByteBuffer readDataId(RocksIterator it) {
+ ByteBuffer dataId = DIRECT_DATA_ID_BUFFER.get().clear();
+
+ it.value(dataId);
+
+ return dataId;
+ }
- return helper.wrapIterator(it, cf);
+ private static boolean isCurrentValueTombstone(RocksIterator it) {
+ return isTombstone(readDataId(it));
}
}
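
A side note on the three new thread-local direct buffers above: RocksIterator.key(ByteBuffer) and RocksIterator.value(ByteBuffer) fill a caller-supplied buffer, and a direct buffer lets the native side write into it without an extra on-heap copy. The idiom used throughout this patch boils down to the following sketch (the size and byte order below are placeholders, not values from the patch):

    import static java.lang.ThreadLocal.withInitial;

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    class DirectBufferIdiomSketch {
        // Allocated once per thread: allocateDirect() is expensive, clear() is cheap.
        private static final ThreadLocal<ByteBuffer> KEY_BUFFER =
                withInitial(() -> ByteBuffer.allocateDirect(32).order(ByteOrder.BIG_ENDIAN));

        static ByteBuffer borrow() {
            // clear() only resets position and limit; old contents get overwritten by the next read.
            return KEY_BUFFER.get().clear();
        }
    }
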
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
index d7192f0496..194a25d583 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
@@ -17,8 +17,7 @@
package org.apache.ignite.internal.storage.rocksdb;
-import static java.lang.ThreadLocal.withInitial;
-import static java.nio.ByteBuffer.allocateDirect;
+import static java.nio.ByteBuffer.allocate;
import static org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
@@ -44,45 +43,50 @@ import org.apache.ignite.internal.storage.util.LockByRowId;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Slice;
import org.rocksdb.WriteBatchWithIndex;
/** Helper for the partition data. */
public final class PartitionDataHelper implements ManuallyCloseable {
+ /** Transaction id size (part of the transaction state). */
+ private static final int TX_ID_SIZE = 2 * Long.BYTES;
+
/** Position of row id inside the key. */
- static final int ROW_ID_OFFSET = TABLE_ID_SIZE + Short.BYTES;
+ static final int ROW_ID_OFFSET = TABLE_ID_SIZE + PARTITION_ID_SIZE;
- /** Size of the key without timestamp. */
+ /**
+ * Part of the key associated with transaction state, without the timestamp.
+ * Also equal to the size of a key for an uncommitted Write Intent.
+ */
public static final int ROW_PREFIX_SIZE = ROW_ID_OFFSET + ROW_ID_SIZE;
- /** Maximum size of the data key. */
+ /** Maximum size of the key associated with transaction state. */
static final int MAX_KEY_SIZE = ROW_PREFIX_SIZE + HYBRID_TIMESTAMP_SIZE;
- /** Transaction id size (part of the transaction state). */
- private static final int TX_ID_SIZE = 2 * Long.BYTES;
-
- /** Size of the value header (transaction state). */
- static final int VALUE_HEADER_SIZE = TX_ID_SIZE + TABLE_ID_SIZE + PARTITION_ID_SIZE;
-
- /** Transaction id offset. */
- static final int TX_ID_OFFSET = 0;
+ /** Size of Data ID. */
+ static final int DATA_ID_SIZE = ROW_ID_SIZE + HYBRID_TIMESTAMP_SIZE;
- /** Commit table id offset. */
- static final int TABLE_ID_OFFSET = TX_ID_SIZE;
+ /** Offset of Data ID inside the key associated with row data. */
+ private static final int DATA_ID_OFFSET = TABLE_ID_SIZE + PARTITION_ID_SIZE;
- /** Commit partition id offset. */
- static final int PARTITION_ID_OFFSET = TABLE_ID_OFFSET + TABLE_ID_SIZE;
+ /** Size of the key associated with row data. */
+ private static final int PAYLOAD_KEY_SIZE = DATA_ID_OFFSET + DATA_ID_SIZE;
- /** Value offset (if transaction state is present). */
- static final int VALUE_OFFSET = VALUE_HEADER_SIZE;
+ /** Size of the transaction state part of the value (Tx ID + Commit Table ID + Commit Partition ID). */
+ private static final int TX_STATE_SIZE = TX_ID_SIZE + TABLE_ID_SIZE + PARTITION_ID_SIZE;
- /** Thread-local direct buffer instance to read keys from RocksDB. */
- static final ThreadLocal<ByteBuffer> MV_KEY_BUFFER = withInitial(() -> allocateDirect(MAX_KEY_SIZE).order(KEY_BYTE_ORDER));
+ static final int DATA_ID_WITH_TX_STATE_SIZE = DATA_ID_SIZE + TX_STATE_SIZE;
/** Thread-local write batch for {@link MvPartitionStorage#runConsistently(WriteClosure)}. */
static final ThreadLocal<ThreadLocalState> THREAD_LOCAL_STATE = new ThreadLocal<>();
+ /** Thread-local buffer for payload keys. */
+ private static final ThreadLocal<ByteBuffer> PAYLOAD_KEY_BUFFER =
+ ThreadLocal.withInitial(() -> allocate(PAYLOAD_KEY_SIZE).order(KEY_BYTE_ORDER));
+
/** Table ID. */
private final int tableId;
@@ -95,6 +99,8 @@ public final class PartitionDataHelper implements ManuallyCloseable {
/** Partition data column family. */
final ColumnFamilyHandle partCf;
+ final ColumnFamilyHandle dataCf;
+
/** Read options for regular scans. */
final ReadOptions upperBoundReadOpts;
@@ -109,10 +115,11 @@ public final class PartitionDataHelper implements ManuallyCloseable {
/** Prefix for finding the ending of the partition. */
private final byte[] partitionEndPrefix;
- PartitionDataHelper(int tableId, int partitionId, ColumnFamilyHandle partCf) {
+ PartitionDataHelper(int tableId, int partitionId, ColumnFamilyHandle partCf, ColumnFamilyHandle dataCf) {
this.tableId = tableId;
this.partitionId = partitionId;
this.partCf = partCf;
+ this.dataCf = dataCf;
this.partitionStartPrefix = compositeKey(tableId, partitionId);
this.partitionEndPrefix = incrementPrefix(partitionStartPrefix);
@@ -140,18 +147,20 @@ public final class PartitionDataHelper implements ManuallyCloseable {
return partitionEndPrefix;
}
- void putDataKey(ByteBuffer dataKeyBuffer, RowId rowId, HybridTimestamp timestamp) {
+ void putCommittedDataIdKey(ByteBuffer buffer, RowId rowId, HybridTimestamp timestamp) {
+ assert buffer.order() == KEY_BYTE_ORDER;
assert rowId.partitionId() == partitionId : "rowPartitionId=" + rowId.partitionId() + ", storagePartitionId=" + partitionId;
- dataKeyBuffer.putInt(tableId);
- dataKeyBuffer.putShort((short) partitionId);
- putRowIdUuid(dataKeyBuffer, rowId.uuid());
- putTimestampDesc(dataKeyBuffer, timestamp);
+ buffer.putInt(tableId);
+ buffer.putShort((short) partitionId);
+ putRowIdUuid(buffer, rowId.uuid());
+ putTimestampDesc(buffer, timestamp);
- dataKeyBuffer.flip();
+ buffer.flip();
}
void putGcKey(ByteBuffer gcKeyBuffer, RowId rowId, HybridTimestamp timestamp) {
+ assert gcKeyBuffer.order() == KEY_BYTE_ORDER;
assert rowId.partitionId() == partitionId : "rowPartitionId=" + rowId.partitionId() + ", storagePartitionId=" + partitionId;
gcKeyBuffer.putInt(tableId);
@@ -174,7 +183,6 @@ public final class PartitionDataHelper implements ManuallyCloseable {
return new RowId(partitionId, getRowIdUuid(keyBuffer, offset));
}
-
/**
* Returns a WriteBatch that can be used by the affiliated storage implementation (like indices) to maintain consistency when run
* inside the {@link MvPartitionStorage#runConsistently} method.
@@ -200,7 +208,7 @@ public final class PartitionDataHelper implements ManuallyCloseable {
* Creates a byte array key, that consists of table or index ID (4 bytes), followed by a partition ID (2 bytes).
*/
public static byte[] compositeKey(int tableOrIndexId, int partitionId) {
- return ByteBuffer.allocate(ROW_ID_OFFSET).order(KEY_BYTE_ORDER)
+ return allocate(ROW_ID_OFFSET).order(KEY_BYTE_ORDER)
.putInt(tableOrIndexId)
.putShort((short) partitionId)
.array();
@@ -241,9 +249,13 @@ public final class PartitionDataHelper implements ManuallyCloseable {
return hybridTimestamp(time);
}
- RocksIterator wrapIterator(RocksIterator it, ColumnFamilyHandle cf) {
- WriteBatchWithIndex writeBatch = currentWriteBatch();
+ static RocksIterator wrapIterator(RocksIterator it, ColumnFamilyHandle cf) {
+ return wrapIterator(it, currentWriteBatch(), cf);
+ }
+ static RocksIterator wrapIterator(RocksIterator it, @Nullable WriteBatchWithIndex writeBatch, ColumnFamilyHandle cf) {
+ // "count()" check is mandatory. Write batch iterator without any updates just crashes everything.
+ // It's not documented, but this is exactly how it should be used.
if (writeBatch != null && writeBatch.count() > 0) {
return writeBatch.newIteratorWithBase(cf, it);
}
@@ -251,6 +263,29 @@ public final class PartitionDataHelper implements ManuallyCloseable {
return it;
}
+ static byte @Nullable [] getFromBatchAndDb(
+ RocksDB db, ColumnFamilyHandle cfHandle, ReadOptions readOptions, byte[] key
+ ) throws RocksDBException {
+ return getFromBatchAndDb(db, currentWriteBatch(), cfHandle, readOptions, key);
+ }
+
+ static byte @Nullable [] getFromBatchAndDb(
+ RocksDB db, @Nullable WriteBatchWithIndex writeBatch, ColumnFamilyHandle cfHandle, ReadOptions readOptions, byte[] key
+ ) throws RocksDBException {
+ return writeBatch == null || writeBatch.count() == 0
+ ? db.get(cfHandle, readOptions, key)
+ : writeBatch.getFromBatchAndDB(db, cfHandle, readOptions, key);
+ }
+
+ /**
+ * Converts an internal serialized representation of a binary row into its Java Object counterpart.
+ */
+ static BinaryRow deserializeRow(byte[] bytes) {
+ ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN);
+
+ return deserializeRow(buffer);
+ }
+
/**
* Converts an internal serialized representation of a binary row into its Java Object counterpart.
*/
@@ -263,6 +298,55 @@ public final class PartitionDataHelper implements ManuallyCloseable {
return new BinaryRowImpl(schemaVersion, binaryTupleSlice);
}
+ byte[] createPayloadKey(ByteBuffer dataId) {
+ byte[] result = PAYLOAD_KEY_BUFFER.get()
+ .clear()
+ .putInt(tableId)
+ .putShort((short) partitionId)
+ .put(dataId)
+ .array();
+
+ dataId.rewind();
+
+ // Always use 0 for the first bit (tombstone flag), because it only makes sense when data ID is stored as a value.
+ setFirstBit(result, result.length - 1, false);
+
+ return result;
+ }
+
+ /**
+ * Changes the first bit of the byte identified by an index in an array.
+ *
+ * @param array Array containing the byte to change.
+ * @param index Index of the byte inside the array.
+ * @param value If {@code true} - sets the bit to 1, else to 0.
+ */
+ static void setFirstBit(byte[] array, int index, boolean value) {
+ if (value) {
+ array[index] |= 0x01;
+ } else {
+ array[index] &= 0xFE;
+ }
+ }
+
+ /**
+ * Returns {@code true} if the given data ID points to a tombstone.
+ */
+ static boolean isTombstone(ByteBuffer dataId) {
+ byte lastByte = dataId.get(dataId.limit() - 1);
+
+ return (lastByte & 0x1) != 0;
+ }
+
+ /**
+ * Returns {@code true} if the given data ID points to a tombstone.
+ */
+ static boolean isTombstone(byte[] dataId) {
+ byte lastByte = dataId[dataId.length - 1];
+
+ return (lastByte & 0x1) != 0;
+ }
+
@Override
public void close() {
RocksUtils.closeAll(scanReadOpts, upperBoundReadOpts, upperBound);
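
The tombstone flag handled by isTombstone()/setFirstBit() above pairs with putDataId() in RocksDbMvPartitionStorage below: the commit timestamp is shifted left by one bit (its sign bit is always zero) and the freed low-order bit carries the flag, so the last byte of a Data ID is odd exactly for tombstones. A self-contained round-trip check of that encoding, mirroring the patch's logic but not part of the patch itself:

    import java.nio.ByteBuffer;

    class TombstoneBitSketch {
        static long withTombstoneFlag(long timestamp, boolean tombstone) {
            // Timestamp is non-negative, so the sign bit is free to absorb the shift.
            return timestamp << 1 | (tombstone ? 1 : 0);
        }

        static boolean isTombstone(ByteBuffer dataId) {
            // Big-endian layout puts the low-order bit of the flagged timestamp into the last byte.
            return (dataId.get(dataId.limit() - 1) & 0x1) != 0;
        }

        public static void main(String[] args) {
            ByteBuffer dataId = ByteBuffer.allocate(24); // Row ID (16 bytes) + flagged timestamp (8 bytes).
            dataId.putLong(1L).putLong(2L).putLong(withTombstoneFlag(123_456_789L, true));

            assert isTombstone(dataId);
            assert withTombstoneFlag(123_456_789L, false) >> 1 == 123_456_789L; // The flag strips off cleanly.
        }
    }
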
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 10572189d1..15e1a33661 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -21,18 +21,21 @@ import static java.lang.ThreadLocal.withInitial;
import static java.nio.ByteBuffer.allocate;
import static java.nio.ByteBuffer.allocateDirect;
import static java.util.Arrays.copyOf;
-import static java.util.Arrays.copyOfRange;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.DATA_ID_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.DATA_ID_WITH_TX_STATE_SIZE;
import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.MAX_KEY_SIZE;
-import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.MV_KEY_BUFFER;
import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.ROW_ID_OFFSET;
import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.ROW_PREFIX_SIZE;
import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.THREAD_LOCAL_STATE;
-import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.VALUE_HEADER_SIZE;
-import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.VALUE_OFFSET;
import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.deserializeRow;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.getFromBatchAndDb;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.isTombstone;
import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.putTimestampDesc;
import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.readTimestampDesc;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.requireWriteBatch;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.setFirstBit;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.wrapIterator;
import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.ESTIMATED_SIZE_PREFIX;
import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.LEASE_PREFIX;
import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_CONF_PREFIX;
@@ -45,7 +48,6 @@ import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptio
import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageStateOnRebalance;
import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
import static org.apache.ignite.internal.storage.util.StorageUtils.transitionToTerminalState;
-import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
import static org.apache.ignite.internal.util.ByteUtils.putLongToBytes;
@@ -73,6 +75,7 @@ import org.apache.ignite.internal.storage.gc.GcEntry;
import org.apache.ignite.internal.storage.rocksdb.GarbageCollector.AddResult;
import org.apache.ignite.internal.storage.util.LocalLocker;
import org.apache.ignite.internal.storage.util.StorageState;
+import org.apache.ignite.internal.tx.TransactionIds;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.jetbrains.annotations.Nullable;
@@ -86,27 +89,44 @@ import org.rocksdb.WriteBatch;
import org.rocksdb.WriteBatchWithIndex;
/**
- * Multi-versioned partition storage implementation based on RocksDB. Stored data has the following format.
+ * Multi-versioned partition storage implementation based on RocksDB.
+ *
+ * <p>It uses two RocksDB Column Families to store partition data: one for managing transaction state of rows and the other for
+ * storing actual row data. We call these Column Families "Partition CF" and "Data CF" respectively.
+ *
+ * <p>Partition Column Family has the following format:
*
* <p>Key:
* <pre>{@code
* For write-intents
- * | tableId (4 bytes, BE) | partId (2 bytes, BE) | rowId (16 bytes, BE) |
+ * | Table ID (4 bytes, BE) | Partition ID (2 bytes, BE) | Row ID (16 bytes, BE) |
*
* For committed rows
- * | tableId (4 bytes, BE) | partId (2 bytes, BE) | rowId (16 bytes, BE) | timestamp (8 bytes, DESC) |
+ * | Table ID (4 bytes, BE) | Partition ID (2 bytes, BE) | Row ID (16 bytes, BE) | Commit Timestamp (8 bytes, DESC) |
* }</pre>
- * Value:
+ *
+ * <p>Value:
+ *
* <pre>{@code
* For write-intents
- * | txId (16 bytes) | commitTableId (16 bytes) | commitPartitionId (2 bytes) | Row data |
+ * | Data ID (24 bytes, BE) | Commit Table ID (16 bytes) | Commit Partition ID (2 bytes) |
*
* For committed rows
- * | Row data |
+ * | Data ID (24 bytes, BE) |
+ * }</pre>
+ *
+ * <p>Each Data ID is then used as a key inside the Data Column Family and uniquely identifies actual row data.
+ *
+ * <p>Key:
+ * <pre>{@code
+ * | Data ID (24 bytes, BE) |
* }</pre>
*
- * <p>Pending transactions (write-intents) data doesn't have a timestamp assigned, but they have transaction
- * state (txId, commitTableId and commitPartitionId).
+ * <p>Value:
+ *
+ * <pre>{@code
+ * | Row data |
+ * }</pre>
*
* <p>BE means Big Endian, meaning that lexicographical bytes order matches a natural order of partitions.
*
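
The layouts in the javadoc above translate into a handful of ByteBuffer puts. A sketch of assembling a committed-version key per that description (illustrative only: the real code writes the timestamp via putTimestampDesc and normalizes the Row ID halves for unsigned ordering; both details are elided here):

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import java.util.UUID;

    class KeyLayoutSketch {
        // | Table ID (4 bytes, BE) | Partition ID (2 bytes, BE) | Row ID (16 bytes, BE) | Commit Timestamp (8 bytes) |
        static ByteBuffer committedVersionKey(int tableId, int partitionId, UUID rowId, long timestamp) {
            ByteBuffer key = ByteBuffer.allocate(4 + 2 + 16 + 8).order(ByteOrder.BIG_ENDIAN)
                    .putInt(tableId)
                    .putShort((short) partitionId)
                    .putLong(rowId.getMostSignificantBits())
                    .putLong(rowId.getLeastSignificantBits())
                    .putLong(timestamp); // The patch stores this descending (DESC), newest first.
            key.flip();
            return key;
        }
    }
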
@@ -115,10 +135,21 @@ import org.rocksdb.WriteBatchWithIndex;
* could be interpreted as a moment infinitely far away in the future.
*/
public class RocksDbMvPartitionStorage implements MvPartitionStorage {
- /** Thread-local on-heap byte buffer instance to use for key manipulations. */
- private static final ThreadLocal<ByteBuffer> HEAP_KEY_BUFFER = withInitial(() -> allocate(MAX_KEY_SIZE).order(KEY_BYTE_ORDER));
+ /** Thread-local on-heap buffer able to incorporate keys that correspond to a committed Data ID. */
+ private static final ThreadLocal<ByteBuffer> HEAP_COMMITTED_DATA_ID_KEY_BUFFER =
+ withInitial(() -> allocate(MAX_KEY_SIZE).order(KEY_BYTE_ORDER));
- private static final ThreadLocal<ByteBuffer> DIRECT_KEY_BUFFER = withInitial(() -> allocateDirect(MAX_KEY_SIZE).order(KEY_BYTE_ORDER));
+ /** Thread-local on-heap buffer able to incorporate keys that correspond to an uncommitted Data ID. */
+ private static final ThreadLocal<ByteBuffer> HEAP_DATA_ID_KEY_BUFFER =
+ withInitial(() -> allocate(ROW_PREFIX_SIZE).order(KEY_BYTE_ORDER));
+
+ /** Thread-local direct buffer able to incorporate keys that correspond to a Data ID. */
+ private static final ThreadLocal<ByteBuffer> DIRECT_DATA_ID_KEY_BUFFER =
+ withInitial(() -> allocateDirect(MAX_KEY_SIZE).order(KEY_BYTE_ORDER));
+
+ /** Thread-local on-heap buffer able to incorporate Data ID and TX state. */
+ private static final ThreadLocal<ByteBuffer> TX_STATE_BUFFER =
+ withInitial(() -> allocate(DATA_ID_WITH_TX_STATE_SIZE).order(KEY_BYTE_ORDER));
/** Table storage instance. */
private final RocksDbTableStorage tableStorage;
@@ -193,8 +224,8 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
meta = tableStorage.metaCfHandle();
int tableId = tableStorage.getTableId();
- helper = new PartitionDataHelper(tableId, partitionId, tableStorage.partitionCfHandle());
- gc = new GarbageCollector(helper, db, tableStorage.gcQueueHandle());
+ helper = new PartitionDataHelper(tableId, partitionId, tableStorage.partitionCfHandle(), tableStorage.dataCfHandle());
+ gc = new GarbageCollector(helper, db, readOpts, tableStorage.gcQueueHandle());
lastAppliedIndexAndTermKey = createKey(PARTITION_META_PREFIX, tableId, partitionId);
lastGroupConfigKey = createKey(PARTITION_CONF_PREFIX, tableId, partitionId);
@@ -330,7 +361,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
try {
- savePendingLastApplied(PartitionDataHelper.requireWriteBatch(), lastAppliedIndex, lastAppliedTerm);
+ savePendingLastApplied(requireWriteBatch(), lastAppliedIndex, lastAppliedTerm);
return null;
} catch (RocksDBException e) {
@@ -381,7 +412,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
try {
- saveGroupConfiguration(PartitionDataHelper.requireWriteBatch(), config);
+ saveGroupConfiguration(requireWriteBatch(), config);
return null;
} catch (RocksDBException e) {
@@ -405,99 +436,122 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId, int commitTableId, int commitPartitionId)
throws TxIdMismatchException, StorageException {
return busy(() -> {
- @SuppressWarnings("resource") WriteBatchWithIndex writeBatch = PartitionDataHelper.requireWriteBatch();
+ @SuppressWarnings("resource") WriteBatchWithIndex writeBatch = requireWriteBatch();
assert rowIsLocked(rowId);
- ByteBuffer keyBuf = prepareHeapKeyBuf(rowId).rewind();
-
- BinaryRow res = null;
-
try {
// Check concurrent transaction data.
- byte[] keyBytes = new byte[ROW_PREFIX_SIZE];
+ byte[] uncommittedDataIdKey = createUncommittedDataIdKey(rowId);
- keyBuf.get(keyBytes);
+ byte[] previousTxState = writeBatch.getFromBatchAndDB(db, helper.partCf, readOpts, uncommittedDataIdKey);
- keyBuf.rewind();
+ // Previous value must belong to the same transaction.
+ if (previousTxState != null) {
+ ByteBuffer previousTxStateBuffer = ByteBuffer.wrap(previousTxState);
- byte[] previousValueBytes = writeBatch.getFromBatchAndDB(db, helper.partCf, readOpts, keyBytes);
+ validateTxId(previousTxStateBuffer, txId);
- // Previous value must belong to the same transaction.
- if (previousValueBytes != null) {
- ByteBuffer previousValue = ByteBuffer.wrap(previousValueBytes);
+ ByteBuffer dataId = readDataIdFromTxState(previousTxStateBuffer);
- validateTxId(previousValue, txId);
+ byte[] payloadKey = helper.createPayloadKey(dataId);
- res = wrapValueIntoBinaryRow(previousValue, true);
- }
+ BinaryRow previousRow = null;
+
+ boolean isOldValueTombstone = isTombstone(dataId);
+
+ if (!isOldValueTombstone) {
+ byte[] previousRowBytes = writeBatch.getFromBatchAndDB(db, helper.dataCf, readOpts, payloadKey);
- if (row == null) {
- ByteBuffer value = allocate(VALUE_HEADER_SIZE);
+ previousRow = deserializeRow(previousRowBytes);
+ }
+
+ // We need to flip the tombstone bit in case we are overwriting a previous Write Intent with a different
+ // tombstone bit.
+ if (isOldValueTombstone ^ (row == null)) {
+ setFirstBit(previousTxState, DATA_ID_SIZE - 1, row == null);
- // Write empty value as a tombstone.
- if (previousValueBytes != null) {
- // Reuse old array with transaction id already written to it.
- value.put(previousValueBytes, 0, VALUE_HEADER_SIZE);
- } else {
- writeHeader(value, txId, commitTableId, commitPartitionId);
+ writeBatch.put(helper.partCf, uncommittedDataIdKey, previousTxState);
}
- writeBatch.put(helper.partCf, keyBytes, value.array());
+ // No need to update the Data ID key because it should be the same as already in the storage.
+ if (row != null) {
+ writeBatch.put(helper.dataCf, payloadKey, serializeBinaryRow(row));
+ }
+
+ return previousRow;
} else {
- writeUnversioned(keyBytes, row, txId, commitTableId, commitPartitionId);
+ ByteBuffer txState = createTxState(rowId, txId, commitTableId, commitPartitionId, row == null);
+
+ ByteBuffer dataId = readDataIdFromTxState(txState);
+
+ writeBatch.put(helper.partCf, uncommittedDataIdKey, txState.array());
+
+ if (row != null) {
+ writeBatch.put(helper.dataCf, helper.createPayloadKey(dataId), serializeBinaryRow(row));
+ }
+
+ return null;
}
} catch (RocksDBException e) {
throw new StorageException("Failed to update a row in storage: " + createStorageInfo(), e);
}
-
- return res;
});
}
- /**
- * Writes a tuple of transaction id and a row bytes, using "row prefix" of a key array as a storage key.
- *
- * @param keyArray Array that has partition id and row id in its prefix.
- * @param row Binary row, not null.
- * @param txId Transaction id.
- * @throws RocksDBException If write failed.
- */
- private void writeUnversioned(byte[] keyArray, BinaryRow row, UUID txId, int commitTableId, int commitPartitionId)
- throws RocksDBException {
- @SuppressWarnings("resource") WriteBatchWithIndex writeBatch = PartitionDataHelper.requireWriteBatch();
-
- ByteBuffer value = allocate(rowSize(row) + VALUE_HEADER_SIZE);
-
- writeHeader(value, txId, commitTableId, commitPartitionId);
+ private static ByteBuffer createDataId(RowId rowId, HybridTimestamp txTimestamp, boolean isTombstone) {
+ ByteBuffer buffer = allocate(DATA_ID_SIZE).order(KEY_BYTE_ORDER);
- writeBinaryRow(value, row);
+ putDataId(buffer, rowId, txTimestamp, isTombstone);
- // Write table row data as a value.
- writeBatch.put(helper.partCf, keyArray, value.array());
+ return buffer.rewind();
}
- private static int rowSize(BinaryRow row) {
- // Tuple + schema version.
- return row.tupleSliceLength() + Short.BYTES;
- }
+ private static ByteBuffer createTxState(RowId rowId, UUID txId, int commitTableId, int commitPartitionId, boolean isTombstone) {
+ ByteBuffer buffer = TX_STATE_BUFFER.get().clear();
- private static void writeHeader(ByteBuffer dest, UUID txId, int commitTableId, int commitPartitionId) {
- assert dest.order() == ByteOrder.BIG_ENDIAN;
+ putDataId(buffer, rowId, TransactionIds.beginTimestamp(txId), isTombstone);
- dest
+ return buffer
.putLong(txId.getMostSignificantBits())
.putLong(txId.getLeastSignificantBits())
.putInt(commitTableId)
- .putShort((short) commitPartitionId);
+ .putShort((short) commitPartitionId)
+ .rewind();
}
- private static void writeBinaryRow(ByteBuffer dest, BinaryRow row) {
- assert dest.order() == ByteOrder.BIG_ENDIAN;
+ private static void putDataId(ByteBuffer buffer, RowId rowId, HybridTimestamp txTimestamp, boolean isTombstone) {
+ long timestamp = txTimestamp.longValue();
+
+ // We use the sign bit from the timestamp (which is always zero, because timestamp is always positive) to indicate whether
+ // Data ID points to a tombstone. This can help to avoid indirect reads for tombstones.
+ long timestampWithTombstoneFlag = timestamp << 1 | (isTombstone ? 1 : 0);
+
+ buffer
+ .putLong(rowId.mostSignificantBits())
+ .putLong(rowId.leastSignificantBits())
+ .putLong(timestampWithTombstoneFlag);
+ }
+
+ private static ByteBuffer readDataIdFromTxState(ByteBuffer txState) {
+ int prevLimit = txState.limit();
+
+ ByteBuffer dataId = txState
+ .limit(DATA_ID_SIZE)
+ .slice()
+ .order(KEY_BYTE_ORDER);
+
+ txState.position(txState.position() + DATA_ID_SIZE).limit(prevLimit);
- dest
+ return dataId;
+ }
+
+ private static byte[] serializeBinaryRow(BinaryRow row) {
+ return allocate(Short.BYTES + row.tupleSliceLength())
+ .order(KEY_BYTE_ORDER)
.putShort((short) row.schemaVersion())
- .put(row.tupleSlice());
+ .put(row.tupleSlice())
+ .array();
}
@Override
@@ -505,32 +559,38 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
return busy(() -> {
throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
- @SuppressWarnings("resource") WriteBatchWithIndex writeBatch = PartitionDataHelper.requireWriteBatch();
+ @SuppressWarnings("resource") WriteBatchWithIndex writeBatch = requireWriteBatch();
assert rowIsLocked(rowId);
- ByteBuffer keyBuf = prepareDirectKeyBuf(rowId)
- .position(0)
- .limit(ROW_PREFIX_SIZE);
+ byte[] uncommittedDataIdKey = createUncommittedDataIdKey(rowId);
try {
- byte[] keyBytes = new byte[ROW_PREFIX_SIZE];
+ byte[] dataIdWithTxState = writeBatch.getFromBatchAndDB(db, helper.partCf, readOpts, uncommittedDataIdKey);
- keyBuf.get(keyBytes);
+ if (dataIdWithTxState == null) {
+ // The chain doesn't contain an uncommitted write intent.
+ return null;
+ }
- keyBuf.rewind();
+ ByteBuffer dataId = readDataIdFromTxState(ByteBuffer.wrap(dataIdWithTxState));
- byte[] previousValue = writeBatch.getFromBatchAndDB(db, helper.partCf, readOpts, keyBytes);
+ byte[] payloadKey = helper.createPayloadKey(dataId);
- if (previousValue == null) {
- // The chain doesn't contain an uncommitted write intent.
- return null;
+ BinaryRow row = null;
+
+ if (!isTombstone(dataId)) {
+ byte[] rowBytes = writeBatch.getFromBatchAndDB(db, helper.dataCf, readOpts, payloadKey);
+
+ row = deserializeRow(rowBytes);
}
// Perform unconditional remove for the key without associated timestamp.
- writeBatch.delete(helper.partCf, keyBuf);
+ writeBatch.delete(helper.partCf, uncommittedDataIdKey);
+
+ writeBatch.delete(helper.dataCf, payloadKey);
- return wrapValueIntoBinaryRow(ByteBuffer.wrap(previousValue), true);
+ return row;
} catch (RocksDBException e) {
throw new StorageException("Failed to roll back insert/update", e);
}
@@ -546,29 +606,31 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
@Override
public void commitWrite(RowId rowId, HybridTimestamp timestamp) throws StorageException {
busy(() -> {
- WriteBatchWithIndex writeBatch = PartitionDataHelper.requireWriteBatch();
+ WriteBatchWithIndex writeBatch = requireWriteBatch();
assert rowIsLocked(rowId);
- ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
+ byte[] dataIdKey = createCommittedDataIdKey(rowId, timestamp);
+
+ byte[] uncommittedDataIdKey = copyOf(dataIdKey, ROW_PREFIX_SIZE);
try {
// Read a value associated with pending write.
- byte[] uncommittedKeyBytes = copyOf(keyBuf.array(), ROW_PREFIX_SIZE);
-
- byte[] valueBytes = writeBatch.getFromBatchAndDB(db, helper.partCf, readOpts, uncommittedKeyBytes);
+ byte[] txState = writeBatch.getFromBatchAndDB(db, helper.partCf, readOpts, uncommittedDataIdKey);
- if (valueBytes == null) {
+ if (txState == null) {
// The chain doesn't contain an uncommitted write intent.
return null;
}
- boolean isNewValueTombstone = valueBytes.length == VALUE_HEADER_SIZE;
+ byte[] dataId = copyOf(txState, DATA_ID_SIZE);
+
+ boolean isNewValueTombstone = isTombstone(dataId);
AddResult addResult = gc.tryAddToGcQueue(writeBatch, rowId, timestamp, isNewValueTombstone);
// Delete pending write.
- writeBatch.delete(helper.partCf, uncommittedKeyBytes);
+ writeBatch.delete(helper.partCf, uncommittedDataIdKey);
// We only write tombstone if the previous value for the same row id was not a tombstone.
// So there won't be consecutive tombstones for the same row id.
@@ -577,13 +639,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
}
// Add timestamp to the key, and put the value back into the storage.
- putTimestampDesc(keyBuf, timestamp);
-
- writeBatch.put(
- helper.partCf,
- copyOf(keyBuf.array(), MAX_KEY_SIZE),
- copyOfRange(valueBytes, VALUE_HEADER_SIZE, valueBytes.length)
- );
+ writeBatch.put(helper.partCf, dataIdKey, dataId);
updateEstimatedSize(isNewValueTombstone, addResult);
@@ -597,7 +653,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
@Override
public void addWriteCommitted(RowId rowId, @Nullable BinaryRow row, HybridTimestamp commitTimestamp) throws StorageException {
busy(() -> {
- WriteBatchWithIndex writeBatch = PartitionDataHelper.requireWriteBatch();
+ WriteBatchWithIndex writeBatch = requireWriteBatch();
assert rowIsLocked(rowId);
@@ -612,25 +668,17 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
return null;
}
- // TODO IGNITE-16913 Add proper way to write row bytes into array without allocations.
- byte[] rowBytes;
+ byte[] dataIdKey = createCommittedDataIdKey(rowId, commitTimestamp);
- if (row == null) {
- rowBytes = BYTE_EMPTY_ARRAY;
- } else {
- ByteBuffer rowBuffer = allocate(rowSize(row));
+ ByteBuffer dataId = createDataId(rowId, commitTimestamp, isNewValueTombstone);
- writeBinaryRow(rowBuffer, row);
+ writeBatch.put(helper.partCf, dataIdKey, dataId.array());
- rowBytes = rowBuffer.array();
+ // TODO IGNITE-16913 Add proper way to write row bytes into array without allocations.
+ if (row != null) {
+ writeBatch.put(helper.dataCf, helper.createPayloadKey(dataId), serializeBinaryRow(row));
}
- ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
-
- putTimestampDesc(keyBuf, commitTimestamp);
-
- writeBatch.put(helper.partCf, keyBuf.array(), rowBytes);
-
updateEstimatedSize(isNewValueTombstone, addResult);
return null;
@@ -666,17 +714,10 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
String.format("RowId partition [%d] is not equal to storage partition [%d].", rowId.partitionId(), partitionId));
}
- // We can read data outside of consistency closure. Batch is not required.
- WriteBatchWithIndex writeBatch = helper.currentWriteBatch();
-
try (
// Set next partition as an upper bound.
RocksIterator baseIterator = db.newIterator(helper.partCf, helper.upperBoundReadOpts);
- // "count()" check is mandatory. Write batch iterator without any updates just crashes everything.
- // It's not documented, but this is exactly how it should be used.
- RocksIterator seekIterator = writeBatch != null && writeBatch.count() > 0
- ? writeBatch.newIteratorWithBase(helper.partCf, baseIterator)
- : baseIterator
+ RocksIterator seekIterator = wrapIterator(baseIterator, helper.partCf)
) {
if (lookingForLatestVersions(timestamp)) {
return readLatestVersion(rowId, seekIterator);
@@ -692,26 +733,26 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
}
private ReadResult readLatestVersion(RowId rowId, RocksIterator seekIterator) {
- ByteBuffer keyBuf = prepareDirectKeyBuf(rowId);
+ ByteBuffer dataIdKeyPrefix = prepareDirectDataIdKeyBuf(rowId)
+ .position(0)
+ .limit(ROW_PREFIX_SIZE);
// Seek to the first appearance of row id if timestamp isn't set.
// Since timestamps are sorted from newest to oldest, first occurrence will always be the latest version.
- assert keyBuf.position() == ROW_PREFIX_SIZE;
-
- seekIterator.seek(keyBuf.duplicate().position(0).limit(ROW_PREFIX_SIZE));
+ seekIterator.seek(dataIdKeyPrefix);
if (invalid(seekIterator)) {
// No data at all.
return ReadResult.empty(rowId);
}
- ByteBuffer readKeyBuf = DIRECT_KEY_BUFFER.get().clear();
+ ByteBuffer dataIdKey = DIRECT_DATA_ID_KEY_BUFFER.get().clear();
- int keyLength = seekIterator.key(readKeyBuf);
+ int keyLength = seekIterator.key(dataIdKey);
- readKeyBuf.position(0).limit(keyLength);
+ dataIdKey.position(0).limit(keyLength);
- if (!matches(rowId, readKeyBuf)) {
+ if (!matches(rowId, dataIdKey)) {
// It is already a different row, so no version exists for our rowId.
return ReadResult.empty(rowId);
}
@@ -720,15 +761,19 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
ByteBuffer valueBytes = ByteBuffer.wrap(seekIterator.value());
- return readResultFromKeyAndValue(isWriteIntent, readKeyBuf, valueBytes);
+ return readResultFromKeyAndValue(isWriteIntent, dataIdKey, valueBytes);
}
- private ReadResult readResultFromKeyAndValue(boolean isWriteIntent, ByteBuffer keyBuf, ByteBuffer valueBytes) {
- RowId rowId = getRowId(keyBuf);
+ private ReadResult readResultFromKeyAndValue(
+ boolean isWriteIntent,
+ ByteBuffer dataIdKey,
+ ByteBuffer valueBytes
+ ) {
+ RowId rowId = getRowId(dataIdKey);
if (!isWriteIntent) {
// There is no write-intent, return latest committed row.
- return wrapCommittedValue(rowId, valueBytes, readTimestampDesc(keyBuf));
+ return wrapCommittedValue(rowId, valueBytes, readTimestampDesc(dataIdKey));
}
return wrapUncommittedValue(rowId, valueBytes, null);
@@ -743,16 +788,13 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
* @return Read result.
*/
private ReadResult readByTimestamp(RocksIterator seekIterator, RowId rowId, HybridTimestamp timestamp) {
- ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
-
- // Put timestamp restriction according to N2O timestamps order.
- putTimestampDesc(keyBuf, timestamp);
+ byte[] committedDataIdKey = createCommittedDataIdKey(rowId, timestamp);
// This seek will either find a key with timestamp that's less or equal than required value, or a different key whatsoever.
// It is guaranteed by descending order of timestamps.
- seekIterator.seek(keyBuf.array());
+ seekIterator.seek(committedDataIdKey);
- return handleReadByTimestampIterator(seekIterator, rowId, timestamp, keyBuf);
+ return handleReadByTimestampIterator(seekIterator, rowId, timestamp, committedDataIdKey);
}
/**
@@ -763,15 +805,19 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
* @param seekIterator Iterator, on which seek operation was already performed.
* @param rowId Row id.
* @param timestamp Timestamp.
- * @param keyBuf Buffer with a key in it: partition id + row id + timestamp.
+ * @param committedDataIdKey Key for a committed entry: partition id + row id + timestamp.
* @return Read result.
*/
- private static ReadResult handleReadByTimestampIterator(RocksIterator seekIterator, RowId rowId, HybridTimestamp timestamp,
- ByteBuffer keyBuf) {
+ private ReadResult handleReadByTimestampIterator(
+ RocksIterator seekIterator,
+ RowId rowId,
+ HybridTimestamp timestamp,
+ byte[] committedDataIdKey
+ ) {
// There's no guarantee that required key even exists. If it doesn't, then "seek" will point to a different key.
// To avoid returning its value, we have to check that actual key matches what we need.
// Here we prepare direct buffer to read key without timestamp. Shared direct buffer is used to avoid extra memory allocations.
- ByteBuffer foundKeyBuf = MV_KEY_BUFFER.get().clear();
+ ByteBuffer foundKeyBuf = DIRECT_DATA_ID_KEY_BUFFER.get().clear();
int keyLength = 0;
@@ -783,7 +829,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
// There is no record older than timestamp.
// There might be a write-intent which we should return.
// Seek to *just* row id.
- seekIterator.seek(copyOf(keyBuf.array(), ROW_PREFIX_SIZE));
+ seekIterator.seek(copyOf(committedDataIdKey, ROW_PREFIX_SIZE));
if (invalid(seekIterator)) {
// There are no writes with row id.
@@ -866,14 +912,15 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
* Checks if row id matches the one written in the key buffer. Note: this operation changes the position in the buffer.
*
* @param rowId Row id.
- * @param keyBuf Key buffer.
+ * @param dataIdKey Key buffer.
* @return {@code true} if row id matches the key buffer, {@code false} otherwise.
*/
- private static boolean matches(RowId rowId, ByteBuffer keyBuf) {
+ private static boolean matches(RowId rowId, ByteBuffer dataIdKey) {
// Comparison starts from the position of the row id.
- keyBuf.position(ROW_ID_OFFSET);
+ dataIdKey.position(ROW_ID_OFFSET);
- return rowId.mostSignificantBits() == normalize(keyBuf.getLong()) && rowId.leastSignificantBits() == normalize(keyBuf.getLong());
+ return rowId.mostSignificantBits() == normalize(dataIdKey.getLong())
+ && rowId.leastSignificantBits() == normalize(dataIdKey.getLong());
}
@Override
@@ -883,7 +930,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
- ByteBuffer prefix = prepareDirectKeyBuf(rowId)
+ ByteBuffer prefix = prepareDirectDataIdKeyBuf(rowId)
.position(0)
.limit(ROW_PREFIX_SIZE);
@@ -891,7 +938,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
RocksIterator it = db.newIterator(helper.partCf, options);
- it = helper.wrapIterator(it, helper.partCf);
+ it = wrapIterator(it, helper.partCf);
it.seek(prefix);
@@ -970,7 +1017,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
return busy(() -> {
throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
- ByteBuffer keyBuf = prepareDirectKeyBuf(lowerBound)
+ ByteBuffer keyBuf = prepareDirectDataIdKeyBuf(lowerBound)
.position(0)
.limit(ROW_PREFIX_SIZE);
@@ -1029,7 +1076,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
return null;
}
- AbstractWriteBatch writeBatch = PartitionDataHelper.requireWriteBatch();
+ AbstractWriteBatch writeBatch = requireWriteBatch();
try {
byte[] leaseBytes = new byte[Long.BYTES];
@@ -1061,24 +1108,24 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
writeBatch.delete(meta, leaseKey);
writeBatch.deleteRange(helper.partCf, helper.partitionStartPrefix(), helper.partitionEndPrefix());
+ writeBatch.deleteRange(helper.dataCf, helper.partitionStartPrefix(), helper.partitionEndPrefix());
gc.deleteQueue(writeBatch);
}
@Override
public @Nullable GcEntry peek(HybridTimestamp lowWatermark) {
- //noinspection resource
- PartitionDataHelper.requireWriteBatch();
+ WriteBatchWithIndex batch = requireWriteBatch();
// No busy lock required, we're already in "runConsistently" closure.
throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
- return gc.peek(lowWatermark);
+ return gc.peek(batch, lowWatermark);
}
@Override
public @Nullable BinaryRow vacuum(GcEntry entry) {
- WriteBatchWithIndex batch = PartitionDataHelper.requireWriteBatch();
+ WriteBatchWithIndex batch = requireWriteBatch();
// No busy lock required, we're already in "runConsistently" closure.
throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
@@ -1118,26 +1165,31 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
transitionToDestroyedOrClosedState(StorageState.DESTROYED);
}
- /**
- * Prepares thread-local on-heap byte buffer. Writes row id in it. Partition id is already there. Timestamp is not cleared.
- */
- private ByteBuffer prepareHeapKeyBuf(RowId rowId) {
- ByteBuffer keyBuf = HEAP_KEY_BUFFER.get().clear();
+ private byte[] createUncommittedDataIdKey(RowId rowId) {
+ ByteBuffer uncommittedDataIdKeyBuf = HEAP_DATA_ID_KEY_BUFFER.get().clear();
- writeKey(keyBuf, rowId);
+ writeRowPrefix(uncommittedDataIdKeyBuf, rowId);
- return keyBuf;
+ return uncommittedDataIdKeyBuf.array();
+ }
+
+ private byte[] createCommittedDataIdKey(RowId rowId, HybridTimestamp timestamp) {
+ ByteBuffer keyBuf = HEAP_COMMITTED_DATA_ID_KEY_BUFFER.get().clear();
+
+ helper.putCommittedDataIdKey(keyBuf, rowId, timestamp);
+
+ return keyBuf.array();
}
- private ByteBuffer prepareDirectKeyBuf(RowId rowId) {
- ByteBuffer keyBuf = DIRECT_KEY_BUFFER.get().clear();
+ private ByteBuffer prepareDirectDataIdKeyBuf(RowId rowId) {
+ ByteBuffer keyBuf = DIRECT_DATA_ID_KEY_BUFFER.get().clear();
- writeKey(keyBuf, rowId);
+ writeRowPrefix(keyBuf, rowId);
return keyBuf;
}
- private void writeKey(ByteBuffer buffer, RowId rowId) {
+ private void writeRowPrefix(ByteBuffer buffer, RowId rowId) {
assert buffer.order() == KEY_BYTE_ORDER;
assert rowId.partitionId() == partitionId : rowId;
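Put together, these helpers imply the following key layouts; the field widths below are inferred from how ROW_ID_OFFSET and ROW_PREFIX_SIZE are used in this class and should be read as assumptions, not exact sizes:

    // Inferred data-id key layouts (widths are assumptions):
    //
    //   uncommitted: [ table id | partition id | row id (msb, lsb) ]
    //   committed:   [ table id | partition id | row id (msb, lsb) | commit timestamp ]
    //                             ^ ROW_ID_OFFSET                  ^ ROW_PREFIX_SIZE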
@@ -1147,11 +1199,13 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
helper.putRowId(buffer, rowId);
}
- private static void validateTxId(ByteBuffer valueBytes, UUID txId) {
- long msb = valueBytes.getLong();
- long lsb = valueBytes.getLong();
+ private static void validateTxId(ByteBuffer dataIdWithTxState, UUID txId) {
+ dataIdWithTxState.position(DATA_ID_SIZE);
+
+ long msb = dataIdWithTxState.getLong();
+ long lsb = dataIdWithTxState.getLong();
- valueBytes.rewind();
+ dataIdWithTxState.rewind();
if (txId.getMostSignificantBits() != msb || txId.getLeastSignificantBits() != lsb) {
throw new TxIdMismatchException(txId, new UUID(msb, lsb));
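The read order here and in wrapUncommittedValue below pins down the layout of an uncommitted value in the partition column family; a sketch, where the field widths beyond DATA_ID_SIZE follow the getLong/getInt/getShort calls:

    // Inferred layout of a write-intent value ("data id + tx state"):
    //
    //   [ data id (DATA_ID_SIZE) | tx id msb (8) | tx id lsb (8) | commit table id (4) | commit partition id (2) ]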
@@ -1177,48 +1231,29 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
return invalid;
}
- /**
- * Converts raw byte array representation of the value into a binary row.
- *
- * @param valueBytes Value bytes as read from the storage.
- * @param valueHasTxData Whether the value has a transaction id prefix in it.
- * @return Binary row instance or {@code null} if value is a tombstone.
- */
- private static @Nullable BinaryRow wrapValueIntoBinaryRow(ByteBuffer valueBytes, boolean valueHasTxData) {
- if (isTombstone(valueBytes, valueHasTxData)) {
- return null;
- }
-
- assert valueBytes.order() == ByteOrder.BIG_ENDIAN;
-
- if (valueHasTxData) {
- valueBytes.position(VALUE_OFFSET);
- }
-
- return deserializeRow(valueBytes);
- }
-
/**
* Converts raw byte array representation of the write-intent value into a read result adding newest commit timestamp if it is not
* {@code null}.
*
* @param rowId ID of the corresponding row.
- * @param valueBuffer Value bytes as read from the storage.
+ * @param transactionState Transaction state, including the Data ID.
- * @param newestCommitTs Commit timestamp of the most recent committed write of this value.
* @return Read result instance.
*/
- private static ReadResult wrapUncommittedValue(RowId rowId, ByteBuffer valueBuffer, @Nullable HybridTimestamp newestCommitTs) {
- assert valueBuffer.order() == ByteOrder.BIG_ENDIAN;
+ private ReadResult wrapUncommittedValue(RowId rowId, ByteBuffer transactionState, @Nullable HybridTimestamp newestCommitTs) {
+ assert transactionState.order() == KEY_BYTE_ORDER;
- UUID txId = new UUID(valueBuffer.getLong(), valueBuffer.getLong());
+ ByteBuffer dataId = readDataIdFromTxState(transactionState);
- int commitTableId = valueBuffer.getInt();
+ UUID txId = new UUID(transactionState.getLong(), transactionState.getLong());
- int commitPartitionId = Short.toUnsignedInt(valueBuffer.getShort());
+ int commitTableId = transactionState.getInt();
- BinaryRow row = valueBuffer.remaining() == 0 ? null : deserializeRow(valueBuffer);
+ int commitPartitionId = Short.toUnsignedInt(transactionState.getShort());
- valueBuffer.rewind();
+ transactionState.rewind();
+
+ BinaryRow row = readRowByDataId(dataId);
return ReadResult.createFromWriteIntent(rowId, row, txId, commitTableId, commitPartitionId, newestCommitTs);
}
@@ -1227,27 +1262,29 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
* Converts raw byte array representation of the value into a read result.
*
* @param rowId ID of the corresponding row.
- * @param valueBytes Value bytes as read from the storage.
+ * @param dataId Data ID pointing to row data.
* @param rowCommitTimestamp Timestamp with which the row was committed.
* @return Read result instance or {@code null} if value is a tombstone.
*/
- private static ReadResult wrapCommittedValue(RowId rowId, ByteBuffer valueBytes, HybridTimestamp rowCommitTimestamp) {
- if (valueBytes.remaining() == 0) {
- return ReadResult.empty(rowId);
+ private ReadResult wrapCommittedValue(RowId rowId, ByteBuffer dataId, HybridTimestamp rowCommitTimestamp) {
+ return ReadResult.createFromCommitted(rowId, readRowByDataId(dataId), rowCommitTimestamp);
+ }
+
+ @Nullable
+ private BinaryRow readRowByDataId(ByteBuffer dataId) {
+ if (isTombstone(dataId)) {
+ return null;
}
- return ReadResult.createFromCommitted(
- rowId,
- deserializeRow(valueBytes),
- rowCommitTimestamp
- );
- }
+ try {
+ byte[] payloadKey = helper.createPayloadKey(dataId);
- /**
- * Returns {@code true} if value payload represents a tombstone.
- */
- private static boolean isTombstone(ByteBuffer valueBytes, boolean hasTxId) {
- return valueBytes.limit() == (hasTxId ? VALUE_HEADER_SIZE : 0);
+ byte[] rowBytes = getFromBatchAndDb(db, helper.dataCf, readOpts, payloadKey);
+
+ return rowBytes == null ? null : deserializeRow(rowBytes);
+ } catch (RocksDBException e) {
+ throw new StorageException(e);
+ }
}
private abstract class BasePartitionTimestampCursor implements PartitionTimestampCursor {
@@ -1293,7 +1330,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
it.seek(seekKeyBuf.array());
- ReadResult readResult = handleReadByTimestampIterator(it, currentRowId, timestamp, seekKeyBuf);
+ ReadResult readResult = handleReadByTimestampIterator(it, currentRowId, timestamp, seekKeyBuf.array());
if (readResult.isEmpty()) {
return null;
@@ -1343,7 +1380,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
currentRowId = null;
// Prepare direct buffer slice to read keys from the iterator.
- ByteBuffer currentKeyBuffer = DIRECT_KEY_BUFFER.get();
+ ByteBuffer currentKeyBuffer = DIRECT_DATA_ID_KEY_BUFFER.get();
while (true) {
// At this point, seekKeyBuf should contain row id that's above the one we already scanned, but not greater than any
@@ -1455,7 +1492,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
currentRowId = null;
// Prepare direct buffer slice to read keys from the iterator.
- ByteBuffer directBuffer = DIRECT_KEY_BUFFER.get();
+ ByteBuffer directBuffer = DIRECT_DATA_ID_KEY_BUFFER.get();
while (true) {
// TODO IGNITE-18201 Remove copying.
@@ -1475,7 +1512,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
// Seek to current row id + timestamp.
it.seek(seekKeyBuf.array());
- ReadResult readResult = handleReadByTimestampIterator(it, rowId, timestamp, seekKeyBuf);
+ ReadResult readResult = handleReadByTimestampIterator(it, rowId, timestamp, seekKeyBuf.array());
if (readResult.isEmpty() && !readResult.isWriteIntent()) {
// Seek to next row id as we found nothing that matches.
@@ -1580,6 +1617,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
writeBatch.delete(meta, estimatedSizeKey);
writeBatch.deleteRange(helper.partCf, helper.partitionStartPrefix(), helper.partitionEndPrefix());
+ writeBatch.deleteRange(helper.dataCf, helper.partitionStartPrefix(), helper.partitionEndPrefix());
gc.deleteQueue(writeBatch);
}
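Taken together, the changes to this file replace direct row storage with one level of indirection: partCf keys now resolve to a small data ID, and the row payload lives in dataCf under a payload key derived from that ID. A hypothetical sketch of the corresponding write side, which is not shown in this excerpt (generateDataId and dataIdWithTxState are assumed names, not methods from this diff):

    // Hypothetical write-side sketch of indirect addressing; helper names are assumptions.
    byte[] dataId = generateDataId();                       // fresh identifier for the payload

    // The (potentially large) binary row goes into the data column family...
    writeBatch.put(helper.dataCf, helper.createPayloadKey(ByteBuffer.wrap(dataId)), rowBytes);

    // ...while partCf only stores the small "data id + tx state" record under the row's key.
    writeBatch.put(helper.partCf, createUncommittedDataIdKey(rowId), dataIdWithTxState(dataId, txId));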
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index 65c368dbbd..53e5d65163 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -136,6 +136,13 @@ public class RocksDbTableStorage implements MvTableStorage {
return rocksDb.gcQueueCf.handle();
}
+ /**
+ * Returns a column family handle for data CF.
+ */
+ ColumnFamilyHandle dataCfHandle() {
+ return rocksDb.dataCf.handle();
+ }
+
/**
* Returns a future to wait next flush operation from the current point in time. Uses {@link RocksDB#getLatestSequenceNumber()} to
* achieve this.
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
index 4771a339a7..35f65660e6 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
@@ -113,12 +113,15 @@ public final class SharedRocksDbInstance {
/** Meta information instance that wraps {@link ColumnFamily} instance for meta column family. */
public final RocksDbMetaStorage meta;
- /** Column Family for partition data. */
+ /** Column Family for TX data and references to {@link #dataCf}. */
public final ColumnFamily partitionCf;
/** Column Family for GC queue. */
public final ColumnFamily gcQueueCf;
+ /** Column Family for storing binary rows. */
+ public final ColumnFamily dataCf;
+
/** Column Family for Hash Index data. */
private final ColumnFamily hashIndexCf;
@@ -143,6 +146,7 @@ public final class SharedRocksDbInstance {
RocksDbMetaStorage meta,
ColumnFamily partitionCf,
ColumnFamily gcQueueCf,
+ ColumnFamily dataCf,
ColumnFamily hashIndexCf,
List<ColumnFamily> sortedIndexCfs,
List<AutoCloseable> resources
@@ -157,6 +161,7 @@ public final class SharedRocksDbInstance {
this.meta = meta;
this.partitionCf = partitionCf;
this.gcQueueCf = gcQueueCf;
+ this.dataCf = dataCf;
this.hashIndexCf = hashIndexCf;
this.resources = new ArrayList<>(resources);
@@ -363,6 +368,7 @@ public final class SharedRocksDbInstance {
.array();
deleteByPrefix(writeBatch, partitionCf, tableIdBytes);
+ deleteByPrefix(writeBatch, dataCf, tableIdBytes);
deleteByPrefix(writeBatch, gcQueueCf, tableIdBytes);
deleteByPrefix(writeBatch, hashIndexCf, tableIdBytes);
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceCreator.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceCreator.java
index 29b898d814..58b46138a9 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceCreator.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceCreator.java
@@ -100,6 +100,7 @@ public class SharedRocksDbInstanceCreator {
RocksDbMetaStorage meta = null;
ColumnFamily partitionCf = null;
ColumnFamily gcQueueCf = null;
+ ColumnFamily dataCf = null;
ColumnFamily hashIndexCf = null;
var sortedIndexCfs = new ArrayList<ColumnFamily>();
@@ -123,6 +124,11 @@ public class SharedRocksDbInstanceCreator {
break;
+ case DATA:
+ dataCf = cf;
+
+ break;
+
case HASH_INDEX:
hashIndexCf = cf;
@@ -150,6 +156,7 @@ public class SharedRocksDbInstanceCreator {
requireNonNull(partitionCf, "partitionCf"),
requireNonNull(gcQueueCf, "gcQueueCf"),
+ requireNonNull(dataCf, "dataCf"),
requireNonNull(hashIndexCf, "hashIndexCf"),
sortedIndexCfs,
resources // Trusts the inner class to copy the resources!!
);
@@ -198,6 +205,7 @@ public class SharedRocksDbInstanceCreator {
switch (ColumnFamilyType.fromCfName(utf8cfName)) {
case META:
case GC_QUEUE:
+ case DATA:
return add(new ColumnFamilyOptions());
case PARTITION:
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelperTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelperTest.java
new file mode 100644
index 0000000000..e8540c26d3
--- /dev/null
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelperTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.setFirstBit;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import org.junit.jupiter.api.Test;
+
+class PartitionDataHelperTest {
+ @Test
+ void testSetFirstBit() {
+ byte[] array = {1, 0, 42};
+
+ setFirstBit(array, 1, true);
+
+ assertThat(array, is(new byte[] {1, 1, 42}));
+
+ setFirstBit(array, 1, true);
+
+ assertThat(array, is(new byte[] {1, 1, 42}));
+
+ setFirstBit(array, 1, false);
+
+ assertThat(array, is(new byte[] {1, 0, 42}));
+
+ setFirstBit(array, 1, false);
+
+ assertThat(array, is(new byte[] {1, 0, 42}));
+
+ setFirstBit(array, 2, true);
+
+ assertThat(array, is(new byte[] {1, 0, 43}));
+
+ setFirstBit(array, 2, true);
+
+ assertThat(array, is(new byte[] {1, 0, 43}));
+
+ setFirstBit(array, 2, false);
+
+ assertThat(array, is(new byte[] {1, 0, 42}));
+
+ setFirstBit(array, 2, false);
+
+ assertThat(array, is(new byte[] {1, 0, 42}));
+ }
+}
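The assertions above fully pin down the contract of setFirstBit: it sets or clears the lowest-order bit of the byte at the given index, and repeated calls are idempotent. A minimal implementation consistent with this test (the real one lives in PartitionDataHelper and may differ in details):

    static void setFirstBit(byte[] array, int index, boolean value) {
        if (value) {
            array[index] |= 0x01;   // set the lowest-order bit
        } else {
            array[index] &= ~0x01;  // clear the lowest-order bit
        }
    }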
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/benchmarks/CommitManyWritesBenchmark.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/benchmarks/CommitManyWritesBenchmark.java
new file mode 100644
index 0000000000..b6dc18dfbd
--- /dev/null
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/benchmarks/CommitManyWritesBenchmark.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.benchmarks;
+
+import static org.apache.ignite.internal.util.IgniteUtils.capacity;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.IntStream;
+import org.apache.ignite.configuration.ConfigurationValue;
+import org.apache.ignite.configuration.NamedConfigurationTree;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowImpl;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.configurations.StorageConfiguration;
+import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbTableStorage;
+import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbProfileView;
+import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
+import org.apache.ignite.internal.tx.TransactionIds;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * Benchmark for measuring the performance of {@link RocksDbMvPartitionStorage} in a scenario when many entries are added in one big
+ * transaction and are then committed.
+ */
+@Fork(1)
+@Threads(5)
+@State(Scope.Benchmark)
+@BenchmarkMode(Mode.SingleShotTime)
+public class CommitManyWritesBenchmark {
+ private static final String STORAGE_PROFILE_NAME = "test";
+ private static final int TABLE_ID = 1;
+ private static final int NUM_PARTITIONS = 50;
+ private static final int NUM_ROWS = 100_000;
+ private static final int ROW_LENGTH = 10_000;
+
+ private final HybridClock clock = new HybridClockImpl();
+
+ private RocksDbStorageEngine storageEngine;
+
+ private RocksDbTableStorage tableStorage;
+
+ /** Setup method. */
+ @Setup
+ public void setUp() throws IOException {
+ Path workDir = Files.createTempDirectory(CommitManyWritesBenchmark.class.getSimpleName());
+
+ storageEngine = new RocksDbStorageEngine(
+ "test",
+ engineConfiguration(),
+ storageConfiguration(),
+ workDir,
+ () -> {}
+ );
+
+ storageEngine.start();
+
+ var tableDescriptor = new StorageTableDescriptor(TABLE_ID, NUM_PARTITIONS, STORAGE_PROFILE_NAME);
+
+ tableStorage = storageEngine.createMvTable(tableDescriptor, indexId -> null);
+
+ CompletableFuture<?>[] createFutures = IntStream.range(0, NUM_PARTITIONS)
+ .mapToObj(tableStorage::createMvPartition)
+ .toArray(CompletableFuture[]::new);
+
+ CompletableFuture.allOf(createFutures).join();
+ }
+
+ /** Tear down method. */
+ @TearDown
+ public void tearDown() {
+ tableStorage.destroy().join();
+
+ storageEngine.stop();
+ }
+
+ private static int randomPartitionId() {
+ return ThreadLocalRandom.current().nextInt(NUM_PARTITIONS);
+ }
+
+ private static Map<RowId, BinaryRow> randomRows(int partitionId) {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+
+ var rows = new HashMap<RowId, BinaryRow>(capacity(NUM_ROWS));
+
+ for (int i = 0; i < NUM_ROWS; i++) {
+ var rowId = new RowId(partitionId);
+
+ BinaryRow row = randomRow(random);
+
+ rows.put(rowId, row);
+ }
+
+ return rows;
+ }
+
+ private static BinaryRow randomRow(ThreadLocalRandom random) {
+ ByteBuffer buffer = ByteBuffer.allocate(ROW_LENGTH);
+
+ random.nextBytes(buffer.array());
+
+ return new BinaryRowImpl(0, buffer);
+ }
+
+ /** Randomly generated rows for a partition. */
+ @State(Scope.Thread)
+ public static class DataToAddAndCommit {
+ final int partitionId = randomPartitionId();
+
+ final Map<RowId, BinaryRow> rows = randomRows(partitionId);
+ }
+
+ /** Randomly generated rows for a partition which has also been written to the storage. */
+ @State(Scope.Thread)
+ public static class DataToCommit {
+ final int partitionId = randomPartitionId();
+
+ final Map<RowId, BinaryRow> rows = randomRows(partitionId);
+
+ /** Setup method. */
+ @Setup
+ public void setUp(CommitManyWritesBenchmark benchmark) {
+ UUID txId = TransactionIds.transactionId(benchmark.clock.now(), 0);
+
+ MvPartitionStorage partitionStorage = benchmark.tableStorage.getMvPartition(partitionId);
+
+ partitionStorage.runConsistently(locker -> {
+ rows.forEach((rowId, row) -> partitionStorage.addWrite(rowId, row, txId, TABLE_ID, partitionId));
+
+ return null;
+ });
+ }
+ }
+
+ private static RocksDbStorageEngineConfiguration engineConfiguration() {
+ RocksDbStorageEngineConfiguration config = mock(RocksDbStorageEngineConfiguration.class);
+
+ ConfigurationValue<Integer> flushDelayMillis = mock(ConfigurationValue.class);
+
+ when(flushDelayMillis.value()).thenReturn(100);
+
+ when(config.flushDelayMillis()).thenReturn(flushDelayMillis);
+
+ return config;
+ }
+
+ private static StorageConfiguration storageConfiguration() {
+ StorageConfiguration config = mock(StorageConfiguration.class);
+
+ NamedConfigurationTree profilesTree = mock(NamedConfigurationTree.class);
+ NamedListView profilesView = mock(NamedListView.class);
+ RocksDbProfileView rocksDbProfileView = mock(RocksDbProfileView.class);
+
+ when(rocksDbProfileView.name()).thenReturn(STORAGE_PROFILE_NAME);
+ when(rocksDbProfileView.size()).thenReturn(16777216L);
+ when(rocksDbProfileView.writeBufferSize()).thenReturn(16777216L);
+
+ when(config.profiles()).thenReturn(profilesTree);
+ when(profilesTree.value()).thenReturn(profilesView);
+ when(profilesView.iterator()).thenReturn(List.of(rocksDbProfileView).iterator());
+
+ return config;
+ }
+
+ /** Benchmark for the combination of {@link MvPartitionStorage#addWrite} and {@link MvPartitionStorage#commitWrite} methods. */
+ @Benchmark
+ public void addAndCommitManyWrites(DataToAddAndCommit data) {
+ MvPartitionStorage partitionStorage = tableStorage.getMvPartition(data.partitionId);
+
+ UUID txId = TransactionIds.transactionId(clock.now(), 0);
+
+ partitionStorage.runConsistently(locker -> {
+ data.rows.forEach((rowId, row) -> partitionStorage.addWrite(rowId, row, txId, TABLE_ID, data.partitionId));
+
+ return null;
+ });
+
+ HybridTimestamp commitTs = clock.now();
+
+ partitionStorage.runConsistently(locker -> {
+ data.rows.keySet().forEach(rowId -> partitionStorage.commitWrite(rowId, commitTs));
+
+ return null;
+ });
+ }
+
+ /** Benchmark for calling {@link MvPartitionStorage#addWrite} many times. */
+ @Benchmark
+ public void addManyWrites(DataToAddAndCommit data) {
+ MvPartitionStorage partitionStorage = tableStorage.getMvPartition(data.partitionId);
+
+ UUID txId = TransactionIds.transactionId(clock.now(), 0);
+
+ partitionStorage.runConsistently(locker -> {
+ data.rows.forEach((rowId, row) -> partitionStorage.addWrite(rowId, row, txId, TABLE_ID, data.partitionId));
+
+ return null;
+ });
+ }
+
+ /** Benchmark for calling {@link MvPartitionStorage#commitWrite} many times. */
+ @Benchmark
+ public void commitManyWrites(DataToCommit data) {
+ MvPartitionStorage partitionStorage = tableStorage.getMvPartition(data.partitionId);
+
+ HybridTimestamp commitTs = clock.now();
+
+ partitionStorage.runConsistently(locker -> {
+ data.rows.keySet().forEach(rowId -> partitionStorage.commitWrite(rowId, commitTs));
+
+ return null;
+ });
+ }
+
+ /** Main method. */
+ public static void main(String[] args) throws RunnerException {
+ Options opt = new OptionsBuilder()
+ .include(CommitManyWritesBenchmark.class.getSimpleName())
+ .build();
+
+ new Runner(opt).run();
+ }
+}
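The main method above runs every @Benchmark in the class. Since JMH's include() accepts a regular expression, a single benchmark can be selected by narrowing the pattern, for example:

    Options opt = new OptionsBuilder()
            .include(CommitManyWritesBenchmark.class.getSimpleName() + "\\.commitManyWrites$")
            .build();

    new Runner(opt).run();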
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java
index a4f8741a3a..8e68d23da8 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java
@@ -32,7 +32,6 @@ import static org.mockito.Mockito.verify;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import java.util.stream.IntStream;
import org.apache.ignite.distributed.TestPartitionDataStorage;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -280,11 +279,11 @@ abstract class AbstractGcUpdateHandlerTest extends BaseMvStoragesTest {
);
}
- private static void addWriteCommitted(PartitionDataStorage storage, RowId rowId, @Nullable BinaryRow row, HybridTimestamp timestamp) {
+ private void addWriteCommitted(PartitionDataStorage storage, RowId rowId, @Nullable BinaryRow row, HybridTimestamp timestamp) {
storage.runConsistently(locker -> {
locker.lock(rowId);
- storage.addWrite(rowId, row, UUID.randomUUID(), 999, PARTITION_ID);
+ storage.addWrite(rowId, row, newTransactionId(), 999, PARTITION_ID);
storage.commitWrite(rowId, timestamp);