This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 11eaf8f587 IGNITE-19395 Use indirect addressing when storing rows in 
RocksDb (#4200)
11eaf8f587 is described below

commit 11eaf8f58760c4913c0ebd83d6ea1d3a435b31e2
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Tue Aug 20 21:03:53 2024 +0300

    IGNITE-19395 Use indirect addressing when storing rows in RocksDb (#4200)
---
 modules/storage-rocksdb/build.gradle               |   3 +
 .../storage/rocksdb/ColumnFamilyUtils.java         |  10 +-
 .../internal/storage/rocksdb/GarbageCollector.java | 159 +++----
 .../storage/rocksdb/PartitionDataHelper.java       | 148 +++++--
 .../storage/rocksdb/RocksDbMvPartitionStorage.java | 490 +++++++++++----------
 .../storage/rocksdb/RocksDbTableStorage.java       |   7 +
 .../rocksdb/instance/SharedRocksDbInstance.java    |   8 +-
 .../instance/SharedRocksDbInstanceCreator.java     |   8 +
 .../storage/rocksdb/PartitionDataHelperTest.java   |  63 +++
 .../benchmarks/CommitManyWritesBenchmark.java      | 268 +++++++++++
 .../gc/AbstractGcUpdateHandlerTest.java            |   5 +-
 11 files changed, 830 insertions(+), 339 deletions(-)

diff --git a/modules/storage-rocksdb/build.gradle 
b/modules/storage-rocksdb/build.gradle
index 3b4803eef2..69a16af17f 100644
--- a/modules/storage-rocksdb/build.gradle
+++ b/modules/storage-rocksdb/build.gradle
@@ -37,6 +37,8 @@ dependencies {
     implementation libs.auto.service.annotations
 
     testAnnotationProcessor 
project(':ignite-configuration-annotation-processor')
+    testAnnotationProcessor libs.jmh.annotation.processor
+
     testImplementation project(':ignite-core')
     testImplementation(testFixtures(project(':ignite-core')))
     testImplementation project(':ignite-configuration')
@@ -47,6 +49,7 @@ dependencies {
     testImplementation(testFixtures(project(':ignite-schema')))
     testImplementation libs.hamcrest.core
     testImplementation libs.mockito.core
+    testImplementation libs.jmh.core
 }
 
 description = 'ignite-storage-rocksdb'
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java
index 15ed6ff84d..5d8716bc95 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java
@@ -42,9 +42,12 @@ public class ColumnFamilyUtils {
     /** Name of the meta column family matches default columns family, meaning 
that it always exist when new table is created. */
     private static final String META_CF_NAME = "default";
 
-    /** Name of the Column Family that stores partition data. */
+    /** Name of the Column Family that stores partition data with references 
to row data. */
     private static final String PARTITION_CF_NAME = "cf-part";
 
+    /** Name of the Column Family that stores row data. */
+    private static final String DATA_CF_NAME = "cf-data";
+
     /** Name of the Column Family that stores garbage collection queue. */
     private static final String GC_QUEUE_CF_NAME = "cf-gc";
 
@@ -58,6 +61,7 @@ public class ColumnFamilyUtils {
     public static final List<byte[]> DEFAULT_CF_NAMES = List.of(
             META_CF_NAME.getBytes(UTF_8),
             PARTITION_CF_NAME.getBytes(UTF_8),
+            DATA_CF_NAME.getBytes(UTF_8),
             GC_QUEUE_CF_NAME.getBytes(UTF_8),
             HASH_INDEX_CF_NAME.getBytes(UTF_8)
     );
@@ -70,7 +74,7 @@ public class ColumnFamilyUtils {
 
     /** Utility enum to describe a type of the column family - meta or 
partition. */
     public enum ColumnFamilyType {
-        META, PARTITION, GC_QUEUE, HASH_INDEX, SORTED_INDEX, UNKNOWN;
+        META, PARTITION, GC_QUEUE, DATA, HASH_INDEX, SORTED_INDEX, UNKNOWN;
 
         /**
          * Determines column family type by its name.
@@ -85,6 +89,8 @@ public class ColumnFamilyUtils {
                 return PARTITION;
             } else if (GC_QUEUE_CF_NAME.equals(cfName)) {
                 return GC_QUEUE;
+            } else if (DATA_CF_NAME.equals(cfName)) {
+                return DATA;
             } else if (HASH_INDEX_CF_NAME.equals(cfName)) {
                 return HASH_INDEX;
             } else if (cfName.startsWith(SORTED_INDEX_CF_PREFIX)) {
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
index e9f61d268e..d6eeac6906 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
@@ -20,11 +20,14 @@ package org.apache.ignite.internal.storage.rocksdb;
 import static java.lang.ThreadLocal.withInitial;
 import static java.nio.ByteBuffer.allocateDirect;
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.DATA_ID_SIZE;
 import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.MAX_KEY_SIZE;
-import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.MV_KEY_BUFFER;
 import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.ROW_ID_OFFSET;
 import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.deserializeRow;
+import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.getFromBatchAndDb;
+import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.isTombstone;
 import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.readTimestampNatural;
+import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.wrapIterator;
 import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage.invalid;
 import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.KEY_BYTE_ORDER;
 import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.PARTITION_ID_SIZE;
@@ -73,8 +76,17 @@ class GarbageCollector {
     /** Garbage collector's queue key's size. */
     private static final int GC_KEY_SIZE = GC_KEY_ROW_ID_OFFSET + ROW_ID_SIZE;
 
-    /** Thread-local direct buffer instance to read keys from RocksDB. */
-    private static final ThreadLocal<ByteBuffer> GC_KEY_BUFFER = 
withInitial(() -> allocateDirect(GC_KEY_SIZE).order(KEY_BYTE_ORDER));
+    /** Thread-local direct buffer able to incorporate keys that correspond to 
a Data ID. */
+    private static final ThreadLocal<ByteBuffer> DIRECT_DATA_ID_KEY_BUFFER =
+            withInitial(() -> 
allocateDirect(MAX_KEY_SIZE).order(KEY_BYTE_ORDER));
+
+    /** Thread-local direct buffer able to incorporate a Data ID. */
+    private static final ThreadLocal<ByteBuffer> DIRECT_DATA_ID_BUFFER =
+            withInitial(() -> 
allocateDirect(DATA_ID_SIZE).order(KEY_BYTE_ORDER));
+
+    /** Thread-local direct buffer able to incorporate a key from the GC 
queue. */
+    private static final ThreadLocal<ByteBuffer> DIRECT_GC_KEY_BUFFER =
+            withInitial(() -> 
allocateDirect(GC_KEY_SIZE).order(KEY_BYTE_ORDER));
 
     /** Helper for the rocksdb partition. */
     private final PartitionDataHelper helper;
@@ -85,14 +97,18 @@ class GarbageCollector {
     /** GC queue column family. */
     private final ColumnFamilyHandle gcQueueCf;
 
+    /** Read options for regular reads. */
+    private final ReadOptions readOpts;
+
     enum AddResult {
         WAS_TOMBSTONE, WAS_VALUE, WAS_EMPTY
     }
 
-    GarbageCollector(PartitionDataHelper helper, RocksDB db, 
ColumnFamilyHandle gcQueueCf) {
+    GarbageCollector(PartitionDataHelper helper, RocksDB db, ReadOptions 
readOpts, ColumnFamilyHandle gcQueueCf) {
         this.helper = helper;
         this.db = db;
         this.gcQueueCf = gcQueueCf;
+        this.readOpts = readOpts;
     }
 
     /**
@@ -108,26 +124,23 @@ class GarbageCollector {
      */
     AddResult tryAddToGcQueue(WriteBatchWithIndex writeBatch, RowId rowId, 
HybridTimestamp timestamp, boolean isNewValueTombstone)
             throws RocksDBException {
-        ColumnFamilyHandle partCf = helper.partCf;
-
         // Try find previous value for the row id.
-        ByteBuffer keyBuffer = MV_KEY_BUFFER.get();
-        keyBuffer.clear();
+        ByteBuffer dataIdKeyBuffer = DIRECT_DATA_ID_KEY_BUFFER.get().clear();
 
-        helper.putDataKey(keyBuffer, rowId, timestamp);
+        helper.putCommittedDataIdKey(dataIdKeyBuffer, rowId, timestamp);
 
-        try (RocksIterator it = db.newIterator(partCf, 
helper.upperBoundReadOpts)) {
-            it.seek(keyBuffer);
+        try (RocksIterator it = db.newIterator(helper.partCf, 
helper.upperBoundReadOpts)) {
+            it.seek(dataIdKeyBuffer);
 
             if (invalid(it)) {
                 return AddResult.WAS_EMPTY;
             }
 
-            keyBuffer.clear();
+            dataIdKeyBuffer.clear();
 
-            int keyLen = it.key(keyBuffer);
+            int keyLen = it.key(dataIdKeyBuffer);
 
-            RowId readRowId = helper.getRowId(keyBuffer, ROW_ID_OFFSET);
+            RowId readRowId = helper.getRowId(dataIdKeyBuffer, ROW_ID_OFFSET);
 
             if (!readRowId.equals(rowId)) {
                 return AddResult.WAS_EMPTY;
@@ -136,11 +149,9 @@ class GarbageCollector {
             // Found previous value.
             assert keyLen == MAX_KEY_SIZE; // Can not be write-intent.
 
-            int valueSize = it.value(EMPTY_DIRECT_BUFFER);
-
             AddResult result;
 
-            if (valueSize == 0) {
+            if (isCurrentValueTombstone(it)) {
                 // Do not add a new tombstone if the existing value is also a 
tombstone.
                 if (isNewValueTombstone) {
                     return AddResult.WAS_TOMBSTONE;
@@ -151,11 +162,11 @@ class GarbageCollector {
                 result = AddResult.WAS_VALUE;
             }
 
-            keyBuffer.clear();
+            ByteBuffer gcKeyBuffer = DIRECT_GC_KEY_BUFFER.get().clear();
 
-            helper.putGcKey(keyBuffer, rowId, timestamp);
+            helper.putGcKey(gcKeyBuffer, rowId, timestamp);
 
-            writeBatch.put(gcQueueCf, keyBuffer, EMPTY_DIRECT_BUFFER);
+            writeBatch.put(gcQueueCf, gcKeyBuffer, EMPTY_DIRECT_BUFFER);
 
             return result;
         }
@@ -164,15 +175,16 @@ class GarbageCollector {
     /**
      * Polls an element for vacuum. See {@link 
org.apache.ignite.internal.storage.MvPartitionStorage#peek(HybridTimestamp)}.
      *
+     * @param writeBatch Current Write Batch.
      * @param lowWatermark Low watermark.
      * @return Garbage collected element descriptor.
      */
-    @Nullable GcEntry peek(HybridTimestamp lowWatermark) {
+    @Nullable GcEntry peek(WriteBatchWithIndex writeBatch, HybridTimestamp 
lowWatermark) {
         // We retrieve the first element of the GC queue and seek for it in 
the data CF.
         // However, the element that we need to garbage collect is the next 
(older one) element.
         // First we check if there's anything to garbage collect. If the 
element is a tombstone we remove it.
         // If the next element exists, that should be the element that we want 
to garbage collect.
-        try (RocksIterator gcIt = newWrappedIterator(gcQueueCf, 
helper.upperBoundReadOpts)) {
+        try (RocksIterator gcIt = newWrappedIterator(writeBatch, gcQueueCf, 
helper.upperBoundReadOpts)) {
             gcIt.seek(helper.partitionStartPrefix());
 
             if (invalid(gcIt)) {
@@ -198,7 +210,7 @@ class GarbageCollector {
      * Polls an element for vacuum. See {@link 
org.apache.ignite.internal.storage.MvPartitionStorage#vacuum(GcEntry)}.
      *
      * @param batch Write batch.
-     * @param entry Entry, previously returned by {@link 
#peek(HybridTimestamp)}.
+     * @param entry Entry, previously returned by {@link #peek}.
      * @return Garbage collected element.
      * @throws RocksDBException If failed to collect the garbage.
      */
@@ -211,7 +223,7 @@ class GarbageCollector {
         // However, the element that we need to garbage collect is the next 
(older one) element.
         // First we check if there's anything to garbage collect. If the 
element is a tombstone we remove it.
         // If the next element exists, that should be the element that we want 
to garbage collect.
-        try (RocksIterator gcIt = newWrappedIterator(gcQueueCf, 
helper.upperBoundReadOpts)) {
+        try (RocksIterator gcIt = newWrappedIterator(batch, gcQueueCf, 
helper.upperBoundReadOpts)) {
             gcIt.seek(helper.partitionStartPrefix());
 
             if (invalid(gcIt)) {
@@ -231,7 +243,7 @@ class GarbageCollector {
             // Delete element from the GC queue.
             batch.delete(gcQueueCf, gcKeyBuffer);
 
-            try (RocksIterator partIt = newWrappedIterator(partCf, 
helper.upperBoundReadOpts)) {
+            try (RocksIterator partIt = newWrappedIterator(batch, partCf, 
helper.upperBoundReadOpts)) {
                 // Process the element in data cf that triggered the addition 
to the GC queue.
                 boolean proceed = checkHasNewerRowAndRemoveTombstone(partIt, 
batch, gcRowVersion);
 
@@ -241,24 +253,29 @@ class GarbageCollector {
                 }
 
                 // Find the row that should be garbage collected.
-                ByteBuffer dataKey = getRowForGcKey(partIt, 
gcRowVersion.getRowId());
+                ByteBuffer dataIdKey = getDataIdKeyForGc(partIt, 
gcRowVersion.getRowId());
 
-                if (dataKey == null) {
+                if (dataIdKey == null) {
                     // No row for GC.
                     return null;
                 }
 
                 // At this point there's definitely a value that needs to be 
garbage collected in the iterator.
-                byte[] valueBytes = partIt.value();
+                ByteBuffer dataId = readDataId(partIt);
+
+                assert !isTombstone(dataId);
 
-                assert valueBytes.length > 0; // Can't be a tombstone.
+                byte[] payloadKey = helper.createPayloadKey(dataId);
 
-                var row = deserializeRow(ByteBuffer.wrap(valueBytes));
+                byte[] rowBytes = getFromBatchAndDb(db, batch, helper.dataCf, 
readOpts, payloadKey);
+
+                assert rowBytes != null && rowBytes.length > 0;
 
                 // Delete the row from the data cf.
-                batch.delete(partCf, dataKey);
+                batch.delete(partCf, dataIdKey);
+                batch.delete(helper.dataCf, payloadKey);
 
-                return row;
+                return deserializeRow(rowBytes);
             }
         }
     }
@@ -280,73 +297,65 @@ class GarbageCollector {
             WriteBatchWithIndex batch,
             GcRowVersion gcRowVersion
     ) throws RocksDBException {
-        ByteBuffer dataKeyBuffer = MV_KEY_BUFFER.get();
-        dataKeyBuffer.clear();
-
-        ColumnFamilyHandle partCf = helper.partCf;
+        ByteBuffer dataIdKeyBuffer = DIRECT_DATA_ID_KEY_BUFFER.get().clear();
 
         // Set up the data key.
-        helper.putDataKey(dataKeyBuffer, gcRowVersion.getRowId(), 
gcRowVersion.getTimestamp());
+        helper.putCommittedDataIdKey(dataIdKeyBuffer, gcRowVersion.getRowId(), 
gcRowVersion.getTimestamp());
 
         // Seek to the row id and timestamp from the GC queue.
         // Note that it doesn't mean that the element in this iterator has 
matching row id or even partition id.
-        it.seek(dataKeyBuffer);
+        it.seek(dataIdKeyBuffer);
 
         if (invalid(it)) {
             // There is no row for the GC queue element.
             return false;
-        } else {
-            dataKeyBuffer.clear();
+        }
 
-            it.key(dataKeyBuffer);
+        dataIdKeyBuffer.clear();
 
-            if (!helper.getRowId(dataKeyBuffer, 
ROW_ID_OFFSET).equals(gcRowVersion.getRowId())) {
-                // There is no row for the GC queue element.
-                return false;
-            }
+        it.key(dataIdKeyBuffer);
+
+        if (!helper.getRowId(dataIdKeyBuffer, 
ROW_ID_OFFSET).equals(gcRowVersion.getRowId())) {
+            // There is no row for the GC queue element.
+            return false;
         }
 
         // Check if the new element, whose insertion scheduled the GC, was a 
tombstone.
-        int len = it.value(EMPTY_DIRECT_BUFFER);
-
-        if (len == 0) {
+        if (isCurrentValueTombstone(it)) {
             // This is a tombstone, we need to delete it.
-            batch.delete(partCf, dataKeyBuffer);
+            batch.delete(helper.partCf, dataIdKeyBuffer);
         }
 
         return true;
     }
 
     /**
-     * Checks if there is a row for garbage collection and returns this row's 
key if it exists.
+     * Checks if there is a row ID for garbage collection and returns this 
row's data ID key if it exists.
      * There might already be no row in the data column family, because GC can 
be run in parallel.
      *
      * @param it RocksDB data column family iterator.
-     * @param gcElementRowId Row id of the element from the GC queue/
-     * @return Key of the row that needs to be garbage collected, or {@code 
null} if such row doesn't exist.
+     * @param gcElementRowId Row id of the element from the GC queue.
+     * @return Key for the row's data ID that needs to be garbage collected, 
or {@code null} if such row doesn't exist.
      */
-    private @Nullable ByteBuffer getRowForGcKey(RocksIterator it, RowId 
gcElementRowId) {
+    private @Nullable ByteBuffer getDataIdKeyForGc(RocksIterator it, RowId 
gcElementRowId) {
         // Let's move to the element that was scheduled for GC.
         it.next();
 
-        RowId gcRowId;
-
         if (invalid(it)) {
             return null;
         }
 
-        ByteBuffer dataKeyBuffer = MV_KEY_BUFFER.get();
-        dataKeyBuffer.clear();
+        ByteBuffer dataIdKeyBuffer = DIRECT_DATA_ID_KEY_BUFFER.get().clear();
 
-        int keyLen = it.key(dataKeyBuffer);
+        int keyLen = it.key(dataIdKeyBuffer);
 
         // Check if we moved to another row id's write-intent, that would mean 
that there is no row to GC for the current row id.
         if (keyLen == MAX_KEY_SIZE) {
-            gcRowId = helper.getRowId(dataKeyBuffer, ROW_ID_OFFSET);
+            RowId rowId = helper.getRowId(dataIdKeyBuffer, ROW_ID_OFFSET);
 
             // We might have moved to the next row id.
-            if (gcElementRowId.equals(gcRowId)) {
-                return dataKeyBuffer;
+            if (gcElementRowId.equals(rowId)) {
+                return dataIdKeyBuffer;
             }
         }
 
@@ -363,9 +372,8 @@ class GarbageCollector {
         writeBatch.deleteRange(gcQueueCf, helper.partitionStartPrefix(), 
helper.partitionEndPrefix());
     }
 
-    private ByteBuffer readGcKey(RocksIterator gcIt) {
-        ByteBuffer gcKeyBuffer = GC_KEY_BUFFER.get();
-        gcKeyBuffer.clear();
+    private static ByteBuffer readGcKey(RocksIterator gcIt) {
+        ByteBuffer gcKeyBuffer = DIRECT_GC_KEY_BUFFER.get().clear();
 
         gcIt.key(gcKeyBuffer);
 
@@ -379,20 +387,21 @@ class GarbageCollector {
         );
     }
 
-    private void refreshGcIterator(RocksIterator gcIt, ByteBuffer gcKeyBuffer) 
throws RocksDBException {
-        gcIt.refresh();
-
-        gcIt.seekForPrev(gcKeyBuffer);
+    private RocksIterator newWrappedIterator(WriteBatchWithIndex writeBatch, 
ColumnFamilyHandle cf, ReadOptions readOptions) {
+        RocksIterator it = db.newIterator(cf, readOptions);
 
-        // Row version was removed from the gc queue by someone, back to the 
head of gc queue.
-        if (invalid(gcIt)) {
-            gcIt.seek(helper.partitionStartPrefix());
-        }
+        return wrapIterator(it, writeBatch, cf);
     }
 
-    private RocksIterator newWrappedIterator(ColumnFamilyHandle cf, 
ReadOptions readOptions) {
-        RocksIterator it = db.newIterator(cf, readOptions);
+    private static ByteBuffer readDataId(RocksIterator it) {
+        ByteBuffer dataId = DIRECT_DATA_ID_BUFFER.get().clear();
+
+        it.value(dataId);
+
+        return dataId;
+    }
 
-        return helper.wrapIterator(it, cf);
+    private static boolean isCurrentValueTombstone(RocksIterator it) {
+        return isTombstone(readDataId(it));
     }
 }
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
index d7192f0496..194a25d583 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
@@ -17,8 +17,7 @@
 
 package org.apache.ignite.internal.storage.rocksdb;
 
-import static java.lang.ThreadLocal.withInitial;
-import static java.nio.ByteBuffer.allocateDirect;
+import static java.nio.ByteBuffer.allocate;
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
 import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
@@ -44,45 +43,50 @@ import org.apache.ignite.internal.storage.util.LockByRowId;
 import org.jetbrains.annotations.Nullable;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
 import org.rocksdb.Slice;
 import org.rocksdb.WriteBatchWithIndex;
 
 /** Helper for the partition data. */
 public final class PartitionDataHelper implements ManuallyCloseable {
+    /** Transaction id size (part of the transaction state). */
+    private static final int TX_ID_SIZE = 2 * Long.BYTES;
+
     /** Position of row id inside the key. */
-    static final int ROW_ID_OFFSET = TABLE_ID_SIZE + Short.BYTES;
+    static final int ROW_ID_OFFSET = TABLE_ID_SIZE + PARTITION_ID_SIZE;
 
-    /** Size of the key without timestamp. */
+    /**
+     * Size of the part of the key associated with transaction state, without the 
timestamp.
+     * Also equal to the size of a key for an uncommitted Write Intent.
+     */
     public static final int ROW_PREFIX_SIZE = ROW_ID_OFFSET + ROW_ID_SIZE;
 
-    /** Maximum size of the data key. */
+    /** Maximum size of the key associated with transaction state. */
     static final int MAX_KEY_SIZE = ROW_PREFIX_SIZE + HYBRID_TIMESTAMP_SIZE;
 
-    /** Transaction id size (part of the transaction state). */
-    private static final int TX_ID_SIZE = 2 * Long.BYTES;
-
-    /** Size of the value header (transaction state). */
-    static final int VALUE_HEADER_SIZE = TX_ID_SIZE + TABLE_ID_SIZE + 
PARTITION_ID_SIZE;
-
-    /** Transaction id offset. */
-    static final int TX_ID_OFFSET = 0;
+    /** Size of Data ID. */
+    static final int DATA_ID_SIZE = ROW_ID_SIZE + HYBRID_TIMESTAMP_SIZE;
 
-    /** Commit table id offset. */
-    static final int TABLE_ID_OFFSET = TX_ID_SIZE;
+    /** Offset of Data ID inside the key associated with row data. */
+    private static final int DATA_ID_OFFSET = TABLE_ID_SIZE + 
PARTITION_ID_SIZE;
 
-    /** Commit partition id offset. */
-    static final int PARTITION_ID_OFFSET = TABLE_ID_OFFSET + TABLE_ID_SIZE;
+    /** Size of the key associated with row data. */
+    private static final int PAYLOAD_KEY_SIZE = DATA_ID_OFFSET + DATA_ID_SIZE;
 
-    /** Value offset (if transaction state is present). */
-    static final int VALUE_OFFSET = VALUE_HEADER_SIZE;
+    /** Size of the transaction state part of the value (Tx ID + Commit Table 
ID + Commit Partition ID). */
+    private static final int TX_STATE_SIZE = TX_ID_SIZE + TABLE_ID_SIZE + 
PARTITION_ID_SIZE;
 
-    /** Thread-local direct buffer instance to read keys from RocksDB. */
-    static final ThreadLocal<ByteBuffer> MV_KEY_BUFFER = withInitial(() -> 
allocateDirect(MAX_KEY_SIZE).order(KEY_BYTE_ORDER));
+    static final int DATA_ID_WITH_TX_STATE_SIZE = DATA_ID_SIZE + TX_STATE_SIZE;
 
     /** Thread-local write batch for {@link 
MvPartitionStorage#runConsistently(WriteClosure)}. */
     static final ThreadLocal<ThreadLocalState> THREAD_LOCAL_STATE = new 
ThreadLocal<>();
 
+    /** Thread-local buffer for payload keys. */
+    private static final ThreadLocal<ByteBuffer> PAYLOAD_KEY_BUFFER =
+            ThreadLocal.withInitial(() -> 
allocate(PAYLOAD_KEY_SIZE).order(KEY_BYTE_ORDER));
+
     /** Table ID. */
     private final int tableId;
 
@@ -95,6 +99,8 @@ public final class PartitionDataHelper implements 
ManuallyCloseable {
     /** Partition data column family. */
     final ColumnFamilyHandle partCf;
 
+    final ColumnFamilyHandle dataCf;
+
     /** Read options for regular scans. */
     final ReadOptions upperBoundReadOpts;
 
@@ -109,10 +115,11 @@ public final class PartitionDataHelper implements 
ManuallyCloseable {
     /** Prefix for finding the ending of the partition. */
     private final byte[] partitionEndPrefix;
 
-    PartitionDataHelper(int tableId, int partitionId, ColumnFamilyHandle 
partCf) {
+    PartitionDataHelper(int tableId, int partitionId, ColumnFamilyHandle 
partCf, ColumnFamilyHandle dataCf) {
         this.tableId = tableId;
         this.partitionId = partitionId;
         this.partCf = partCf;
+        this.dataCf = dataCf;
 
         this.partitionStartPrefix = compositeKey(tableId, partitionId);
         this.partitionEndPrefix = incrementPrefix(partitionStartPrefix);
@@ -140,18 +147,20 @@ public final class PartitionDataHelper implements 
ManuallyCloseable {
         return partitionEndPrefix;
     }
 
-    void putDataKey(ByteBuffer dataKeyBuffer, RowId rowId, HybridTimestamp 
timestamp) {
+    void putCommittedDataIdKey(ByteBuffer buffer, RowId rowId, HybridTimestamp 
timestamp) {
+        assert buffer.order() == KEY_BYTE_ORDER;
         assert rowId.partitionId() == partitionId : "rowPartitionId=" + 
rowId.partitionId() + ", storagePartitionId=" + partitionId;
 
-        dataKeyBuffer.putInt(tableId);
-        dataKeyBuffer.putShort((short) partitionId);
-        putRowIdUuid(dataKeyBuffer, rowId.uuid());
-        putTimestampDesc(dataKeyBuffer, timestamp);
+        buffer.putInt(tableId);
+        buffer.putShort((short) partitionId);
+        putRowIdUuid(buffer, rowId.uuid());
+        putTimestampDesc(buffer, timestamp);
 
-        dataKeyBuffer.flip();
+        buffer.flip();
     }
 
     void putGcKey(ByteBuffer gcKeyBuffer, RowId rowId, HybridTimestamp 
timestamp) {
+        assert gcKeyBuffer.order() == KEY_BYTE_ORDER;
         assert rowId.partitionId() == partitionId : "rowPartitionId=" + 
rowId.partitionId() + ", storagePartitionId=" + partitionId;
 
         gcKeyBuffer.putInt(tableId);
@@ -174,7 +183,6 @@ public final class PartitionDataHelper implements 
ManuallyCloseable {
         return new RowId(partitionId, getRowIdUuid(keyBuffer, offset));
     }
 
-
     /**
      * Returns a WriteBatch that can be used by the affiliated storage 
implementation (like indices) to maintain consistency when run
      * inside the {@link MvPartitionStorage#runConsistently} method.
@@ -200,7 +208,7 @@ public final class PartitionDataHelper implements 
ManuallyCloseable {
      * Creates a byte array key, that consists of table or index ID (4 bytes), 
followed by a partition ID (2 bytes).
      */
     public static byte[] compositeKey(int tableOrIndexId, int partitionId) {
-        return ByteBuffer.allocate(ROW_ID_OFFSET).order(KEY_BYTE_ORDER)
+        return allocate(ROW_ID_OFFSET).order(KEY_BYTE_ORDER)
                 .putInt(tableOrIndexId)
                 .putShort((short) partitionId)
                 .array();
@@ -241,9 +249,13 @@ public final class PartitionDataHelper implements 
ManuallyCloseable {
         return hybridTimestamp(time);
     }
 
-    RocksIterator wrapIterator(RocksIterator it, ColumnFamilyHandle cf) {
-        WriteBatchWithIndex writeBatch = currentWriteBatch();
+    static RocksIterator wrapIterator(RocksIterator it, ColumnFamilyHandle cf) 
{
+        return wrapIterator(it, currentWriteBatch(), cf);
+    }
 
+    static RocksIterator wrapIterator(RocksIterator it, @Nullable 
WriteBatchWithIndex writeBatch, ColumnFamilyHandle cf) {
+        // "count()" check is mandatory. Write batch iterator without any 
updates just crashes everything.
+        // It's not documented, but this is exactly how it should be used.
         if (writeBatch != null && writeBatch.count() > 0) {
             return writeBatch.newIteratorWithBase(cf, it);
         }
@@ -251,6 +263,29 @@ public final class PartitionDataHelper implements 
ManuallyCloseable {
         return it;
     }
 
+    static byte @Nullable [] getFromBatchAndDb(
+            RocksDB db, ColumnFamilyHandle cfHandle, ReadOptions readOptions, 
byte[] key
+    ) throws RocksDBException {
+        return getFromBatchAndDb(db, currentWriteBatch(), cfHandle, 
readOptions, key);
+    }
+
+    static byte @Nullable [] getFromBatchAndDb(
+            RocksDB db, @Nullable WriteBatchWithIndex writeBatch, 
ColumnFamilyHandle cfHandle, ReadOptions readOptions, byte[] key
+    ) throws RocksDBException {
+        return writeBatch == null || writeBatch.count() == 0
+                ? db.get(cfHandle, readOptions, key)
+                : writeBatch.getFromBatchAndDB(db, cfHandle, readOptions, key);
+    }
+
+    /**
+     * Converts an internal serialized representation of a binary row into its 
Java Object counterpart.
+     */
+    static BinaryRow deserializeRow(byte[] bytes) {
+        ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN);
+
+        return deserializeRow(buffer);
+    }
+
     /**
      * Converts an internal serialized presentation of a binary row into its 
Java Object counterpart.
      */
@@ -263,6 +298,55 @@ public final class PartitionDataHelper implements 
ManuallyCloseable {
         return new BinaryRowImpl(schemaVersion, binaryTupleSlice);
     }
 
+    byte[] createPayloadKey(ByteBuffer dataId) {
+        byte[] result = PAYLOAD_KEY_BUFFER.get()
+                .clear()
+                .putInt(tableId)
+                .putShort((short) partitionId)
+                .put(dataId)
+                .array();
+
+        dataId.rewind();
+
+        // Always use 0 for the first bit (tombstone flag), because it only 
makes sense when data ID is stored as a value.
+        setFirstBit(result, result.length - 1, false);
+
+        return result;
+    }
+
+    /**
+     * Changes the first (least significant) bit of the byte identified by an index in an array.
+     *
+     * @param array Array containing the byte to change.
+     * @param index Index of the byte inside the array.
+     * @param value If {@code true} - sets the bit to 1, else to 0.
+     */
+    static void setFirstBit(byte[] array, int index, boolean value) {
+        if (value) {
+            array[index] |= 0x01;
+        } else {
+            array[index] &= 0xFE;
+        }
+    }
+
+    /**
+     * Returns {@code true} if the given data ID points to a tombstone.
+     */
+    static boolean isTombstone(ByteBuffer dataId) {
+        byte lastByte = dataId.get(dataId.limit() - 1);
+
+        return (lastByte & 0x1) != 0;
+    }
+
+    /**
+     * Returns {@code true} if the given data ID points to a tombstone.
+     */
+    static boolean isTombstone(byte[] dataId) {
+        byte lastByte = dataId[dataId.length - 1];
+
+        return (lastByte & 0x1) != 0;
+    }
+
     @Override
     public void close() {
         RocksUtils.closeAll(scanReadOpts, upperBoundReadOpts, upperBound);
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 10572189d1..15e1a33661 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -21,18 +21,21 @@ import static java.lang.ThreadLocal.withInitial;
 import static java.nio.ByteBuffer.allocate;
 import static java.nio.ByteBuffer.allocateDirect;
 import static java.util.Arrays.copyOf;
-import static java.util.Arrays.copyOfRange;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.DATA_ID_SIZE;
+import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.DATA_ID_WITH_TX_STATE_SIZE;
 import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.MAX_KEY_SIZE;
-import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.MV_KEY_BUFFER;
 import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.ROW_ID_OFFSET;
 import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.ROW_PREFIX_SIZE;
 import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.THREAD_LOCAL_STATE;
-import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.VALUE_HEADER_SIZE;
-import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.VALUE_OFFSET;
 import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.deserializeRow;
+import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.getFromBatchAndDb;
+import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.isTombstone;
 import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.putTimestampDesc;
 import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.readTimestampDesc;
+import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.requireWriteBatch;
+import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.setFirstBit;
+import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.wrapIterator;
 import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.ESTIMATED_SIZE_PREFIX;
 import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.LEASE_PREFIX;
 import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_CONF_PREFIX;
@@ -45,7 +48,6 @@ import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptio
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageStateOnRebalance;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.transitionToTerminalState;
-import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
 import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
 import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
 import static org.apache.ignite.internal.util.ByteUtils.putLongToBytes;
@@ -73,6 +75,7 @@ import org.apache.ignite.internal.storage.gc.GcEntry;
 import org.apache.ignite.internal.storage.rocksdb.GarbageCollector.AddResult;
 import org.apache.ignite.internal.storage.util.LocalLocker;
 import org.apache.ignite.internal.storage.util.StorageState;
+import org.apache.ignite.internal.tx.TransactionIds;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.jetbrains.annotations.Nullable;
@@ -86,27 +89,44 @@ import org.rocksdb.WriteBatch;
 import org.rocksdb.WriteBatchWithIndex;
 
 /**
- * Multi-versioned partition storage implementation based on RocksDB. Stored 
data has the following format.
+ * Multi-versioned partition storage implementation based on RocksDB.
+ *
+ * <p>It uses two RocksDB Column Families to store partition data: one for 
managing transaction state of rows and the other for
+ * storing actual row data. We call these Column Families "Partition CF" and 
"Data CF" respectively.
+ *
+ * <p>Partition Column Family has the following format:
  *
  * <p>Key:
  * <pre>{@code
  * For write-intents
- * | tableId (4 bytes, BE) | partId (2 bytes, BE) | rowId (16 bytes, BE) |
+ * | Table ID (4 bytes, BE) | Partition ID (2 bytes, BE) | Row ID (16 bytes, 
BE) |
  *
  * For committed rows
- * | tableId (4 bytes, BE) | partId (2 bytes, BE) | rowId (16 bytes, BE) | 
timestamp (8 bytes, DESC) |
+ * | Table ID (4 bytes, BE) | Partition ID (2 bytes, BE) | Row ID (16 bytes, 
BE) | Commit Timestamp (8 bytes, DESC) |
  * }</pre>
- * Value:
+ *
+ * <p>Value:
+ *
  * <pre>{@code
  * For write-intents
- * | txId (16 bytes) | commitTableId (16 bytes) | commitPartitionId (2 bytes) 
| Row data |
+ * | Data ID (24 bytes, BE) | Commit Table ID (16 bytes) | Commit Partition ID 
(2 bytes) |
  *
  * For committed rows
- * | Row data |
+ * | Data ID (24 bytes, BE) |
+ * }</pre>
+ *
+ * <p>Each Data ID is then used as a key inside the Data Column Family and 
uniquely identifies actual row data.
+ *
+ * <p>Key:
+ * <pre>{@code
+ * | Data ID (24 bytes, BE) |
  * }</pre>
  *
- * <p>Pending transactions (write-intents) data doesn't have a timestamp 
assigned, but they have transaction
- * state (txId, commitTableId and commitPartitionId).
+ * <p>Value:
+ *
+ * <pre>{@code
+ * | Row data |
+ * }</pre>
  *
  * <p>BE means Big Endian, meaning that lexicographical bytes order matches a 
natural order of partitions.
  *
@@ -115,10 +135,21 @@ import org.rocksdb.WriteBatchWithIndex;
  * could be interpreted as a moment infinitely far away in the future.
  */
 public class RocksDbMvPartitionStorage implements MvPartitionStorage {
-    /** Thread-local on-heap byte buffer instance to use for key 
manipulations. */
-    private static final ThreadLocal<ByteBuffer> HEAP_KEY_BUFFER = 
withInitial(() -> allocate(MAX_KEY_SIZE).order(KEY_BYTE_ORDER));
+    /** Thread-local on-heap buffer able to incorporate keys that correspond 
to a committed Data ID. */
+    private static final ThreadLocal<ByteBuffer> 
HEAP_COMMITTED_DATA_ID_KEY_BUFFER =
+            withInitial(() -> allocate(MAX_KEY_SIZE).order(KEY_BYTE_ORDER));
 
-    private static final ThreadLocal<ByteBuffer> DIRECT_KEY_BUFFER = 
withInitial(() -> allocateDirect(MAX_KEY_SIZE).order(KEY_BYTE_ORDER));
+    /** Thread-local on-heap buffer able to incorporate keys that correspond 
to an uncommitted Data ID. */
+    private static final ThreadLocal<ByteBuffer> HEAP_DATA_ID_KEY_BUFFER =
+            withInitial(() -> allocate(ROW_PREFIX_SIZE).order(KEY_BYTE_ORDER));
+
+    /** Thread-local direct buffer able to incorporate keys that correspond to 
a Data ID. */
+    private static final ThreadLocal<ByteBuffer> DIRECT_DATA_ID_KEY_BUFFER =
+            withInitial(() -> 
allocateDirect(MAX_KEY_SIZE).order(KEY_BYTE_ORDER));
+
+    /** Thread-local on-heap buffer able to incorporate Data ID and TX state. 
*/
+    private static final ThreadLocal<ByteBuffer> TX_STATE_BUFFER =
+            withInitial(() -> 
allocate(DATA_ID_WITH_TX_STATE_SIZE).order(KEY_BYTE_ORDER));
 
     /** Table storage instance. */
     private final RocksDbTableStorage tableStorage;
@@ -193,8 +224,8 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
         meta = tableStorage.metaCfHandle();
 
         int tableId = tableStorage.getTableId();
-        helper = new PartitionDataHelper(tableId, partitionId, 
tableStorage.partitionCfHandle());
-        gc = new GarbageCollector(helper, db, tableStorage.gcQueueHandle());
+        helper = new PartitionDataHelper(tableId, partitionId, 
tableStorage.partitionCfHandle(), tableStorage.dataCfHandle());
+        gc = new GarbageCollector(helper, db, readOpts, 
tableStorage.gcQueueHandle());
 
         lastAppliedIndexAndTermKey = createKey(PARTITION_META_PREFIX, tableId, 
partitionId);
         lastGroupConfigKey = createKey(PARTITION_CONF_PREFIX, tableId, 
partitionId);
@@ -330,7 +361,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
             throwExceptionIfStorageInProgressOfRebalance(state.get(), 
this::createStorageInfo);
 
             try {
-                
savePendingLastApplied(PartitionDataHelper.requireWriteBatch(), 
lastAppliedIndex, lastAppliedTerm);
+                savePendingLastApplied(requireWriteBatch(), lastAppliedIndex, 
lastAppliedTerm);
 
                 return null;
             } catch (RocksDBException e) {
@@ -381,7 +412,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
             throwExceptionIfStorageInProgressOfRebalance(state.get(), 
this::createStorageInfo);
 
             try {
-                
saveGroupConfiguration(PartitionDataHelper.requireWriteBatch(), config);
+                saveGroupConfiguration(requireWriteBatch(), config);
 
                 return null;
             } catch (RocksDBException e) {
@@ -405,99 +436,122 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
     public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, 
UUID txId, int commitTableId, int commitPartitionId)
             throws TxIdMismatchException, StorageException {
         return busy(() -> {
-            @SuppressWarnings("resource") WriteBatchWithIndex writeBatch = 
PartitionDataHelper.requireWriteBatch();
+            @SuppressWarnings("resource") WriteBatchWithIndex writeBatch = 
requireWriteBatch();
 
             assert rowIsLocked(rowId);
 
-            ByteBuffer keyBuf = prepareHeapKeyBuf(rowId).rewind();
-
-            BinaryRow res = null;
-
             try {
                 // Check concurrent transaction data.
-                byte[] keyBytes = new byte[ROW_PREFIX_SIZE];
+                byte[] uncommittedDataIdKey = 
createUncommittedDataIdKey(rowId);
 
-                keyBuf.get(keyBytes);
+                byte[] previousTxState = writeBatch.getFromBatchAndDB(db, 
helper.partCf, readOpts, uncommittedDataIdKey);
 
-                keyBuf.rewind();
+                // Previous value must belong to the same transaction.
+                if (previousTxState != null) {
+                    ByteBuffer previousTxStateBuffer = 
ByteBuffer.wrap(previousTxState);
 
-                byte[] previousValueBytes = writeBatch.getFromBatchAndDB(db, 
helper.partCf, readOpts, keyBytes);
+                    validateTxId(previousTxStateBuffer, txId);
 
-                // Previous value must belong to the same transaction.
-                if (previousValueBytes != null) {
-                    ByteBuffer previousValue = 
ByteBuffer.wrap(previousValueBytes);
+                    ByteBuffer dataId = 
readDataIdFromTxState(previousTxStateBuffer);
 
-                    validateTxId(previousValue, txId);
+                    byte[] payloadKey = helper.createPayloadKey(dataId);
 
-                    res = wrapValueIntoBinaryRow(previousValue, true);
-                }
+                    BinaryRow previousRow = null;
+
+                    boolean isOldValueTombstone = isTombstone(dataId);
+
+                    if (!isOldValueTombstone) {
+                        byte[] previousRowBytes = 
writeBatch.getFromBatchAndDB(db, helper.dataCf, readOpts, payloadKey);
 
-                if (row == null) {
-                    ByteBuffer value = allocate(VALUE_HEADER_SIZE);
+                        previousRow = deserializeRow(previousRowBytes);
+                    }
+
+                    // We need to flip the tombstone bit in case we are 
overwriting a previous Write Intent with a different
+                    // tombstone bit.
+                    if (isOldValueTombstone ^ (row == null)) {
+                        setFirstBit(previousTxState, DATA_ID_SIZE - 1, row == 
null);
 
-                    // Write empty value as a tombstone.
-                    if (previousValueBytes != null) {
-                        // Reuse old array with transaction id already written 
to it.
-                        value.put(previousValueBytes, 0, VALUE_HEADER_SIZE);
-                    } else {
-                        writeHeader(value, txId, commitTableId, 
commitPartitionId);
+                        writeBatch.put(helper.partCf, uncommittedDataIdKey, 
previousTxState);
                     }
 
-                    writeBatch.put(helper.partCf, keyBytes, value.array());
+                    // No need to update the Data ID key because it should be 
the same as already in the storage.
+                    if (row != null) {
+                        writeBatch.put(helper.dataCf, payloadKey, 
serializeBinaryRow(row));
+                    }
+
+                    return previousRow;
                 } else {
-                    writeUnversioned(keyBytes, row, txId, commitTableId, 
commitPartitionId);
+                    ByteBuffer txState = createTxState(rowId, txId, 
commitTableId, commitPartitionId, row == null);
+
+                    ByteBuffer dataId = readDataIdFromTxState(txState);
+
+                    writeBatch.put(helper.partCf, uncommittedDataIdKey, 
txState.array());
+
+                    if (row != null) {
+                        writeBatch.put(helper.dataCf, 
helper.createPayloadKey(dataId), serializeBinaryRow(row));
+                    }
+
+                    return null;
                 }
             } catch (RocksDBException e) {
                 throw new StorageException("Failed to update a row in storage: 
" + createStorageInfo(), e);
             }
-
-            return res;
         });
     }
 
-    /**
-     * Writes a tuple of transaction id and a row bytes, using "row prefix" of 
a key array as a storage key.
-     *
-     * @param keyArray Array that has partition id and row id in its prefix.
-     * @param row Binary row, not null.
-     * @param txId Transaction id.
-     * @throws RocksDBException If write failed.
-     */
-    private void writeUnversioned(byte[] keyArray, BinaryRow row, UUID txId, 
int commitTableId, int commitPartitionId)
-            throws RocksDBException {
-        @SuppressWarnings("resource") WriteBatchWithIndex writeBatch = 
PartitionDataHelper.requireWriteBatch();
-
-        ByteBuffer value = allocate(rowSize(row) + VALUE_HEADER_SIZE);
-
-        writeHeader(value, txId, commitTableId, commitPartitionId);
+    private static ByteBuffer createDataId(RowId rowId, HybridTimestamp 
txTimestamp, boolean isTombstone) {
+        ByteBuffer buffer = allocate(DATA_ID_SIZE).order(KEY_BYTE_ORDER);
 
-        writeBinaryRow(value, row);
+        putDataId(buffer, rowId, txTimestamp, isTombstone);
 
-        // Write table row data as a value.
-        writeBatch.put(helper.partCf, keyArray, value.array());
+        return buffer.rewind();
     }
 
-    private static int rowSize(BinaryRow row) {
-        // Tuple + schema version.
-        return row.tupleSliceLength() + Short.BYTES;
-    }
+    private static ByteBuffer createTxState(RowId rowId, UUID txId, int 
commitTableId, int commitPartitionId, boolean isTombstone) {
+        ByteBuffer buffer = TX_STATE_BUFFER.get().clear();
 
-    private static void writeHeader(ByteBuffer dest, UUID txId, int 
commitTableId, int commitPartitionId) {
-        assert dest.order() == ByteOrder.BIG_ENDIAN;
+        putDataId(buffer, rowId, TransactionIds.beginTimestamp(txId), 
isTombstone);
 
-        dest
+        return buffer
                 .putLong(txId.getMostSignificantBits())
                 .putLong(txId.getLeastSignificantBits())
                 .putInt(commitTableId)
-                .putShort((short) commitPartitionId);
+                .putShort((short) commitPartitionId)
+                .rewind();
     }
 
-    private static void writeBinaryRow(ByteBuffer dest, BinaryRow row) {
-        assert dest.order() == ByteOrder.BIG_ENDIAN;
+    private static void putDataId(ByteBuffer buffer, RowId rowId, 
HybridTimestamp txTimestamp, boolean isTombstone) {
+        long timestamp = txTimestamp.longValue();
+
+        // The sign bit of the timestamp is always zero (timestamp is always 
positive), so we shift it out and use the freed lowest bit to indicate whether
+        // Data ID points to a tombstone. This can help to avoid indirect 
reads for tombstones.
+        long timestampWithTombstoneFlag = timestamp << 1 | (isTombstone ? 1 : 
0);
+
+        buffer
+                .putLong(rowId.mostSignificantBits())
+                .putLong(rowId.leastSignificantBits())
+                .putLong(timestampWithTombstoneFlag);
+    }
+
+    private static ByteBuffer readDataIdFromTxState(ByteBuffer txState) {
+        int prevLimit = txState.limit();
+
+        ByteBuffer dataId = txState
+                .limit(DATA_ID_SIZE)
+                .slice()
+                .order(KEY_BYTE_ORDER);
+
+        txState.position(txState.position() + DATA_ID_SIZE).limit(prevLimit);
 
-        dest
+        return dataId;
+    }
+
+    private static byte[] serializeBinaryRow(BinaryRow row) {
+        return allocate(Short.BYTES + row.tupleSliceLength())
+                .order(KEY_BYTE_ORDER)
                 .putShort((short) row.schemaVersion())
-                .put(row.tupleSlice());
+                .put(row.tupleSlice())
+                .array();
     }
 
     @Override
@@ -505,32 +559,38 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
         return busy(() -> {
             throwExceptionIfStorageInProgressOfRebalance(state.get(), 
this::createStorageInfo);
 
-            @SuppressWarnings("resource") WriteBatchWithIndex writeBatch = 
PartitionDataHelper.requireWriteBatch();
+            @SuppressWarnings("resource") WriteBatchWithIndex writeBatch = 
requireWriteBatch();
 
             assert rowIsLocked(rowId);
 
-            ByteBuffer keyBuf = prepareDirectKeyBuf(rowId)
-                    .position(0)
-                    .limit(ROW_PREFIX_SIZE);
+            byte[] uncommittedDataIdKey = createUncommittedDataIdKey(rowId);
 
             try {
-                byte[] keyBytes = new byte[ROW_PREFIX_SIZE];
+                byte[] dataIdWithTxState = writeBatch.getFromBatchAndDB(db, 
helper.partCf, readOpts, uncommittedDataIdKey);
 
-                keyBuf.get(keyBytes);
+                if (dataIdWithTxState == null) {
+                    // The chain doesn't contain an uncommitted write intent.
+                    return null;
+                }
 
-                keyBuf.rewind();
+                ByteBuffer dataId = 
readDataIdFromTxState(ByteBuffer.wrap(dataIdWithTxState));
 
-                byte[] previousValue = writeBatch.getFromBatchAndDB(db, 
helper.partCf, readOpts, keyBytes);
+                byte[] payloadKey = helper.createPayloadKey(dataId);
 
-                if (previousValue == null) {
-                    // The chain doesn't contain an uncommitted write intent.
-                    return null;
+                BinaryRow row = null;
+
+                if (!isTombstone(dataId)) {
+                    byte[] rowBytes = writeBatch.getFromBatchAndDB(db, 
helper.dataCf, readOpts, payloadKey);
+
+                    row = deserializeRow(rowBytes);
                 }
 
                 // Perform unconditional remove for the key without associated 
timestamp.
-                writeBatch.delete(helper.partCf, keyBuf);
+                writeBatch.delete(helper.partCf, uncommittedDataIdKey);
+
+                writeBatch.delete(helper.dataCf, payloadKey);
 
-                return wrapValueIntoBinaryRow(ByteBuffer.wrap(previousValue), 
true);
+                return row;
             } catch (RocksDBException e) {
                 throw new StorageException("Failed to roll back 
insert/update", e);
             }
@@ -546,29 +606,31 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
     @Override
     public void commitWrite(RowId rowId, HybridTimestamp timestamp) throws 
StorageException {
         busy(() -> {
-            WriteBatchWithIndex writeBatch = 
PartitionDataHelper.requireWriteBatch();
+            WriteBatchWithIndex writeBatch = requireWriteBatch();
 
             assert rowIsLocked(rowId);
 
-            ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
+            byte[] dataIdKey = createCommittedDataIdKey(rowId, timestamp);
+
+            byte[] uncommittedDataIdKey = copyOf(dataIdKey, ROW_PREFIX_SIZE);
 
             try {
                 // Read a value associated with pending write.
-                byte[] uncommittedKeyBytes = copyOf(keyBuf.array(), 
ROW_PREFIX_SIZE);
-
-                byte[] valueBytes = writeBatch.getFromBatchAndDB(db, 
helper.partCf, readOpts, uncommittedKeyBytes);
+                byte[] txState = writeBatch.getFromBatchAndDB(db, 
helper.partCf, readOpts, uncommittedDataIdKey);
 
-                if (valueBytes == null) {
+                if (txState == null) {
                     // The chain doesn't contain an uncommitted write intent.
                     return null;
                 }
 
-                boolean isNewValueTombstone = valueBytes.length == 
VALUE_HEADER_SIZE;
+                byte[] dataId = copyOf(txState, DATA_ID_SIZE);
+
+                boolean isNewValueTombstone = isTombstone(dataId);
 
                 AddResult addResult = gc.tryAddToGcQueue(writeBatch, rowId, 
timestamp, isNewValueTombstone);
 
                 // Delete pending write.
-                writeBatch.delete(helper.partCf, uncommittedKeyBytes);
+                writeBatch.delete(helper.partCf, uncommittedDataIdKey);
 
                 // We only write tombstone if the previous value for the same 
row id was not a tombstone.
                 // So there won't be consecutive tombstones for the same row 
id.
@@ -577,13 +639,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
                 }
 
                 // Add timestamp to the key, and put the value back into the 
storage.
-                putTimestampDesc(keyBuf, timestamp);
-
-                writeBatch.put(
-                        helper.partCf,
-                        copyOf(keyBuf.array(), MAX_KEY_SIZE),
-                        copyOfRange(valueBytes, VALUE_HEADER_SIZE, 
valueBytes.length)
-                );
+                writeBatch.put(helper.partCf, dataIdKey, dataId);
 
                 updateEstimatedSize(isNewValueTombstone, addResult);
 
@@ -597,7 +653,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
     @Override
     public void addWriteCommitted(RowId rowId, @Nullable BinaryRow row, 
HybridTimestamp commitTimestamp) throws StorageException {
         busy(() -> {
-            WriteBatchWithIndex writeBatch = 
PartitionDataHelper.requireWriteBatch();
+            WriteBatchWithIndex writeBatch = requireWriteBatch();
 
             assert rowIsLocked(rowId);
 
@@ -612,25 +668,17 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
                     return null;
                 }
 
-                // TODO IGNITE-16913 Add proper way to write row bytes into 
array without allocations.
-                byte[] rowBytes;
+                byte[] dataIdKey = createCommittedDataIdKey(rowId, 
commitTimestamp);
 
-                if (row == null) {
-                    rowBytes = BYTE_EMPTY_ARRAY;
-                } else {
-                    ByteBuffer rowBuffer = allocate(rowSize(row));
+                ByteBuffer dataId = createDataId(rowId, commitTimestamp, 
isNewValueTombstone);
 
-                    writeBinaryRow(rowBuffer, row);
+                writeBatch.put(helper.partCf, dataIdKey, dataId.array());
 
-                    rowBytes = rowBuffer.array();
+                // TODO IGNITE-16913 Add proper way to write row bytes into 
array without allocations.
+                if (row != null) {
+                    writeBatch.put(helper.dataCf, 
helper.createPayloadKey(dataId), serializeBinaryRow(row));
                 }
 
-                ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
-
-                putTimestampDesc(keyBuf, commitTimestamp);
-
-                writeBatch.put(helper.partCf, keyBuf.array(), rowBytes);
-
                 updateEstimatedSize(isNewValueTombstone, addResult);
 
                 return null;
@@ -666,17 +714,10 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
                         String.format("RowId partition [%d] is not equal to 
storage partition [%d].", rowId.partitionId(), partitionId));
             }
 
-            // We can read data outside of consistency closure. Batch is not 
required.
-            WriteBatchWithIndex writeBatch = helper.currentWriteBatch();
-
             try (
                     // Set next partition as an upper bound.
                     RocksIterator baseIterator = db.newIterator(helper.partCf, 
helper.upperBoundReadOpts);
-                    // "count()" check is mandatory. Write batch iterator 
without any updates just crashes everything.
-                    // It's not documented, but this is exactly how it should 
be used.
-                    RocksIterator seekIterator = writeBatch != null && 
writeBatch.count() > 0
-                            ? writeBatch.newIteratorWithBase(helper.partCf, 
baseIterator)
-                            : baseIterator
+                    RocksIterator seekIterator = wrapIterator(baseIterator, 
helper.partCf)
             ) {
                 if (lookingForLatestVersions(timestamp)) {
                     return readLatestVersion(rowId, seekIterator);
@@ -692,26 +733,26 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
     }
 
     private ReadResult readLatestVersion(RowId rowId, RocksIterator 
seekIterator) {
-        ByteBuffer keyBuf = prepareDirectKeyBuf(rowId);
+        ByteBuffer dataIdKeyPrefix = prepareDirectDataIdKeyBuf(rowId)
+                .position(0)
+                .limit(ROW_PREFIX_SIZE);
 
         // Seek to the first appearance of row id if timestamp isn't set.
         // Since timestamps are sorted from newest to oldest, first occurrence 
will always be the latest version.
-        assert keyBuf.position() == ROW_PREFIX_SIZE;
-
-        
seekIterator.seek(keyBuf.duplicate().position(0).limit(ROW_PREFIX_SIZE));
+        seekIterator.seek(dataIdKeyPrefix);
 
         if (invalid(seekIterator)) {
             // No data at all.
             return ReadResult.empty(rowId);
         }
 
-        ByteBuffer readKeyBuf = DIRECT_KEY_BUFFER.get().clear();
+        ByteBuffer dataIdKey = DIRECT_DATA_ID_KEY_BUFFER.get().clear();
 
-        int keyLength = seekIterator.key(readKeyBuf);
+        int keyLength = seekIterator.key(dataIdKey);
 
-        readKeyBuf.position(0).limit(keyLength);
+        dataIdKey.position(0).limit(keyLength);
 
-        if (!matches(rowId, readKeyBuf)) {
+        if (!matches(rowId, dataIdKey)) {
             // It is already a different row, so no version exists for our 
rowId.
             return ReadResult.empty(rowId);
         }
@@ -720,15 +761,19 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
 
         ByteBuffer valueBytes = ByteBuffer.wrap(seekIterator.value());
 
-        return readResultFromKeyAndValue(isWriteIntent, readKeyBuf, 
valueBytes);
+        return readResultFromKeyAndValue(isWriteIntent, dataIdKey, valueBytes);
     }
 
-    private ReadResult readResultFromKeyAndValue(boolean isWriteIntent, 
ByteBuffer keyBuf, ByteBuffer valueBytes) {
-        RowId rowId = getRowId(keyBuf);
+    private ReadResult readResultFromKeyAndValue(
+            boolean isWriteIntent,
+            ByteBuffer dataIdKey,
+            ByteBuffer valueBytes
+    ) {
+        RowId rowId = getRowId(dataIdKey);
 
         if (!isWriteIntent) {
             // There is no write-intent, return latest committed row.
-            return wrapCommittedValue(rowId, valueBytes, 
readTimestampDesc(keyBuf));
+            return wrapCommittedValue(rowId, valueBytes, 
readTimestampDesc(dataIdKey));
         }
 
         return wrapUncommittedValue(rowId, valueBytes, null);
@@ -743,16 +788,13 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
      * @return Read result.
      */
     private ReadResult readByTimestamp(RocksIterator seekIterator, RowId 
rowId, HybridTimestamp timestamp) {
-        ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
-
-        // Put timestamp restriction according to N2O timestamps order.
-        putTimestampDesc(keyBuf, timestamp);
+        byte[] committedDataIdKey = createCommittedDataIdKey(rowId, timestamp);
 
         // This seek will either find a key with timestamp that's less or 
equal than required value, or a different key whatsoever.
         // It is guaranteed by descending order of timestamps.
-        seekIterator.seek(keyBuf.array());
+        seekIterator.seek(committedDataIdKey);
 
-        return handleReadByTimestampIterator(seekIterator, rowId, timestamp, 
keyBuf);
+        return handleReadByTimestampIterator(seekIterator, rowId, timestamp, 
committedDataIdKey);
     }
 
     /**
@@ -763,15 +805,19 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
      * @param seekIterator Iterator, on which seek operation was already 
performed.
      * @param rowId Row id.
      * @param timestamp Timestamp.
-     * @param keyBuf Buffer with a key in it: partition id + row id + 
timestamp.
+     * @param committedDataIdKey Key for a committed entry: partition id + row 
id + timestamp.
      * @return Read result.
      */
-    private static ReadResult handleReadByTimestampIterator(RocksIterator 
seekIterator, RowId rowId, HybridTimestamp timestamp,
-            ByteBuffer keyBuf) {
+    private ReadResult handleReadByTimestampIterator(
+            RocksIterator seekIterator,
+            RowId rowId,
+            HybridTimestamp timestamp,
+            byte[] committedDataIdKey
+    ) {
         // There's no guarantee that required key even exists. If it doesn't, 
then "seek" will point to a different key.
         // To avoid returning its value, we have to check that actual key 
matches what we need.
         // Here we prepare direct buffer to read key without timestamp. Shared 
direct buffer is used to avoid extra memory allocations.
-        ByteBuffer foundKeyBuf = MV_KEY_BUFFER.get().clear();
+        ByteBuffer foundKeyBuf = DIRECT_DATA_ID_KEY_BUFFER.get().clear();
 
         int keyLength = 0;
 
@@ -783,7 +829,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
             // There is no record older than timestamp.
             // There might be a write-intent which we should return.
             // Seek to *just* row id.
-            seekIterator.seek(copyOf(keyBuf.array(), ROW_PREFIX_SIZE));
+            seekIterator.seek(copyOf(committedDataIdKey, ROW_PREFIX_SIZE));
 
             if (invalid(seekIterator)) {
                 // There are no writes with row id.
@@ -866,14 +912,15 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
      * Checks if row id matches the one written in the key buffer. Note: this 
operation changes the position in the buffer.
      *
      * @param rowId Row id.
-     * @param keyBuf Key buffer.
+     * @param dataIdKey Key buffer.
      * @return {@code true} if row id matches the key buffer, {@code false} 
otherwise.
      */
-    private static boolean matches(RowId rowId, ByteBuffer keyBuf) {
+    private static boolean matches(RowId rowId, ByteBuffer dataIdKey) {
         // Comparison starts from the position of the row id.
-        keyBuf.position(ROW_ID_OFFSET);
+        dataIdKey.position(ROW_ID_OFFSET);
 
-        return rowId.mostSignificantBits() == normalize(keyBuf.getLong()) && 
rowId.leastSignificantBits() == normalize(keyBuf.getLong());
+        return rowId.mostSignificantBits() == normalize(dataIdKey.getLong())
+                && rowId.leastSignificantBits() == 
normalize(dataIdKey.getLong());
     }
 
     @Override
@@ -883,7 +930,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
 
             throwExceptionIfStorageInProgressOfRebalance(state.get(), 
this::createStorageInfo);
 
-            ByteBuffer prefix = prepareDirectKeyBuf(rowId)
+            ByteBuffer prefix = prepareDirectDataIdKeyBuf(rowId)
                     .position(0)
                     .limit(ROW_PREFIX_SIZE);
 
@@ -891,7 +938,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
 
             RocksIterator it = db.newIterator(helper.partCf, options);
 
-            it = helper.wrapIterator(it, helper.partCf);
+            it = wrapIterator(it, helper.partCf);
 
             it.seek(prefix);
 
@@ -970,7 +1017,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
         return busy(() -> {
             throwExceptionIfStorageInProgressOfRebalance(state.get(), 
this::createStorageInfo);
 
-            ByteBuffer keyBuf = prepareDirectKeyBuf(lowerBound)
+            ByteBuffer keyBuf = prepareDirectDataIdKeyBuf(lowerBound)
                     .position(0)
                     .limit(ROW_PREFIX_SIZE);
 
@@ -1029,7 +1076,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
                 return null;
             }
 
-            AbstractWriteBatch writeBatch = 
PartitionDataHelper.requireWriteBatch();
+            AbstractWriteBatch writeBatch = requireWriteBatch();
 
             try {
                 byte[] leaseBytes = new byte[Long.BYTES];
@@ -1061,24 +1108,24 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
         writeBatch.delete(meta, leaseKey);
 
         writeBatch.deleteRange(helper.partCf, helper.partitionStartPrefix(), 
helper.partitionEndPrefix());
+        writeBatch.deleteRange(helper.dataCf, helper.partitionStartPrefix(), 
helper.partitionEndPrefix());
 
         gc.deleteQueue(writeBatch);
     }
 
     @Override
     public @Nullable GcEntry peek(HybridTimestamp lowWatermark) {
-        //noinspection resource
-        PartitionDataHelper.requireWriteBatch();
+        WriteBatchWithIndex batch = requireWriteBatch();
 
         // No busy lock required, we're already in "runConsistently" closure.
         throwExceptionIfStorageInProgressOfRebalance(state.get(), 
this::createStorageInfo);
 
-        return gc.peek(lowWatermark);
+        return gc.peek(batch, lowWatermark);
     }
 
     @Override
     public @Nullable BinaryRow vacuum(GcEntry entry) {
-        WriteBatchWithIndex batch = PartitionDataHelper.requireWriteBatch();
+        WriteBatchWithIndex batch = requireWriteBatch();
 
         // No busy lock required, we're already in "runConsistently" closure.
         throwExceptionIfStorageInProgressOfRebalance(state.get(), 
this::createStorageInfo);
@@ -1118,26 +1165,31 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
         transitionToDestroyedOrClosedState(StorageState.DESTROYED);
     }
 
-    /**
-     * Prepares thread-local on-heap byte buffer. Writes row id in it. 
Partition id is already there. Timestamp is not cleared.
-     */
-    private ByteBuffer prepareHeapKeyBuf(RowId rowId) {
-        ByteBuffer keyBuf = HEAP_KEY_BUFFER.get().clear();
+    private byte[] createUncommittedDataIdKey(RowId rowId) {
+        ByteBuffer uncommittedDataIdKeyBuf = 
HEAP_DATA_ID_KEY_BUFFER.get().clear();
 
-        writeKey(keyBuf, rowId);
+        writeRowPrefix(uncommittedDataIdKeyBuf, rowId);
 
-        return keyBuf;
+        return uncommittedDataIdKeyBuf.array();
+    }
+
+    private byte[] createCommittedDataIdKey(RowId rowId, HybridTimestamp 
timestamp) {
+        ByteBuffer keyBuf = HEAP_COMMITTED_DATA_ID_KEY_BUFFER.get().clear();
+
+        helper.putCommittedDataIdKey(keyBuf, rowId, timestamp);
+
+        return keyBuf.array();
     }
 
-    private ByteBuffer prepareDirectKeyBuf(RowId rowId) {
-        ByteBuffer keyBuf = DIRECT_KEY_BUFFER.get().clear();
+    private ByteBuffer prepareDirectDataIdKeyBuf(RowId rowId) {
+        ByteBuffer keyBuf = DIRECT_DATA_ID_KEY_BUFFER.get().clear();
 
-        writeKey(keyBuf, rowId);
+        writeRowPrefix(keyBuf, rowId);
 
         return keyBuf;
     }
 
-    private void writeKey(ByteBuffer buffer, RowId rowId) {
+    private void writeRowPrefix(ByteBuffer buffer, RowId rowId) {
         assert buffer.order() == KEY_BYTE_ORDER;
         assert rowId.partitionId() == partitionId : rowId;
 
@@ -1147,11 +1199,13 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
         helper.putRowId(buffer, rowId);
     }
 
-    private static void validateTxId(ByteBuffer valueBytes, UUID txId) {
-        long msb = valueBytes.getLong();
-        long lsb = valueBytes.getLong();
+    private static void validateTxId(ByteBuffer dataIdWithTxState, UUID txId) {
+        dataIdWithTxState.position(DATA_ID_SIZE);
+
+        long msb = dataIdWithTxState.getLong();
+        long lsb = dataIdWithTxState.getLong();
 
-        valueBytes.rewind();
+        dataIdWithTxState.rewind();
 
         if (txId.getMostSignificantBits() != msb || 
txId.getLeastSignificantBits() != lsb) {
             throw new TxIdMismatchException(txId, new UUID(msb, lsb));
@@ -1177,48 +1231,29 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
         return invalid;
     }
 
-    /**
-     * Converts raw byte array representation of the value into a binary row.
-     *
-     * @param valueBytes Value bytes as read from the storage.
-     * @param valueHasTxData Whether the value has a transaction id prefix in 
it.
-     * @return Binary row instance or {@code null} if value is a tombstone.
-     */
-    private static @Nullable BinaryRow wrapValueIntoBinaryRow(ByteBuffer 
valueBytes, boolean valueHasTxData) {
-        if (isTombstone(valueBytes, valueHasTxData)) {
-            return null;
-        }
-
-        assert valueBytes.order() == ByteOrder.BIG_ENDIAN;
-
-        if (valueHasTxData) {
-            valueBytes.position(VALUE_OFFSET);
-        }
-
-        return deserializeRow(valueBytes);
-    }
-
     /**
      * Converts raw byte array representation of the write-intent value into a 
read result adding newest commit timestamp if it is not
      * {@code null}.
      *
      * @param rowId ID of the corresponding row.
-     * @param valueBuffer Value bytes as read from the storage.
+     * @param transactionState Transaction state, including the Data ID.
      * @param newestCommitTs Commit timestamp of the most recent committed 
write of this value.
      * @return Read result instance.
      */
-    private static ReadResult wrapUncommittedValue(RowId rowId, ByteBuffer 
valueBuffer, @Nullable HybridTimestamp newestCommitTs) {
-        assert valueBuffer.order() == ByteOrder.BIG_ENDIAN;
+    private ReadResult wrapUncommittedValue(RowId rowId, ByteBuffer 
transactionState, @Nullable HybridTimestamp newestCommitTs) {
+        assert transactionState.order() == KEY_BYTE_ORDER;
 
-        UUID txId = new UUID(valueBuffer.getLong(), valueBuffer.getLong());
+        ByteBuffer dataId = readDataIdFromTxState(transactionState);
 
-        int commitTableId = valueBuffer.getInt();
+        UUID txId = new UUID(transactionState.getLong(), 
transactionState.getLong());
 
-        int commitPartitionId = Short.toUnsignedInt(valueBuffer.getShort());
+        int commitTableId = transactionState.getInt();
 
-        BinaryRow row = valueBuffer.remaining() == 0 ? null : 
deserializeRow(valueBuffer);
+        int commitPartitionId = 
Short.toUnsignedInt(transactionState.getShort());
 
-        valueBuffer.rewind();
+        transactionState.rewind();
+
+        BinaryRow row = readRowByDataId(dataId);
 
         return ReadResult.createFromWriteIntent(rowId, row, txId, 
commitTableId, commitPartitionId, newestCommitTs);
     }
@@ -1227,27 +1262,29 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
      * Converts raw byte array representation of the value into a read result.
      *
      * @param rowId ID of the corresponding row.
-     * @param valueBytes Value bytes as read from the storage.
+     * @param dataId Data ID pointing to row data.
      * @param rowCommitTimestamp Timestamp with which the row was committed.
      * @return Read result instance or {@code null} if value is a tombstone.
      */
-    private static ReadResult wrapCommittedValue(RowId rowId, ByteBuffer 
valueBytes, HybridTimestamp rowCommitTimestamp) {
-        if (valueBytes.remaining() == 0) {
-            return ReadResult.empty(rowId);
+    private ReadResult wrapCommittedValue(RowId rowId, ByteBuffer dataId, 
HybridTimestamp rowCommitTimestamp) {
+        return ReadResult.createFromCommitted(rowId, readRowByDataId(dataId), 
rowCommitTimestamp);
+    }
+
+    @Nullable
+    private BinaryRow readRowByDataId(ByteBuffer dataId) {
+        if (isTombstone(dataId)) {
+            return null;
         }
 
-        return ReadResult.createFromCommitted(
-                rowId,
-                deserializeRow(valueBytes),
-                rowCommitTimestamp
-        );
-    }
+        try {
+            byte[] payloadKey = helper.createPayloadKey(dataId);
 
-    /**
-     * Returns {@code true} if value payload represents a tombstone.
-     */
-    private static boolean isTombstone(ByteBuffer valueBytes, boolean hasTxId) 
{
-        return valueBytes.limit() == (hasTxId ? VALUE_HEADER_SIZE : 0);
+            byte[] rowBytes = getFromBatchAndDb(db, helper.dataCf, readOpts, 
payloadKey);
+
+            return rowBytes == null ? null : deserializeRow(rowBytes);
+        } catch (RocksDBException e) {
+            throw new StorageException(e);
+        }
     }
 
     private abstract class BasePartitionTimestampCursor implements 
PartitionTimestampCursor {
@@ -1293,7 +1330,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
 
                 it.seek(seekKeyBuf.array());
 
-                ReadResult readResult = handleReadByTimestampIterator(it, 
currentRowId, timestamp, seekKeyBuf);
+                ReadResult readResult = handleReadByTimestampIterator(it, 
currentRowId, timestamp, seekKeyBuf.array());
 
                 if (readResult.isEmpty()) {
                     return null;
@@ -1343,7 +1380,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
             currentRowId = null;
 
             // Prepare direct buffer slice to read keys from the iterator.
-            ByteBuffer currentKeyBuffer = DIRECT_KEY_BUFFER.get();
+            ByteBuffer currentKeyBuffer = DIRECT_DATA_ID_KEY_BUFFER.get();
 
             while (true) {
                 // At this point, seekKeyBuf should contain row id that's 
above the one we already scanned, but not greater than any
@@ -1455,7 +1492,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
             currentRowId = null;
 
             // Prepare direct buffer slice to read keys from the iterator.
-            ByteBuffer directBuffer = DIRECT_KEY_BUFFER.get();
+            ByteBuffer directBuffer = DIRECT_DATA_ID_KEY_BUFFER.get();
 
             while (true) {
                 // TODO IGNITE-18201 Remove copying.
@@ -1475,7 +1512,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
                 // Seek to current row id + timestamp.
                 it.seek(seekKeyBuf.array());
 
-                ReadResult readResult = handleReadByTimestampIterator(it, 
rowId, timestamp, seekKeyBuf);
+                ReadResult readResult = handleReadByTimestampIterator(it, 
rowId, timestamp, seekKeyBuf.array());
 
                 if (readResult.isEmpty() && !readResult.isWriteIntent()) {
                     // Seek to next row id as we found nothing that matches.
@@ -1580,6 +1617,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
         writeBatch.delete(meta, estimatedSizeKey);
 
         writeBatch.deleteRange(helper.partCf, helper.partitionStartPrefix(), 
helper.partitionEndPrefix());
+        writeBatch.deleteRange(helper.dataCf, helper.partitionStartPrefix(), 
helper.partitionEndPrefix());
 
         gc.deleteQueue(writeBatch);
     }
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index 65c368dbbd..53e5d65163 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -136,6 +136,13 @@ public class RocksDbTableStorage implements MvTableStorage 
{
         return rocksDb.gcQueueCf.handle();
     }
 
+    /**
+     * Returns a column family handle for data CF.
+     */
+    ColumnFamilyHandle dataCfHandle() {
+        return rocksDb.dataCf.handle();
+    }
+
     /**
      * Returns a future to wait next flush operation from the current point in 
time. Uses {@link RocksDB#getLatestSequenceNumber()} to
      * achieve this.
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
index 4771a339a7..35f65660e6 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
@@ -113,12 +113,15 @@ public final class SharedRocksDbInstance {
     /** Meta information instance that wraps {@link ColumnFamily} instance for 
meta column family. */
     public final RocksDbMetaStorage meta;
 
-    /** Column Family for partition data. */
+    /** Column Family for TX data and references to {@link #dataCf}. */
     public final ColumnFamily partitionCf;
 
     /** Column Family for GC queue. */
     public final ColumnFamily gcQueueCf;
 
+    /** Column Family for storing binary rows. */
+    public final ColumnFamily dataCf;
+
     /** Column Family for Hash Index data. */
     private final ColumnFamily hashIndexCf;
 
@@ -143,6 +146,7 @@ public final class SharedRocksDbInstance {
             RocksDbMetaStorage meta,
             ColumnFamily partitionCf,
             ColumnFamily gcQueueCf,
+            ColumnFamily dataCf,
             ColumnFamily hashIndexCf,
             List<ColumnFamily> sortedIndexCfs,
             List<AutoCloseable> resources
@@ -157,6 +161,7 @@ public final class SharedRocksDbInstance {
         this.meta = meta;
         this.partitionCf = partitionCf;
         this.gcQueueCf = gcQueueCf;
+        this.dataCf = dataCf;
         this.hashIndexCf = hashIndexCf;
 
         this.resources = new ArrayList<>(resources);
@@ -363,6 +368,7 @@ public final class SharedRocksDbInstance {
                     .array();
 
             deleteByPrefix(writeBatch, partitionCf, tableIdBytes);
+            deleteByPrefix(writeBatch, dataCf, tableIdBytes);
             deleteByPrefix(writeBatch, gcQueueCf, tableIdBytes);
             deleteByPrefix(writeBatch, hashIndexCf, tableIdBytes);
 
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceCreator.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceCreator.java
index 29b898d814..58b46138a9 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceCreator.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceCreator.java
@@ -100,6 +100,7 @@ public class SharedRocksDbInstanceCreator {
             RocksDbMetaStorage meta = null;
             ColumnFamily partitionCf = null;
             ColumnFamily gcQueueCf = null;
+            ColumnFamily dataCf = null;
             ColumnFamily hashIndexCf = null;
             var sortedIndexCfs = new ArrayList<ColumnFamily>();
 
@@ -123,6 +124,11 @@ public class SharedRocksDbInstanceCreator {
 
                         break;
 
+                    case DATA:
+                        dataCf = cf;
+
+                        break;
+
                     case HASH_INDEX:
                         hashIndexCf = cf;
 
@@ -150,6 +156,7 @@ public class SharedRocksDbInstanceCreator {
                     requireNonNull(partitionCf, "partitionCf"),
                     requireNonNull(gcQueueCf, "gcQueueCf"),
+                    requireNonNull(dataCf, "dataCf"),
                     requireNonNull(hashIndexCf, "hashIndexCf"),
                     sortedIndexCfs,
                     resources // Trusts the inner class to copy the resources!!
             );
@@ -198,6 +205,7 @@ public class SharedRocksDbInstanceCreator {
         switch (ColumnFamilyType.fromCfName(utf8cfName)) {
             case META:
             case GC_QUEUE:
+            case DATA:
                 return add(new ColumnFamilyOptions());
 
             case PARTITION:
diff --git 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelperTest.java
 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelperTest.java
new file mode 100644
index 0000000000..e8540c26d3
--- /dev/null
+++ 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelperTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.setFirstBit;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import org.junit.jupiter.api.Test;
+
+class PartitionDataHelperTest {
+    @Test
+    void testSetFirstBit() {
+        byte[] array = {1, 0, 42};
+
+        setFirstBit(array, 1, true);
+
+        assertThat(array, is(new byte[] {1, 1, 42}));
+
+        setFirstBit(array, 1, true);
+
+        assertThat(array, is(new byte[] {1, 1, 42}));
+
+        setFirstBit(array, 1, false);
+
+        assertThat(array, is(new byte[] {1, 0, 42}));
+
+        setFirstBit(array, 1, false);
+
+        assertThat(array, is(new byte[] {1, 0, 42}));
+
+        setFirstBit(array, 2, true);
+
+        assertThat(array, is(new byte[] {1, 0, 43}));
+
+        setFirstBit(array, 2, true);
+
+        assertThat(array, is(new byte[] {1, 0, 43}));
+
+        setFirstBit(array, 2, false);
+
+        assertThat(array, is(new byte[] {1, 0, 42}));
+
+        setFirstBit(array, 2, false);
+
+        assertThat(array, is(new byte[] {1, 0, 42}));
+    }
+}
diff --git 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/benchmarks/CommitManyWritesBenchmark.java
 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/benchmarks/CommitManyWritesBenchmark.java
new file mode 100644
index 0000000000..b6dc18dfbd
--- /dev/null
+++ 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/benchmarks/CommitManyWritesBenchmark.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.benchmarks;
+
+import static org.apache.ignite.internal.util.IgniteUtils.capacity;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.IntStream;
+import org.apache.ignite.configuration.ConfigurationValue;
+import org.apache.ignite.configuration.NamedConfigurationTree;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowImpl;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.configurations.StorageConfiguration;
+import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbTableStorage;
+import 
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbProfileView;
+import 
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
+import org.apache.ignite.internal.tx.TransactionIds;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * Benchmark for measuring the performance of {@link 
RocksDbMvPartitionStorage} in a scenario, when many entries are added in one big
+ * transaction and are then committed.
+ */
+@Fork(1)
+@Threads(5)
+@State(Scope.Benchmark)
+@BenchmarkMode(Mode.SingleShotTime)
+public class CommitManyWritesBenchmark {
+    private static final String STORAGE_PROFILE_NAME = "test";
+    private static final int TABLE_ID = 1;
+    private static final int NUM_PARTITIONS = 50;
+    private static final int NUM_ROWS = 100_000;
+    private static final int ROW_LENGTH = 10_000;
+
+    private final HybridClock clock = new HybridClockImpl();
+
+    private RocksDbStorageEngine storageEngine;
+
+    private RocksDbTableStorage tableStorage;
+
+    /** Setup method. */
+    @Setup
+    public void setUp() throws IOException {
+        Path workDir = 
Files.createTempDirectory(CommitManyWritesBenchmark.class.getSimpleName());
+
+        storageEngine = new RocksDbStorageEngine(
+                "test",
+                engineConfiguration(),
+                storageConfiguration(),
+                workDir,
+                () -> {}
+        );
+
+        storageEngine.start();
+
+        var tableDescriptor = new StorageTableDescriptor(TABLE_ID, 
NUM_PARTITIONS, STORAGE_PROFILE_NAME);
+
+        tableStorage = storageEngine.createMvTable(tableDescriptor, indexId -> 
null);
+
+        CompletableFuture<?>[] createFutures = IntStream.range(0, 
NUM_PARTITIONS)
+                .mapToObj(tableStorage::createMvPartition)
+                .toArray(CompletableFuture[]::new);
+
+        CompletableFuture.allOf(createFutures).join();
+    }
+
+    /** Tear down method. */
+    @TearDown
+    public void tearDown() {
+        tableStorage.destroy().join();
+
+        storageEngine.stop();
+    }
+
+    private static int randomPartitionId() {
+        return ThreadLocalRandom.current().nextInt(NUM_PARTITIONS);
+    }
+
+    private static Map<RowId, BinaryRow> randomRows(int partitionId) {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        var rows = new HashMap<RowId, BinaryRow>(capacity(NUM_ROWS));
+
+        for (int i = 0; i < NUM_ROWS; i++) {
+            var rowId = new RowId(partitionId);
+
+            BinaryRow row = randomRow(random);
+
+            rows.put(rowId, row);
+        }
+
+        return rows;
+    }
+
+    private static BinaryRow randomRow(ThreadLocalRandom random) {
+        ByteBuffer buffer = ByteBuffer.allocate(ROW_LENGTH);
+
+        random.nextBytes(buffer.array());
+
+        return new BinaryRowImpl(0, buffer);
+    }
+
+    /** Randomly generated rows for a partition. */
+    @State(Scope.Thread)
+    public static class DataToAddAndCommit {
+        final int partitionId = randomPartitionId();
+
+        final Map<RowId, BinaryRow> rows = randomRows(partitionId);
+    }
+
+    /** Randomly generated rows for a partition which has also been written to 
the storage. */
+    @State(Scope.Thread)
+    public static class DataToCommit {
+        final int partitionId = randomPartitionId();
+
+        final Map<RowId, BinaryRow> rows = randomRows(partitionId);
+
+        /** Setup method. */
+        @Setup
+        public void setUp(CommitManyWritesBenchmark benchmark) {
+            UUID txId = TransactionIds.transactionId(benchmark.clock.now(), 0);
+
+            MvPartitionStorage partitionStorage = 
benchmark.tableStorage.getMvPartition(partitionId);
+
+            partitionStorage.runConsistently(locker -> {
+                rows.forEach((rowId, row) -> partitionStorage.addWrite(rowId, 
row, txId, TABLE_ID, partitionId));
+
+                return null;
+            });
+        }
+    }
+
+    private static RocksDbStorageEngineConfiguration engineConfiguration() {
+        RocksDbStorageEngineConfiguration config = 
mock(RocksDbStorageEngineConfiguration.class);
+
+        ConfigurationValue<Integer> flushDelayMillis = 
mock(ConfigurationValue.class);
+
+        when(flushDelayMillis.value()).thenReturn(100);
+
+        when(config.flushDelayMillis()).thenReturn(flushDelayMillis);
+
+        return config;
+    }
+
+    private static StorageConfiguration storageConfiguration() {
+        StorageConfiguration config = mock(StorageConfiguration.class);
+
+        NamedConfigurationTree profilesTree = 
mock(NamedConfigurationTree.class);
+        NamedListView profilesView = mock(NamedListView.class);
+        RocksDbProfileView rocksDbProfileView = mock(RocksDbProfileView.class);
+
+        when(rocksDbProfileView.name()).thenReturn(STORAGE_PROFILE_NAME);
+        when(rocksDbProfileView.size()).thenReturn(16777216L);
+        when(rocksDbProfileView.writeBufferSize()).thenReturn(16777216L);
+
+        when(config.profiles()).thenReturn(profilesTree);
+        when(profilesTree.value()).thenReturn(profilesView);
+        
when(profilesView.iterator()).thenReturn(List.of(rocksDbProfileView).iterator());
+
+        return config;
+    }
+
+    /** Benchmark for the combination of {@link MvPartitionStorage#addWrite} 
and {@link MvPartitionStorage#commitWrite} methods. */
+    @Benchmark
+    public void addAndCommitManyWrites(DataToAddAndCommit data) {
+        MvPartitionStorage partitionStorage = 
tableStorage.getMvPartition(data.partitionId);
+
+        UUID txId = TransactionIds.transactionId(clock.now(), 0);
+
+        partitionStorage.runConsistently(locker -> {
+            data.rows.forEach((rowId, row) -> partitionStorage.addWrite(rowId, 
row, txId, TABLE_ID, data.partitionId));
+
+            return null;
+        });
+
+        HybridTimestamp commitTs = clock.now();
+
+        partitionStorage.runConsistently(locker -> {
+            data.rows.keySet().forEach(rowId -> 
partitionStorage.commitWrite(rowId, commitTs));
+
+            return null;
+        });
+    }
+
+    /** Benchmark for calling {@link MvPartitionStorage#addWrite} many times. 
*/
+    @Benchmark
+    public void addManyWrites(DataToAddAndCommit data) {
+        MvPartitionStorage partitionStorage = 
tableStorage.getMvPartition(data.partitionId);
+
+        UUID txId = TransactionIds.transactionId(clock.now(), 0);
+
+        partitionStorage.runConsistently(locker -> {
+            data.rows.forEach((rowId, row) -> partitionStorage.addWrite(rowId, 
row, txId, TABLE_ID, data.partitionId));
+
+            return null;
+        });
+    }
+
+    /** Benchmark for calling {@link MvPartitionStorage#commitWrite} many 
times. */
+    @Benchmark
+    public void commitManyWrites(DataToCommit data) {
+        MvPartitionStorage partitionStorage = 
tableStorage.getMvPartition(data.partitionId);
+
+        HybridTimestamp commitTs = clock.now();
+
+        partitionStorage.runConsistently(locker -> {
+            data.rows.keySet().forEach(rowId -> 
partitionStorage.commitWrite(rowId, commitTs));
+
+            return null;
+        });
+    }
+
+    /** Main method. */
+    public static void main(String[] args) throws RunnerException {
+        Options opt = new OptionsBuilder()
+                .include(CommitManyWritesBenchmark.class.getSimpleName())
+                .build();
+
+        new Runner(opt).run();
+    }
+}
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java
index a4f8741a3a..8e68d23da8 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java
@@ -32,7 +32,6 @@ import static org.mockito.Mockito.verify;
 
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 import java.util.stream.IntStream;
 import org.apache.ignite.distributed.TestPartitionDataStorage;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -280,11 +279,11 @@ abstract class AbstractGcUpdateHandlerTest extends 
BaseMvStoragesTest {
         );
     }
 
-    private static void addWriteCommitted(PartitionDataStorage storage, RowId 
rowId, @Nullable BinaryRow row, HybridTimestamp timestamp) {
+    private void addWriteCommitted(PartitionDataStorage storage, RowId rowId, 
@Nullable BinaryRow row, HybridTimestamp timestamp) {
         storage.runConsistently(locker -> {
             locker.lock(rowId);
 
-            storage.addWrite(rowId, row, UUID.randomUUID(), 999, PARTITION_ID);
+            storage.addWrite(rowId, row, newTransactionId(), 999, 
PARTITION_ID);
 
             storage.commitWrite(rowId, timestamp);
 


Reply via email to