This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new e5156a8caa IGNITE-17611 Local storage recovery code for 
TxStateRocksDbStorage (#1061)
e5156a8caa is described below

commit e5156a8caa7b680ffb94d3d32f0765410a9d02e2
Author: ibessonov <[email protected]>
AuthorDate: Fri Sep 9 14:58:54 2022 +0300

    IGNITE-17611 Local storage recovery code for TxStateRocksDbStorage (#1061)
---
 .../org/apache/ignite/internal/util/ByteUtils.java |  10 +-
 .../java/org/apache/ignite/lang/ErrorGroups.java   |   9 +-
 .../internal/rocksdb/BusyRocksIteratorAdapter.java |  91 +++++++
 .../internal/rocksdb/RocksIteratorAdapter.java     |   2 +-
 .../rocksdb/flush}/RocksDbFlushListener.java       |  63 ++---
 .../internal/rocksdb/flush/RocksDbFlusher.java     | 238 +++++++++++++++++
 .../storage/rocksdb/RocksDbTableStorage.java       | 150 +++--------
 .../storage/rocksdb/RocksDbMvTableStorageTest.java |   2 +-
 .../internal/table/distributed/TableManager.java   |   3 +-
 .../distributed/raft/snapshot/PartitionAccess.java |  28 ++
 .../raft/snapshot/PartitionSnapshotStorage.java    |  10 +-
 .../snapshot/PartitionSnapshotStorageFactory.java  |  14 +-
 .../internal/tx/storage/state/TxStateStorage.java  |  39 +--
 .../state/rocksdb/TxStateRocksDbStorage.java       | 294 +++++++++++++++++----
 .../tx/storage/state/TxStateStorageTest.java       |  72 +----
 15 files changed, 708 insertions(+), 317 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
index 180aca3a5d..8d921a2b04 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
@@ -38,18 +38,12 @@ public class ByteUtils {
    public static long bytesToLong(byte[] bytes, int off) {
        assert bytes != null;

        // Read at most Long.BYTES bytes. A short tail (bytes.length - off < 8) is tolerated:
        // only the remaining bytes are consumed. NOTE(review): if off >= bytes.length the count is
        // non-positive, the loop body never runs and 0 is returned — presumably intentional, confirm.
        int bytesCnt = Math.min(Long.BYTES, bytes.length - off);

        long res = 0;

        // Big-endian accumulation: each new byte shifts the previous result left by 8 bits.
        for (int i = 0; i < bytesCnt; i++) {
            res = (res << 8) | (0xffL & bytes[off + i]);
        }

        return res;
    }
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java 
b/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index 08d71125cc..068e0e94b6 100755
--- a/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -176,12 +176,15 @@ public class ErrorGroups {
         public static final ErrorGroup TX_ERR_GROUP = 
ErrorGroup.newGroup("TX", 3);
 
         /** Error on creation of tx state storage. */
-        public static int TX_STATE_STORAGE_CREATE_ERR = 
TX_ERR_GROUP.registerErrorCode(1);
+        public static final int TX_STATE_STORAGE_CREATE_ERR = 
TX_ERR_GROUP.registerErrorCode(1);
 
         /** Error on destruction of tx state storage. */
-        public static int TX_STATE_STORAGE_DESTROY_ERR = 
TX_ERR_GROUP.registerErrorCode(2);
+        public static final int TX_STATE_STORAGE_DESTROY_ERR = 
TX_ERR_GROUP.registerErrorCode(2);
 
         /** Error of tx state storage. */
-        public static int TX_STATE_STORAGE_ERR = 
TX_ERR_GROUP.registerErrorCode(3);
+        public static final int TX_STATE_STORAGE_ERR = 
TX_ERR_GROUP.registerErrorCode(3);
+
+        /** Tx state storage is stopped. */
+        public static final int TX_STATE_STORAGE_STOPPED_ERR = 
TX_ERR_GROUP.registerErrorCode(4);
     }
 }
diff --git 
a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/BusyRocksIteratorAdapter.java
 
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/BusyRocksIteratorAdapter.java
new file mode 100644
index 0000000000..c74655ba8b
--- /dev/null
+++ 
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/BusyRocksIteratorAdapter.java
@@ -0,0 +1,91 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.rocksdb;

import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.rocksdb.RocksIterator;

/**
 * Adapter from a {@link RocksIterator} to a {@link Cursor} that also handles stopping of the storage:
 * every cursor operation is wrapped into a busy-lock acquisition, and a failure to acquire the lock
 * (storage has been stopped) is delegated to {@link #handleBusy()}, which must throw.
 */
public abstract class BusyRocksIteratorAdapter<T> extends RocksIteratorAdapter<T> {
    /** Busy lock. Shared with the owning storage; blocked when the storage stops. */
    private final IgniteSpinBusyLock busyLock;

    /**
     * Constructor.
     *
     * @param busyLock Busy lock.
     * @param it RocksDB iterator.
     */
    protected BusyRocksIteratorAdapter(IgniteSpinBusyLock busyLock, RocksIterator it) {
        super(it);
        this.busyLock = busyLock;
    }

    /**
     * Handles busy lock acquiring failure. This means that db has been stopped and cursor can't proceed. Must throw an exception.
     */
    protected abstract void handleBusy();

    // Wraps handleBusy() with an assertion that enforces its "must throw" contract.
    private void handleBusy0() {
        handleBusy();

        assert false : "handleBusy() should have thrown an exception.";
    }

    /** {@inheritDoc} */
    @Override
    public boolean hasNext() {
        if (!busyLock.enterBusy()) {
            handleBusy0();
        }

        try {
            return super.hasNext();
        } finally {
            busyLock.leaveBusy();
        }
    }

    /** {@inheritDoc} */
    @Override
    public T next() {
        if (!busyLock.enterBusy()) {
            handleBusy0();
        }

        try {
            return super.next();
        } finally {
            busyLock.leaveBusy();
        }
    }

    /** {@inheritDoc} */
    @Override
    public void close() throws Exception {
        // NOTE(review): close() also requires the busy lock, so a cursor cannot be closed after the
        // storage has stopped (handleBusy() will throw instead) — confirm this is the intended contract.
        if (!busyLock.enterBusy()) {
            handleBusy0();
        }

        try {
            super.close();
        } finally {
            busyLock.leaveBusy();
        }
    }
}
diff --git 
a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksIteratorAdapter.java
 
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksIteratorAdapter.java
index 5d50e29ec2..ea902ecebb 100644
--- 
a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksIteratorAdapter.java
+++ 
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksIteratorAdapter.java
@@ -52,7 +52,7 @@ public abstract class RocksIteratorAdapter<T> implements 
Cursor<T> {
 
         if (!isValid) {
             // check the status first. This operation is guaranteed to throw 
if an internal error has occurred during
-            // the iteration. Otherwise we've exhausted the data range.
+            // the iteration. Otherwise, we've exhausted the data range.
             RocksUtils.checkIterator(it);
         }
 
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbFlushListener.java
 
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlushListener.java
similarity index 53%
rename from 
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbFlushListener.java
rename to 
modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlushListener.java
index 69d13cad81..1fd6ada7a7 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbFlushListener.java
+++ 
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlushListener.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.storage.rocksdb;
+package org.apache.ignite.internal.rocksdb.flush;
 
 import static 
org.rocksdb.AbstractEventListener.EnabledEventCallback.ON_FLUSH_BEGIN;
 import static 
org.rocksdb.AbstractEventListener.EnabledEventCallback.ON_FLUSH_COMPLETED;
@@ -23,26 +23,16 @@ import static 
org.rocksdb.AbstractEventListener.EnabledEventCallback.ON_FLUSH_CO
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
-import org.apache.ignite.configuration.schemas.table.TableView;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.storage.StorageException;
 import org.rocksdb.AbstractEventListener;
 import org.rocksdb.FlushJobInfo;
 import org.rocksdb.RocksDB;
 
 /**
- * Represents a listener of RocksDB flush events. Responsible for updating 
persisted index of partitions.
- *
- * @see RocksDbMvPartitionStorage#persistedIndex()
- * @see RocksDbMvPartitionStorage#refreshPersistedIndex()
+ * Represents a listener of RocksDB flush events.
  */
 class RocksDbFlushListener extends AbstractEventListener {
-    /** Logger. */
-    private static final IgniteLogger LOG = 
Loggers.forClass(RocksDbFlushListener.class);
-
-    /** Table storage. */
-    private final RocksDbTableStorage tableStorage;
+    /** Flusher instance. */
+    private final RocksDbFlusher flusher;
 
     /**
      * Type of last processed event. Real amount of events doesn't matter in 
atomic flush mode. All "completed" events go after all "begin"
@@ -55,9 +45,15 @@ class RocksDbFlushListener extends AbstractEventListener {
      */
     private volatile CompletableFuture<?> lastFlushProcessed = 
CompletableFuture.completedFuture(null);
 
-    public RocksDbFlushListener(RocksDbTableStorage tableStorage) {
+    /**
+     * Constructor.
+     *
+     * @param flusher Flusher instance to delegate events processing to.
+     */
+    RocksDbFlushListener(RocksDbFlusher flusher) {
         super(EnabledEventCallback.ON_FLUSH_BEGIN, 
EnabledEventCallback.ON_FLUSH_COMPLETED);
-        this.tableStorage = tableStorage;
+
+        this.flusher = flusher;
     }
 
     /** {@inheritDoc} */
@@ -71,42 +67,13 @@ class RocksDbFlushListener extends AbstractEventListener {
     /** {@inheritDoc} */
     @Override
     public void onFlushCompleted(RocksDB db, FlushJobInfo flushJobInfo) {
-        ExecutorService threadPool = tableStorage.engine().threadPool();
+        ExecutorService threadPool = flusher.threadPool;
 
         if (lastEventType.compareAndSet(ON_FLUSH_BEGIN, ON_FLUSH_COMPLETED)) {
-            lastFlushProcessed = 
CompletableFuture.runAsync(this::refreshPersistedIndexes, threadPool);
+            lastFlushProcessed = 
CompletableFuture.runAsync(flusher.onFlushCompleted, threadPool);
         }
 
         // Do it for every column family, there's no way to tell in advance 
which one has the latest sequence number.
-        lastFlushProcessed.whenCompleteAsync((o, throwable) -> 
tableStorage.completeFutures(flushJobInfo.getLargestSeqno()), threadPool);
-    }
-
-    private void refreshPersistedIndexes() {
-        if (!tableStorage.busyLock.enterBusy()) {
-            return;
-        }
-
-        try {
-            TableView tableCfgView = tableStorage.configuration().value();
-
-            for (int partitionId = 0; partitionId < tableCfgView.partitions(); 
partitionId++) {
-                RocksDbMvPartitionStorage partition = 
tableStorage.getMvPartition(partitionId);
-
-                if (partition != null) {
-                    try {
-                        partition.refreshPersistedIndex();
-                    } catch (StorageException storageException) {
-                        LOG.error(
-                                "Filed to refresh persisted applied index 
value for table {} partition {}",
-                                storageException,
-                                tableStorage.configuration().name().value(),
-                                partitionId
-                        );
-                    }
-                }
-            }
-        } finally {
-            tableStorage.busyLock.leaveBusy();
-        }
+        lastFlushProcessed.whenCompleteAsync((o, throwable) -> 
flusher.completeFutures(flushJobInfo.getLargestSeqno()), threadPool);
     }
 }
diff --git 
a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
 
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
new file mode 100644
index 0000000000..31a381568c
--- /dev/null
+++ 
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
@@ -0,0 +1,238 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.rocksdb.flush;

import java.util.List;
import java.util.SortedMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.rocksdb.AbstractEventListener;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.FlushOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

/**
 * Helper class to deal with RocksDB flushes. Provides an ability to wait until the current state of data is flushed to the storage.
 * Requires enabled {@link Options#setAtomicFlush(boolean)} option to work properly.
 */
public class RocksDbFlusher {
    /** Logger. */
    private static final IgniteLogger LOG = Loggers.forClass(RocksDbFlusher.class);

    /** Rocks DB instance. Assigned in {@link #init(RocksDB, List)}. */
    private volatile RocksDB db;

    /** List of all column families. Assigned in {@link #init(RocksDB, List)}. */
    private volatile List<ColumnFamilyHandle> columnFamilyHandles;

    /** Scheduled pool to schedule flushes. */
    private final ScheduledExecutorService scheduledPool;

    /** Thread pool to complete flush completion futures. Package-private: read by {@link RocksDbFlushListener}. */
    final ExecutorService threadPool;

    /** Supplier of delay values to batch independent flush requests. */
    private final IntSupplier delaySupplier;

    /** Flush completion callback. Package-private: read by {@link RocksDbFlushListener}. */
    final Runnable onFlushCompleted;

    /**
     * Flush options to be used to asynchronously flush the Rocks DB memtable. It needs to be cached, because
     * {@link RocksDB#flush(FlushOptions)} contract requires this object to not be GC-ed.
     */
    private final FlushOptions flushOptions = new FlushOptions().setWaitForFlush(false);

    /** Map with flush futures by sequence number at the time of the {@link #awaitFlush(boolean)} call. */
    final SortedMap<Long, CompletableFuture<Void>> flushFuturesBySequenceNumber = new ConcurrentSkipListMap<>();

    /** Latest known sequence number for persisted data. Not volatile, protected by explicit synchronization. */
    private long latestPersistedSequenceNumber;

    /** Mutex for {@link #latestPersistedSequenceNumber} modifications. */
    private final Object latestPersistedSequenceNumberMux = new Object();

    /** Busy lock to stop synchronously. */
    private final IgniteSpinBusyLock busyLock;

    /**
     * Instance of the latest scheduled flush closure.
     *
     * @see #scheduleFlush()
     */
    private volatile Runnable latestFlushClosure;

    /**
     * Constructor.
     *
     * @param busyLock Busy lock.
     * @param scheduledPool Scheduled pool to schedule flushes.
     * @param threadPool Thread pool to run the flush completion closure, provided by the {@code onFlushCompleted} parameter.
     * @param delaySupplier Supplier of delay values to batch independent flush requests. When {@link #awaitFlush(boolean)} is called with
     *      {@code true} parameter, the flusher waits the given number of milliseconds (using {@code scheduledPool}) and then executes flush
     *      only if there were no other {@code awaitFlush(true)} calls. Otherwise, it does nothing after the timeout. This guarantees that
     *      either the last one wins, or automatic flush wins if there's an endless stream of {@code awaitFlush(true)} calls with very small
     *      time-intervals between them. Such behavior allows saving on unnecessary flushes when multiple await-flush calls appear at
     *      roughly the same time from different threads. For example, several partitions might be flushed at the same time, because they
     *      started at the same time and their flush frequency is also the same.
     * @param onFlushCompleted Flush completion callback. Executed on every individual column family flush.
     */
    public RocksDbFlusher(
            IgniteSpinBusyLock busyLock,
            ScheduledExecutorService scheduledPool,
            ExecutorService threadPool,
            IntSupplier delaySupplier,
            Runnable onFlushCompleted
    ) {
        this.busyLock = busyLock;
        this.scheduledPool = scheduledPool;
        this.threadPool = threadPool;
        this.delaySupplier = delaySupplier;
        this.onFlushCompleted = onFlushCompleted;
    }

    /**
     * Returns an instance of {@link AbstractEventListener} to process actual RocksDB events. Returned listener must be set into
     * {@link Options#setListeners(List)} before the database is started. Otherwise, no events would occur.
     */
    public AbstractEventListener listener() {
        return new RocksDbFlushListener(this);
    }

    /**
     * Initializes the flusher with DB instance and a list of column families.
     *
     * @param db Rocks DB instance.
     * @param columnFamilyHandles List of all column families. Column families missing from this list may not have flush events processed.
     */
    @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
    public void init(RocksDB db, List<ColumnFamilyHandle> columnFamilyHandles) {
        this.db = db;
        this.columnFamilyHandles = columnFamilyHandles;

        // Fix the baseline: awaitFlush() calls for sequence numbers at or below this value complete immediately.
        synchronized (latestPersistedSequenceNumberMux) {
            latestPersistedSequenceNumber = db.getLatestSequenceNumber();
        }
    }

    /**
     * Returns a future to wait for the next flush operation from the current point in time. Uses
     * {@link RocksDB#getLatestSequenceNumber()} to achieve this, by fixing its value at the time of invocation. Storage is
     * considered flushed when at least one persisted column family has its latest sequence number greater or equal to the one that we
     * fixed. This is enough to guarantee that all column families have up-to-date state as well, because the flusher expects its users
     * to also have the {@link Options#setAtomicFlush(boolean)} option enabled.
     *
     * @param schedule {@code true} if {@link RocksDB#flush(FlushOptions)} should be explicitly triggered in the near future. Please refer
     *      to {@link RocksDbFlusher#RocksDbFlusher(IgniteSpinBusyLock, ScheduledExecutorService, ExecutorService, IntSupplier, Runnable)}
     *      parameters description to see what's really happening in this case.
     *
     * @see #scheduleFlush()
     */
    public CompletableFuture<Void> awaitFlush(boolean schedule) {
        CompletableFuture<Void> future;

        long dbSequenceNumber = db.getLatestSequenceNumber();

        synchronized (latestPersistedSequenceNumberMux) {
            // Everything up to the fixed sequence number is already persisted, nothing to wait for.
            if (dbSequenceNumber <= latestPersistedSequenceNumber) {
                return CompletableFuture.completedFuture(null);
            }

            future = flushFuturesBySequenceNumber.computeIfAbsent(dbSequenceNumber, s -> new CompletableFuture<>());
        }

        if (schedule) {
            scheduleFlush();
        }

        return future;
    }

    /**
     * Schedules a flush of the table. If run several times within a small amount of time, only the last scheduled flush will be executed.
     */
    private void scheduleFlush() {
        Runnable newClosure = new Runnable() {
            @Override
            public void run() {
                // A newer closure has been scheduled since this one; let it do the flush ("last one wins").
                if (latestFlushClosure != this) {
                    return;
                }

                // Storage is being stopped, skip the flush silently.
                if (!busyLock.enterBusy()) {
                    return;
                }

                try {
                    // Explicit list of CF handles is mandatory!
                    // Default flush is buggy and only invokes listener methods for a single random CF.
                    db.flush(flushOptions, columnFamilyHandles);
                } catch (RocksDBException e) {
                    LOG.error("Error occurred during the explicit flush", e);
                } finally {
                    busyLock.leaveBusy();
                }
            }
        };

        latestFlushClosure = newClosure;

        scheduledPool.schedule(newClosure, delaySupplier.getAsInt(), TimeUnit.MILLISECONDS);
    }

    /**
     * Completes all futures in {@link #flushFuturesBySequenceNumber} up to a given sequence number.
     */
    void completeFutures(long sequenceNumber) {
        synchronized (latestPersistedSequenceNumberMux) {
            // Stale event: a later flush has already advanced the persisted sequence number.
            if (sequenceNumber <= latestPersistedSequenceNumber) {
                return;
            }

            latestPersistedSequenceNumber = sequenceNumber;
        }

        // headMap's upper bound is exclusive, hence "+ 1" to include sequenceNumber itself.
        SortedMap<Long, CompletableFuture<Void>> futuresToComplete = flushFuturesBySequenceNumber.headMap(sequenceNumber + 1);

        for (CompletableFuture<Void> future : futuresToComplete.values()) {
            future.complete(null);
        }

        // View is backed by the underlying map, so this removes the completed entries from it.
        futuresToComplete.clear();
    }

    /**
     * Stops the flusher by cancelling all of its futures.
     */
    public void stop() {
        for (CompletableFuture<Void> future : flushFuturesBySequenceNumber.values()) {
            future.cancel(false);
        }

        // Release the cached native FlushOptions object.
        flushOptions.close();
    }
}
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index 5b3d98d323..47f1f7bf23 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -27,22 +27,20 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReferenceArray;
 import java.util.stream.Collectors;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableView;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.rocksdb.flush.RocksDbFlusher;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
@@ -84,18 +82,15 @@ public class RocksDbTableStorage implements MvTableStorage {
     /** Data region for the table. */
     private final RocksDbDataRegion dataRegion;
 
+    /** RocksDB flusher instance. */
+    private volatile RocksDbFlusher flusher;
+
     /** Rocks DB instance. */
     private volatile RocksDB db;
 
     /** Write options for write operations. */
     private final WriteOptions writeOptions = new 
WriteOptions().setDisableWAL(true);
 
-    /**
-     * Flush options to be used to asynchronously flush the Rocks DB memtable. 
It needs to be cached, because
-     * {@link RocksDB#flush(FlushOptions)} contract requires this object to 
not be GC-ed.
-     */
-    private final FlushOptions flushOptions = new 
FlushOptions().setWaitForFlush(false);
-
     /** Meta information. */
     private volatile RocksDbMetaStorage meta;
 
@@ -105,31 +100,12 @@ public class RocksDbTableStorage implements 
MvTableStorage {
     /** Column Family handle for Hash Index data. */
     private volatile ColumnFamily hashIndexCf;
 
-    /** List of all existing Column Family handles. */
-    private volatile List<ColumnFamilyHandle> allCfHandles;
-
     /** Partition storages. */
     private volatile AtomicReferenceArray<RocksDbMvPartitionStorage> 
partitions;
 
     /** Hash Index storages by Index IDs. */
     private final ConcurrentMap<UUID, HashIndices> hashIndices = new 
ConcurrentHashMap<>();
 
-    /** Map with flush futures by sequence number at the time of the {@link 
#awaitFlush(boolean)} call. */
-    private final ConcurrentMap<Long, CompletableFuture<Void>> 
flushFuturesBySequenceNumber = new ConcurrentHashMap<>();
-
-    /** Latest known sequence number for persisted data. Not volatile, 
protected by explicit synchronization. */
-    private long latestPersistedSequenceNumber;
-
-    /** Mutex for {@link #latestPersistedSequenceNumber} modifications. */
-    private final Object latestPersistedSequenceNumberMux = new Object();
-
-    /**
-     * Instance of the latest scheduled flush closure.
-     *
-     * @see #scheduleFlush()
-     */
-    private volatile Runnable latestFlushClosure;
-
     /** Busy lock to stop synchronously. */
     final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
@@ -193,6 +169,14 @@ public class RocksDbTableStorage implements MvTableStorage 
{
     /** {@inheritDoc} */
     @Override
     public void start() throws StorageException {
+        flusher = new RocksDbFlusher(
+                busyLock,
+                engine.scheduledPool(),
+                engine().threadPool(),
+                engine.configuration().flushDelayMillis()::value,
+                this::refreshPersistedIndexes
+        );
+
         try {
             Files.createDirectories(tablePath);
         } catch (IOException e) {
@@ -208,7 +192,7 @@ public class RocksDbTableStorage implements MvTableStorage {
                 .setCreateMissingColumnFamilies(true)
                 // Atomic flush must be enabled to guarantee consistency 
between different column families when WAL is disabled.
                 .setAtomicFlush(true)
-                .setListeners(List.of(new RocksDbFlushListener(this)))
+                .setListeners(List.of(flusher.listener()))
                 .setWriteBufferManager(dataRegion.writeBufferManager());
 
         try {
@@ -243,12 +227,7 @@ public class RocksDbTableStorage implements MvTableStorage 
{
             assert partitionCf != null;
             assert hashIndexCf != null;
 
-            allCfHandles = List.copyOf(cfHandles);
-
-            // Pointless synchronization, but without it there would be a 
warning in the code.
-            synchronized (latestPersistedSequenceNumberMux) {
-                latestPersistedSequenceNumber = db.getLatestSequenceNumber();
-            }
+            flusher.init(db, cfHandles);
         } catch (RocksDBException e) {
             throw new StorageException("Failed to initialize RocksDB 
instance", e);
         }
@@ -265,83 +244,40 @@ public class RocksDbTableStorage implements 
MvTableStorage {
      * achieve this.
      *
     * @param schedule {@code true} if {@link RocksDB#flush(FlushOptions)} 
should be explicitly triggered in the near future.
-     *
-     * @see #scheduleFlush()
      */
     public CompletableFuture<Void> awaitFlush(boolean schedule) {
-        CompletableFuture<Void> future;
-
-        long dbSequenceNumber = db.getLatestSequenceNumber();
-
-        synchronized (latestPersistedSequenceNumberMux) {
-            if (dbSequenceNumber <= latestPersistedSequenceNumber) {
-                return CompletableFuture.completedFuture(null);
-            }
-
-            future = 
flushFuturesBySequenceNumber.computeIfAbsent(dbSequenceNumber, l -> new 
CompletableFuture<>());
-        }
-
-        if (schedule) {
-            scheduleFlush();
-        }
-
-        return future;
+        return flusher.awaitFlush(schedule);
     }
 
-    /**
-     * Completes all futures in {@link #flushFuturesBySequenceNumber} up to a 
given sequence number.
-     */
-    void completeFutures(long sequenceNumber) {
-        synchronized (latestPersistedSequenceNumberMux) {
-            if (sequenceNumber <= latestPersistedSequenceNumber) {
-                return;
-            }
-
-            latestPersistedSequenceNumber = sequenceNumber;
+    private void refreshPersistedIndexes() {
+        if (!busyLock.enterBusy()) {
+            return;
         }
 
-        Set<Entry<Long, CompletableFuture<Void>>> entries = 
flushFuturesBySequenceNumber.entrySet();
-
-        for (Iterator<Entry<Long, CompletableFuture<Void>>> iterator = 
entries.iterator(); iterator.hasNext(); ) {
-            Entry<Long, CompletableFuture<Void>> entry = iterator.next();
-
-            if (sequenceNumber >= entry.getKey()) {
-                entry.getValue().complete(null);
-
-                iterator.remove();
-            }
-        }
-    }
+        try {
+            TableView tableCfgView = configuration().value();
 
-    /**
-     * Schedules a flush of the table. If run several times within a small 
amount of time, only the last scheduled flush will be executed.
-     */
-    void scheduleFlush() {
-        Runnable newClosure = new Runnable() {
-            @Override
-            public void run() {
-                if (latestFlushClosure != this) {
-                    return;
-                }
+            for (int partitionId = 0; partitionId < tableCfgView.partitions(); 
partitionId++) {
+                RocksDbMvPartitionStorage partition = 
getMvPartition(partitionId);
 
-                try {
-                    // Explicit list of CF handles is mandatory!
-                    // Default flush is buggy and only invokes listener 
methods for a single random CF.
-                    db.flush(flushOptions, allCfHandles);
-                } catch (RocksDBException e) {
-                    LOG.error("Error occurred during the explicit flush for 
table '{}'", e, tableCfg.name());
+                if (partition != null) {
+                    try {
+                        partition.refreshPersistedIndex();
+                    } catch (StorageException storageException) {
+                        LOG.error(
+                                "Failed to refresh persisted applied index 
value for table {} partition {}",
+                                storageException,
+                                configuration().name().value(),
+                                partitionId
+                        );
+                    }
                 }
             }
-        };
-
-        latestFlushClosure = newClosure;
-
-        int delay = engine.configuration().flushDelayMillis().value();
-
-        engine.scheduledPool().schedule(newClosure, delay, 
TimeUnit.MILLISECONDS);
+        } finally {
+            busyLock.leaveBusy();
+        }
     }
 
-
     /** {@inheritDoc} */
     @Override
     public void stop() throws StorageException {
@@ -351,18 +287,18 @@ public class RocksDbTableStorage implements 
MvTableStorage {
 
         busyLock.block();
 
-        for (CompletableFuture<Void> future : 
flushFuturesBySequenceNumber.values()) {
-            future.cancel(false);
-        }
-
         List<AutoCloseable> resources = new ArrayList<>();
 
+        resources.add(flusher::stop);
+
+        resources.add(meta.columnFamily().handle());
+        resources.add(partitionCf.handle());
+        resources.add(hashIndexCf.handle());
+
         resources.add(db);
 
         resources.add(writeOptions);
 
-        resources.add(flushOptions);
-
         for (int i = 0; i < partitions.length(); i++) {
             MvPartitionStorage partition = partitions.get(i);
 
diff --git 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
index e51591470b..06c64b71a9 100644
--- 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
+++ 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
@@ -116,7 +116,7 @@ public class RocksDbMvTableStorageTest extends 
AbstractMvTableStorageTest {
         CompletableFuture<Void> destroyFuture = 
tableStorage.destroyPartition(42);
 
         // Partition destruction doesn't enforce flush.
-        ((RocksDbTableStorage) tableStorage).scheduleFlush();
+        ((RocksDbTableStorage) tableStorage).awaitFlush(true);
 
         assertThat(destroyFuture, willCompleteSuccessfully());
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index a6a2722686..c488796387 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -751,7 +751,8 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
         //TODO Revisit peers String representation: 
https://issues.apache.org/jira/browse/IGNITE-17420
         raftGroupOptions.snapshotStorageFactory(new 
PartitionSnapshotStorageFactory(
-                partitionStorage,
+                //TODO IGNITE-17302 Use minimum from mv storage and tx state 
storage.
+                partitionStorage::persistedIndex,
                 peers.stream().map(n -> new 
Peer(n.address())).map(PeerId::fromPeer).map(Object::toString).collect(Collectors.toList()),
                 List.of()
         ));
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java
new file mode 100644
index 0000000000..48b5074d07
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.snapshot;
+
+/**
+ * Small abstraction for partition storages that includes only the methods 
mandatory for the snapshot storage.
+ */
+public interface PartitionAccess {
+    /**
+     * Returns persisted RAFT index for the partition.
+     */
+    long persistedIndex();
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorage.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorage.java
index 61dcfe0cc0..d9257fdaa7 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorage.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorage.java
@@ -40,8 +40,8 @@ class PartitionSnapshotStorage implements SnapshotStorage {
     /** Raft options. */
     final RaftOptions raftOptions;
 
-    /** Instance of partition storage. */
-    final MvPartitionStorage partitionStorage;
+    /** Instance of partition. */
+    final PartitionAccess partition;
 
     /** Snapshot meta, constructed from the storage data and reaft group 
configuration. */
     final SnapshotMeta snapshotMeta;
@@ -54,18 +54,18 @@ class PartitionSnapshotStorage implements SnapshotStorage {
      *
      * @param snapshotUri Snapshot URI.
      * @param raftOptions RAFT options.
-     * @param partitionStorage Partition storage.
+     * @param partition Partition.
      * @param snapshotMeta Snapshot meta.
      */
     public PartitionSnapshotStorage(
             String snapshotUri,
             RaftOptions raftOptions,
-            MvPartitionStorage partitionStorage,
+            PartitionAccess partition,
             SnapshotMeta snapshotMeta
     ) {
         this.snapshotUri = snapshotUri;
         this.raftOptions = raftOptions;
-        this.partitionStorage = partitionStorage;
+        this.partition = partition;
         this.snapshotMeta = snapshotMeta;
     }
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
index 9e747eaae4..0fddc5f9d8 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
@@ -39,7 +39,7 @@ import 
org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
  */
 public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory 
{
     /** Partition storage. */
-    private final MvPartitionStorage partitionStorage;
+    private final PartitionAccess partition;
 
     /** List of peers. */
     private final List<String> peers;
@@ -47,25 +47,25 @@ public class PartitionSnapshotStorageFactory implements 
SnapshotStorageFactory {
     /** List of learners. */
     private final List<String> learners;
 
-    /** RAFT log index read from {@link MvPartitionStorage#persistedIndex()} 
during factory instantiation. */
+    /** RAFT log index read from {@link PartitionAccess#persistedIndex()} 
during factory instantiation. */
     private final long persistedRaftIndex;
 
     /**
      * Constructor.
      *
-     * @param partitionStorage MV partition storage.
+     * @param partition Partition access.
      * @param peers List of raft group peers to be used in snapshot meta.
      * @param learners List of raft group learners to be used in snapshot meta.
      *
      * @see SnapshotMeta
      */
     @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
-    public PartitionSnapshotStorageFactory(MvPartitionStorage 
partitionStorage, List<String> peers, List<String> learners) {
-        this.partitionStorage = partitionStorage;
+    public PartitionSnapshotStorageFactory(PartitionAccess partition, 
List<String> peers, List<String> learners) {
+        this.partition = partition;
         this.peers = peers;
         this.learners = learners;
 
-        persistedRaftIndex = partitionStorage.persistedIndex();
+        persistedRaftIndex = partition.persistedIndex();
     }
 
     /** {@inheritDoc} */
@@ -80,6 +80,6 @@ public class PartitionSnapshotStorageFactory implements 
SnapshotStorageFactory {
                 .learnersList(learners)
                 .build();
 
-        return new PartitionSnapshotStorage(uri, raftOptions, 
partitionStorage, snapshotMeta);
+        return new PartitionSnapshotStorage(uri, raftOptions, partition, 
snapshotMeta);
     }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
index ba3e26e9f6..3fbf2725d6 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.tx.storage.state;
 
-import java.nio.file.Path;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.tx.TxMeta;
@@ -26,7 +25,6 @@ import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.lang.ErrorGroups.Transactions;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteInternalException;
-import org.jetbrains.annotations.NotNull;
 
 /**
  * Storage for transaction meta, {@link TxMeta}.
@@ -52,6 +50,25 @@ public interface TxStateStorage extends AutoCloseable {
      */
     void stop() throws Exception;
 
+    /**
+     * Flushes current state of the data or <i>the state from the nearest 
future</i> to the storage. It means that the future can be
+     * completed when {@link #persistedIndex()} is higher than {@link 
#lastAppliedIndex()} at the moment of the method's call. This feature
+     * allows implementing a batch flush for several partitions at once.
+     *
+     * @return Future that's completed when flushing of the data is completed.
+     */
+    CompletableFuture<Void> flush();
+
+    /**
+     * Index of the highest write command applied to the storage. {@code 0} if 
index is unknown.
+     */
+    long lastAppliedIndex();
+
+    /**
+     * {@link #lastAppliedIndex()} value consistent with the data, already 
persisted on the storage.
+     */
+    long persistedIndex();
+
     /**
      * Get tx meta by tx id.
      *
@@ -78,11 +95,12 @@ public interface TxStateStorage extends AutoCloseable {
      * @param txId Tx id.
      * @param txStateExpected Tx state that is expected to be in the storage.
      * @param txMeta Tx meta.
+     * @param commandIndex New value for {@link #lastAppliedIndex()}.
      * @return Whether the CAS operation is successful.
      * @throws IgniteInternalException with {@link 
Transactions#TX_STATE_STORAGE_ERR} error code in case when
      *                                 the operation has failed.
      */
-    boolean compareAndSet(UUID txId, @NotNull TxState txStateExpected, 
@NotNull TxMeta txMeta);
+    boolean compareAndSet(UUID txId, TxState txStateExpected, TxMeta txMeta, 
long commandIndex);
 
     /**
      * Remove the tx meta from the storage.
@@ -107,19 +125,4 @@ public interface TxStateStorage extends AutoCloseable {
      *                                 the operation has failed.
      */
     void destroy();
-
-    /**
-     * Create a snapshot of the storage's current state in the specified 
directory.
-     *
-     * @param snapshotPath Snapshot path.
-     * @return Future which completes when the snapshot operation is complete.
-     */
-    CompletableFuture<Void> snapshot(Path snapshotPath);
-
-    /**
-     * Restore a storage from the snapshot.
-     *
-     * @param snapshotPath Snapshot path.
-     */
-    void restoreSnapshot(Path snapshotPath);
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
index 8139f82090..d31a142bce 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
@@ -18,29 +18,41 @@
 package org.apache.ignite.internal.tx.storage.state.rocksdb;
 
 import static java.util.Objects.requireNonNull;
-import static 
org.apache.ignite.internal.rocksdb.snapshot.ColumnFamilyRange.fullRange;
+import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
 import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
 import static org.apache.ignite.internal.util.ByteUtils.toBytes;
 import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_STATE_STORAGE_CREATE_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_STATE_STORAGE_DESTROY_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_STATE_STORAGE_ERR;
+import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_STATE_STORAGE_STOPPED_ERR;
+import static org.rocksdb.ReadTier.PERSISTED_TIER;
 
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.nio.file.Path;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
-import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.IntSupplier;
+import org.apache.ignite.internal.rocksdb.BusyRocksIteratorAdapter;
 import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
-import org.apache.ignite.internal.rocksdb.snapshot.RocksSnapshotManager;
+import org.apache.ignite.internal.rocksdb.flush.RocksDbFlusher;
 import org.apache.ignite.internal.tx.TxMeta;
 import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
+import org.apache.ignite.internal.util.ArrayUtils;
 import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteInternalException;
-import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.rocksdb.Options;
 import org.rocksdb.ReadOptions;
@@ -55,10 +67,20 @@ import org.rocksdb.WriteOptions;
 /**
  * Tx state storage implementation based on RocksDB.
  */
-public class TxStateRocksDbStorage implements TxStateStorage, AutoCloseable {
+public class TxStateRocksDbStorage implements TxStateStorage {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /** Key for the applied index. */
+    private static final byte[] APPLIED_INDEX_KEY = 
ArrayUtils.BYTE_EMPTY_ARRAY;
+
     /** Database path. */
     private final Path dbPath;
 
+    /** RocksDB flusher instance. */
+    private volatile RocksDbFlusher flusher;
+
     /** RocksDB database. */
     private volatile TransactionDB db;
 
@@ -69,41 +91,86 @@ public class TxStateRocksDbStorage implements 
TxStateStorage, AutoCloseable {
     /** Database options. */
     private volatile TransactionDBOptions txDbOptions;
 
+    /** Write options. */
+    private final WriteOptions writeOptions = new 
WriteOptions().setDisableWAL(true);
+
+    /** Read options for regular reads. */
+    private final ReadOptions readOptions = new ReadOptions();
+
+    /** Read options for reading persisted data. */
+    private final ReadOptions persistedTierReadOptions = new 
ReadOptions().setReadTier(PERSISTED_TIER);
+
+    /** Scheduled pool for {@link RocksDbFlusher}. */
+    private final ScheduledExecutorService scheduledPool;
+
     /** Thread-pool for snapshot operations execution. */
-    private final ExecutorService snapshotExecutor;
+    private final ExecutorService threadPool;
+
+    /** Delay supplier for {@link RocksDbFlusher}. */
+    private final IntSupplier delaySupplier;
 
-    /** Snapshot manager. */
-    private volatile RocksSnapshotManager snapshotManager;
+    /** On-heap-cached last applied index value. */
+    private volatile long lastAppliedIndex;
 
-    /** Snapshot restore lock. */
-    private final Object snapshotRestoreLock = new Object();
+    /** The value of {@link #lastAppliedIndex} persisted to the device at this 
moment. */
+    private volatile long persistedIndex;
 
     /** Whether is started. */
     private boolean isStarted;
 
+    /** Collection of opened RocksDB iterators. */
+    private final Set<RocksIterator> iterators = ConcurrentHashMap.newKeySet();
+
+    /** Busy lock to stop synchronously. */
+    final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** Prevents double stopping the component. */
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
     /**
      * The constructor.
      *
      * @param dbPath Database path.
-     * @param snapshotExecutor Snapshot thread pool.
+     * @param scheduledPool Scheduled thread pool.
+     * @param threadPool Thread pool.
+     * @param delaySupplier Supplier of delay values to batch independent 
flush requests. Please refer to {@link RocksDbFlusher} for
+     *      details.
      */
-    public TxStateRocksDbStorage(Path dbPath, ExecutorService 
snapshotExecutor) {
+    public TxStateRocksDbStorage(
+            Path dbPath,
+            ScheduledExecutorService scheduledPool,
+            ExecutorService threadPool,
+            IntSupplier delaySupplier
+    ) {
         this.dbPath = dbPath;
-        this.snapshotExecutor = snapshotExecutor;
+        this.threadPool = threadPool;
+        this.scheduledPool = scheduledPool;
+        this.delaySupplier = delaySupplier;
     }
 
     /** {@inheritDoc} */
     @Override public void start() {
         try {
-            this.options = new Options().setCreateIfMissing(true);
+            flusher = new RocksDbFlusher(
+                    busyLock,
+                    scheduledPool,
+                    threadPool,
+                    delaySupplier,
+                    this::refreshPersistedIndex
+            );
+
+            options = new Options()
+                    .setCreateIfMissing(true)
+                    .setAtomicFlush(true)
+                    .setListeners(List.of(flusher.listener()));
 
-            this.txDbOptions = new TransactionDBOptions();
+            txDbOptions = new TransactionDBOptions();
 
-            this.db = TransactionDB.open(options, txDbOptions, 
dbPath.toString());
+            db = TransactionDB.open(options, txDbOptions, dbPath.toString());
 
-            ColumnFamily defaultCf = ColumnFamily.wrap(db, 
db.getDefaultColumnFamily());
+            lastAppliedIndex = readLastAppliedIndex(readOptions);
 
-            snapshotManager = new RocksSnapshotManager(db, 
List.of(fullRange(defaultCf)), snapshotExecutor);
+            persistedIndex = lastAppliedIndex;
 
             isStarted = true;
         } catch (RocksDBException e) {
@@ -111,6 +178,36 @@ public class TxStateRocksDbStorage implements 
TxStateStorage, AutoCloseable {
         }
     }
 
+    private void refreshPersistedIndex() {
+        if (!busyLock.enterBusy()) {
+            return;
+        }
+
+        try {
+            persistedIndex = readLastAppliedIndex(persistedTierReadOptions);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Reads the value of {@link #lastAppliedIndex} from the storage.
+     *
+     * @param readOptions Read options to be used for reading.
+     * @return The value of last applied index.
+     */
+    private long readLastAppliedIndex(ReadOptions readOptions) {
+        byte[] appliedIndexBytes;
+
+        try {
+            appliedIndexBytes = db.get(readOptions, APPLIED_INDEX_KEY);
+        } catch (RocksDBException e) {
+            throw new IgniteInternalException(TX_STATE_STORAGE_ERR, "Failed to 
read applied index value from transaction state storage", e);
+        }
+
+        return appliedIndexBytes == null ? 0 : bytesToLong(appliedIndexBytes);
+    }
+
     /** {@inheritDoc} */
     @Override public boolean isStarted() {
         return isStarted;
@@ -118,7 +215,22 @@ public class TxStateRocksDbStorage implements 
TxStateStorage, AutoCloseable {
 
     /** {@inheritDoc} */
     @Override public void stop() throws Exception {
-        IgniteUtils.closeAll(options, txDbOptions, db);
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        List<AutoCloseable> closeables = new ArrayList<>(iterators);
+
+        closeables.add(persistedTierReadOptions);
+        closeables.add(readOptions);
+        closeables.add(writeOptions);
+        closeables.add(options);
+        closeables.add(txDbOptions);
+        closeables.add(db);
+
+        IgniteUtils.closeAll(closeables);
 
         db = null;
         options = null;
@@ -127,81 +239,157 @@ public class TxStateRocksDbStorage implements 
TxStateStorage, AutoCloseable {
         isStarted = false;
     }
 
+    @Override
+    public CompletableFuture<Void> flush() {
+        return flusher.awaitFlush(true);
+    }
+
+    @Override
+    public long lastAppliedIndex() {
+        return lastAppliedIndex;
+    }
+
+    @Override
+    public long persistedIndex() {
+        return persistedIndex;
+    }
+
     /** {@inheritDoc} */
     @Override public TxMeta get(UUID txId) {
+        if (!busyLock.enterBusy()) {
+            throwStorageStoppedException();
+        }
+
         try {
-            byte[] txMetaBytes = db.get(toBytes(txId));
+            byte[] txMetaBytes = db.get(uuidToBytes(txId));
 
             return txMetaBytes == null ? null : (TxMeta) 
fromBytes(txMetaBytes);
         } catch (RocksDBException e) {
             throw new IgniteInternalException(TX_STATE_STORAGE_ERR, "Failed to 
get a value from the transaction state storage", e);
+        } finally {
+            busyLock.leaveBusy();
         }
     }
 
     /** {@inheritDoc} */
     @Override public void put(UUID txId, TxMeta txMeta) {
+        if (!busyLock.enterBusy()) {
+            throwStorageStoppedException();
+        }
+
         try {
-            db.put(toBytes(txId), toBytes(txMeta));
+            db.put(uuidToBytes(txId), toBytes(txMeta));
         } catch (RocksDBException e) {
             throw new IgniteInternalException(TX_STATE_STORAGE_ERR, "Failed to 
put a value into the transaction state storage", e);
+        } finally {
+            busyLock.leaveBusy();
         }
     }
 
     /** {@inheritDoc} */
-    @Override public boolean compareAndSet(UUID txId, @NotNull TxState 
txStateExpected, @NotNull TxMeta txMeta) {
+    @Override public boolean compareAndSet(UUID txId, TxState txStateExpected, 
TxMeta txMeta, long commandIndex) {
         requireNonNull(txStateExpected);
         requireNonNull(txMeta);
 
-        byte[] txIdBytes = toBytes(txId);
+        if (!busyLock.enterBusy()) {
+            throwStorageStoppedException();
+        }
+
+        byte[] txIdBytes = uuidToBytes(txId);
 
-        try (Transaction rocksTx = db.beginTransaction(new WriteOptions())) {
-            byte[] txMetaExistingBytes = rocksTx.get(new ReadOptions(), 
toBytes(txId));
-            TxMeta txMetaExisting = (TxMeta) fromBytes(txMetaExistingBytes);
+        try (Transaction rocksTx = db.beginTransaction(writeOptions)) {
+            byte[] txMetaExistingBytes = rocksTx.get(readOptions, 
uuidToBytes(txId));
+            TxMeta txMetaExisting = fromBytes(txMetaExistingBytes);
+
+            boolean result;
 
             if (txMetaExisting.txState() == txStateExpected) {
                 rocksTx.put(txIdBytes, toBytes(txMeta));
 
-                rocksTx.commit();
-
-                return true;
+                result = true;
             } else {
-                rocksTx.rollback();
-
-                return false;
+                result = false;
             }
+
+            rocksTx.put(APPLIED_INDEX_KEY, longToBytes(commandIndex));
+
+            rocksTx.commit();
+
+            return result;
         } catch (RocksDBException e) {
             throw new IgniteInternalException(
                 TX_STATE_STORAGE_ERR,
                 "Failed perform CAS operation over a value in transaction 
state storage",
                 e
             );
+        } finally {
+            busyLock.leaveBusy();
         }
     }
 
     /** {@inheritDoc} */
     @Override public void remove(UUID txId) {
+        if (!busyLock.enterBusy()) {
+            throwStorageStoppedException();
+        }
+
         try {
-            db.delete(toBytes(txId));
+            db.delete(uuidToBytes(txId));
         } catch (RocksDBException e) {
             throw new IgniteInternalException(TX_STATE_STORAGE_ERR, "Failed to 
remove a value from the transaction state storage", e);
+        } finally {
+            busyLock.leaveBusy();
         }
     }
 
     /** {@inheritDoc} */
     @Override public Cursor<IgniteBiTuple<UUID, TxMeta>> scan() {
-        RocksIterator rocksIterator = db.newIterator();
-        rocksIterator.seekToFirst();
+        if (!busyLock.enterBusy()) {
+            throwStorageStoppedException();
+        }
+
+        try {
+            RocksIterator rocksIterator = db.newIterator();
 
-        RocksIteratorAdapter<IgniteBiTuple<UUID, TxMeta>> iteratorAdapter = 
new RocksIteratorAdapter<>(rocksIterator) {
-            @Override protected IgniteBiTuple<UUID, TxMeta> decodeEntry(byte[] 
keyBytes, byte[] valueBytes) {
-                UUID key = (UUID) fromBytes(keyBytes);
-                TxMeta txMeta = (TxMeta) fromBytes(valueBytes);
+            iterators.add(rocksIterator);
 
-                return new IgniteBiTuple<>(key, txMeta);
+            try {
+                // Skip applied index value.
+                rocksIterator.seek(new byte[1]);
+            } catch (Exception e) {
+                // Unlikely, but what if...
+                iterators.remove(rocksIterator);
+
+                rocksIterator.close();
+
+                throw e;
             }
-        };
 
-        return Cursor.fromIterator(iteratorAdapter);
+            RocksIteratorAdapter<IgniteBiTuple<UUID, TxMeta>> iteratorAdapter 
= new BusyRocksIteratorAdapter<>(busyLock, rocksIterator) {
+                @Override protected IgniteBiTuple<UUID, TxMeta> 
decodeEntry(byte[] keyBytes, byte[] valueBytes) {
+                    UUID key = bytesToUuid(keyBytes);
+                    TxMeta txMeta = fromBytes(valueBytes);
+
+                    return new IgniteBiTuple<>(key, txMeta);
+                }
+
+                @Override
+                protected void handleBusy() {
+                    throwStorageStoppedException();
+                }
+
+                @Override
+                public void close() throws Exception {
+                    iterators.remove(rocksIterator);
+
+                    super.close();
+                }
+            };
+
+            return Cursor.fromIterator(iteratorAdapter);
+        } finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /** {@inheritDoc} */
@@ -215,20 +403,22 @@ public class TxStateRocksDbStorage implements 
TxStateStorage, AutoCloseable {
         }
     }
 
-    /** {@inheritDoc} */
-    @Override public CompletableFuture<Void> snapshot(Path snapshotPath) {
-        return snapshotManager.createSnapshot(snapshotPath);
+    private static void throwStorageStoppedException() {
+        throw new IgniteInternalException(TX_STATE_STORAGE_STOPPED_ERR, 
"Transaction state storage is stopped");
     }
 
-    /** {@inheritDoc} */
-    @Override public void restoreSnapshot(Path snapshotPath) {
-        synchronized (snapshotRestoreLock) {
-            destroy();
+    private byte[] uuidToBytes(UUID uuid) {
+        return ByteBuffer.allocate(2 * Long.BYTES).order(ByteOrder.BIG_ENDIAN)
+                .putLong(uuid.getMostSignificantBits())
+                .putLong(uuid.getLeastSignificantBits())
+                .array();
+    }
 
-            start();
+    private UUID bytesToUuid(byte[] bytes) {
+        long msb = bytesToLong(bytes, 0);
+        long lsb = bytesToLong(bytes, Long.BYTES);
 
-            snapshotManager.restoreSnapshot(snapshotPath);
-        }
+        return new UUID(msb, lsb);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/TxStateStorageTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/TxStateStorageTest.java
index 5506960e6f..a3dc9c10cc 100644
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/TxStateStorageTest.java
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/TxStateStorageTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.tx.storage.state;
 
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
 import static java.util.stream.Collectors.toList;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -24,16 +26,12 @@ import static 
org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-import java.io.File;
-import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.Executors;
 import java.util.stream.IntStream;
 import org.apache.ignite.hlc.HybridTimestamp;
 import org.apache.ignite.internal.testframework.WorkDirectory;
@@ -124,8 +122,8 @@ public class TxStateStorageTest {
 
             assertTxMetaEquals(storage.get(txId), txMeta0);
 
-            assertFalse(storage.compareAndSet(txId, txMeta1.txState(), 
txMeta2));
-            assertTrue(storage.compareAndSet(txId, txMeta0.txState(), 
txMeta2));
+            assertFalse(storage.compareAndSet(txId, txMeta1.txState(), 
txMeta2, 1));
+            assertTrue(storage.compareAndSet(txId, txMeta0.txState(), txMeta2, 
2));
 
             assertTxMetaEquals(storage.get(txId), txMeta2);
         }
@@ -143,6 +141,7 @@ public class TxStateStorageTest {
                 TxMeta txMeta = new TxMeta(TxState.PENDING, 
generateEnlistedPartitions(i), generateTimestamp(txId));
                 txs.put(txId, txMeta);
                 storage.put(txId, txMeta);
+                storage.compareAndSet(txId, TxState.PENDING, txMeta, i);
             }
 
             try (Cursor<IgniteBiTuple<UUID, TxMeta>> scanCursor = 
storage.scan()) {
@@ -163,65 +162,6 @@ public class TxStateStorageTest {
         }
     }
 
-    @Test
-    public void testSnapshot() throws Exception {
-        try (TxStateStorage storage = createStorage()) {
-            storage.start();
-
-            List<UUID> inSnapshot = new ArrayList<>();
-
-            for (int i = 0; i < 100; i++) {
-                UUID txId = UUID.randomUUID();
-
-                storage.put(txId, new TxMeta(TxState.COMMITED, new 
ArrayList<>(), generateTimestamp(txId)));
-
-                inSnapshot.add(txId);
-            }
-
-            Path snapshotDirPath = workDir.resolve("snapshot");
-            Files.createDirectories(snapshotDirPath);
-
-            try {
-                storage.snapshot(snapshotDirPath).join();
-
-                List<UUID> notInSnapshot = new ArrayList<>();
-
-                for (int i = 0; i < 100; i++) {
-                    UUID txId = UUID.randomUUID();
-
-                    storage.put(txId, new TxMeta(TxState.COMMITED, new 
ArrayList<>(), generateTimestamp(txId)));
-
-                    notInSnapshot.add(txId);
-                }
-
-                for (int i = 0; i < 100; i++) {
-                    UUID txId = notInSnapshot.get(i);
-
-                    assertTxMetaEquals(new TxMeta(TxState.COMMITED, new 
ArrayList<>(), generateTimestamp(txId)), storage.get(txId));
-                }
-
-                storage.restoreSnapshot(snapshotDirPath);
-
-                for (int i = 0; i < 100; i++) {
-                    UUID txId = inSnapshot.get(i);
-
-                    assertTxMetaEquals(new TxMeta(TxState.COMMITED, new 
ArrayList<>(), generateTimestamp(txId)), storage.get(txId));
-                }
-
-                for (int i = 0; i < 100; i++) {
-                    UUID txId = notInSnapshot.get(i);
-
-                    assertNull(storage.get(txId));
-                }
-            } finally {
-                Files.walk(snapshotDirPath)
-                        .sorted(Comparator.reverseOrder())
-                        .map(Path::toFile)
-                        .forEach(File::delete);
-            }
-        }
-    }
-
     private static void assertTxMetaEquals(TxMeta txMeta0, TxMeta txMeta1) {
         assertEquals(txMeta0.txState(), txMeta1.txState());
         assertEquals(txMeta0.commitTimestamp(), txMeta1.commitTimestamp());
@@ -229,6 +169,6 @@ public class TxStateStorageTest {
     }
 
     private TxStateStorage createStorage() {
-        return new TxStateRocksDbStorage(workDir, 
Executors.newSingleThreadExecutor());
+        return new TxStateRocksDbStorage(workDir, 
newSingleThreadScheduledExecutor(), newSingleThreadExecutor(), () -> 100);
     }
 }

Reply via email to