This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new e5156a8caa IGNITE-17611 Local storage recovery code for TxStateRocksDbStorage (#1061)
e5156a8caa is described below
commit e5156a8caa7b680ffb94d3d32f0765410a9d02e2
Author: ibessonov <[email protected]>
AuthorDate: Fri Sep 9 14:58:54 2022 +0300
IGNITE-17611 Local storage recovery code for TxStateRocksDbStorage (#1061)
---
.../org/apache/ignite/internal/util/ByteUtils.java | 10 +-
.../java/org/apache/ignite/lang/ErrorGroups.java | 9 +-
.../internal/rocksdb/BusyRocksIteratorAdapter.java | 91 +++++++
.../internal/rocksdb/RocksIteratorAdapter.java | 2 +-
.../rocksdb/flush}/RocksDbFlushListener.java | 63 ++---
.../internal/rocksdb/flush/RocksDbFlusher.java | 238 +++++++++++++++++
.../storage/rocksdb/RocksDbTableStorage.java | 150 +++--------
.../storage/rocksdb/RocksDbMvTableStorageTest.java | 2 +-
.../internal/table/distributed/TableManager.java | 3 +-
.../distributed/raft/snapshot/PartitionAccess.java | 28 ++
.../raft/snapshot/PartitionSnapshotStorage.java | 10 +-
.../snapshot/PartitionSnapshotStorageFactory.java | 14 +-
.../internal/tx/storage/state/TxStateStorage.java | 39 +--
.../state/rocksdb/TxStateRocksDbStorage.java | 294 +++++++++++++++++----
.../tx/storage/state/TxStateStorageTest.java | 72 +----
15 files changed, 708 insertions(+), 317 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
index 180aca3a5d..8d921a2b04 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
@@ -38,18 +38,12 @@ public class ByteUtils {
public static long bytesToLong(byte[] bytes, int off) {
assert bytes != null;
- int bytesCnt = Long.BYTES;
-
- if (off + bytesCnt > bytes.length) {
- bytesCnt = bytes.length - off;
- }
+ int bytesCnt = Math.min(Long.BYTES, bytes.length - off);
long res = 0;
for (int i = 0; i < bytesCnt; i++) {
- int shift = bytesCnt - i - 1 << 3;
-
- res |= (0xffL & bytes[off++]) << shift;
+ res = (res << 8) | (0xffL & bytes[off + i]);
}
return res;
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
b/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index 08d71125cc..068e0e94b6 100755
--- a/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -176,12 +176,15 @@ public class ErrorGroups {
public static final ErrorGroup TX_ERR_GROUP =
ErrorGroup.newGroup("TX", 3);
/** Error on creation of tx state storage. */
- public static int TX_STATE_STORAGE_CREATE_ERR =
TX_ERR_GROUP.registerErrorCode(1);
+ public static final int TX_STATE_STORAGE_CREATE_ERR =
TX_ERR_GROUP.registerErrorCode(1);
/** Error on destruction of tx state storage. */
- public static int TX_STATE_STORAGE_DESTROY_ERR =
TX_ERR_GROUP.registerErrorCode(2);
+ public static final int TX_STATE_STORAGE_DESTROY_ERR =
TX_ERR_GROUP.registerErrorCode(2);
/** Error of tx state storage. */
- public static int TX_STATE_STORAGE_ERR =
TX_ERR_GROUP.registerErrorCode(3);
+ public static final int TX_STATE_STORAGE_ERR =
TX_ERR_GROUP.registerErrorCode(3);
+
+ /** Tx state storage is stopped. */
+ public static final int TX_STATE_STORAGE_STOPPED_ERR =
TX_ERR_GROUP.registerErrorCode(4);
}
}
diff --git
a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/BusyRocksIteratorAdapter.java
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/BusyRocksIteratorAdapter.java
new file mode 100644
index 0000000000..c74655ba8b
--- /dev/null
+++
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/BusyRocksIteratorAdapter.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rocksdb;
+
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.rocksdb.RocksIterator;
+
+/**
+ * Adapter from a {@link RocksIterator} to a {@link Cursor} that also handles
stopping of the storage.
+ */
+public abstract class BusyRocksIteratorAdapter<T> extends
RocksIteratorAdapter<T> {
+ /** Busy lock. */
+ private final IgniteSpinBusyLock busyLock;
+
+ /**
+ * Constructor.
+ *
+ * @param busyLock Busy lock.
+ * @param it RocksDB iterator.
+ */
+ protected BusyRocksIteratorAdapter(IgniteSpinBusyLock busyLock,
RocksIterator it) {
+ super(it);
+ this.busyLock = busyLock;
+ }
+
+ /**
+ * Handles busy lock acquiring failure. This means that db has been
stopped and cursor can't proceed. Must throw an exception.
+ */
+ protected abstract void handleBusy();
+
+ private void handleBusy0() {
+ handleBusy();
+
+ assert false : "handleBusy() should have thrown an exception.";
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (!busyLock.enterBusy()) {
+ handleBusy0();
+ }
+
+ try {
+ return super.hasNext();
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ @Override
+ public T next() {
+ if (!busyLock.enterBusy()) {
+ handleBusy0();
+ }
+
+ try {
+ return super.next();
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (!busyLock.enterBusy()) {
+ handleBusy0();
+ }
+
+ try {
+ super.close();
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+}
diff --git
a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksIteratorAdapter.java
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksIteratorAdapter.java
index 5d50e29ec2..ea902ecebb 100644
---
a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksIteratorAdapter.java
+++
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksIteratorAdapter.java
@@ -52,7 +52,7 @@ public abstract class RocksIteratorAdapter<T> implements
Cursor<T> {
if (!isValid) {
// check the status first. This operation is guaranteed to throw
if an internal error has occurred during
- // the iteration. Otherwise we've exhausted the data range.
+ // the iteration. Otherwise, we've exhausted the data range.
RocksUtils.checkIterator(it);
}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbFlushListener.java
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlushListener.java
similarity index 53%
rename from
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbFlushListener.java
rename to
modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlushListener.java
index 69d13cad81..1fd6ada7a7 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbFlushListener.java
+++
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlushListener.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.storage.rocksdb;
+package org.apache.ignite.internal.rocksdb.flush;
import static
org.rocksdb.AbstractEventListener.EnabledEventCallback.ON_FLUSH_BEGIN;
import static
org.rocksdb.AbstractEventListener.EnabledEventCallback.ON_FLUSH_COMPLETED;
@@ -23,26 +23,16 @@ import static
org.rocksdb.AbstractEventListener.EnabledEventCallback.ON_FLUSH_CO
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.ignite.configuration.schemas.table.TableView;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.storage.StorageException;
import org.rocksdb.AbstractEventListener;
import org.rocksdb.FlushJobInfo;
import org.rocksdb.RocksDB;
/**
- * Represents a listener of RocksDB flush events. Responsible for updating
persisted index of partitions.
- *
- * @see RocksDbMvPartitionStorage#persistedIndex()
- * @see RocksDbMvPartitionStorage#refreshPersistedIndex()
+ * Represents a listener of RocksDB flush events.
*/
class RocksDbFlushListener extends AbstractEventListener {
- /** Logger. */
- private static final IgniteLogger LOG =
Loggers.forClass(RocksDbFlushListener.class);
-
- /** Table storage. */
- private final RocksDbTableStorage tableStorage;
+ /** Flusher instance. */
+ private final RocksDbFlusher flusher;
/**
* Type of last processed event. Real amount of events doesn't matter in
atomic flush mode. All "completed" events go after all "begin"
@@ -55,9 +45,15 @@ class RocksDbFlushListener extends AbstractEventListener {
*/
private volatile CompletableFuture<?> lastFlushProcessed =
CompletableFuture.completedFuture(null);
- public RocksDbFlushListener(RocksDbTableStorage tableStorage) {
+ /**
+ * Constructor.
+ *
+ * @param flusher Flusher instance to delegate events processing to.
+ */
+ RocksDbFlushListener(RocksDbFlusher flusher) {
super(EnabledEventCallback.ON_FLUSH_BEGIN,
EnabledEventCallback.ON_FLUSH_COMPLETED);
- this.tableStorage = tableStorage;
+
+ this.flusher = flusher;
}
/** {@inheritDoc} */
@@ -71,42 +67,13 @@ class RocksDbFlushListener extends AbstractEventListener {
/** {@inheritDoc} */
@Override
public void onFlushCompleted(RocksDB db, FlushJobInfo flushJobInfo) {
- ExecutorService threadPool = tableStorage.engine().threadPool();
+ ExecutorService threadPool = flusher.threadPool;
if (lastEventType.compareAndSet(ON_FLUSH_BEGIN, ON_FLUSH_COMPLETED)) {
- lastFlushProcessed =
CompletableFuture.runAsync(this::refreshPersistedIndexes, threadPool);
+ lastFlushProcessed =
CompletableFuture.runAsync(flusher.onFlushCompleted, threadPool);
}
// Do it for every column family, there's no way to tell in advance
which one has the latest sequence number.
- lastFlushProcessed.whenCompleteAsync((o, throwable) ->
tableStorage.completeFutures(flushJobInfo.getLargestSeqno()), threadPool);
- }
-
- private void refreshPersistedIndexes() {
- if (!tableStorage.busyLock.enterBusy()) {
- return;
- }
-
- try {
- TableView tableCfgView = tableStorage.configuration().value();
-
- for (int partitionId = 0; partitionId < tableCfgView.partitions();
partitionId++) {
- RocksDbMvPartitionStorage partition =
tableStorage.getMvPartition(partitionId);
-
- if (partition != null) {
- try {
- partition.refreshPersistedIndex();
- } catch (StorageException storageException) {
- LOG.error(
- "Filed to refresh persisted applied index
value for table {} partition {}",
- storageException,
- tableStorage.configuration().name().value(),
- partitionId
- );
- }
- }
- }
- } finally {
- tableStorage.busyLock.leaveBusy();
- }
+ lastFlushProcessed.whenCompleteAsync((o, throwable) ->
flusher.completeFutures(flushJobInfo.getLargestSeqno()), threadPool);
}
}
diff --git
a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
new file mode 100644
index 0000000000..31a381568c
--- /dev/null
+++
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rocksdb.flush;
+
+import java.util.List;
+import java.util.SortedMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.IntSupplier;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.rocksdb.AbstractEventListener;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+/**
+ * Helper class to deal with RocksDB flushes. Provides an ability to wait
until current state of data is flushed to the storage.
+ * Requires enabled {@link Options#setAtomicFlush(boolean)} option to work
properly.
+ */
+public class RocksDbFlusher {
+ /** Logger. */
+ private static final IgniteLogger LOG =
Loggers.forClass(RocksDbFlusher.class);
+
+ /** Rocks DB instance. */
+ private volatile RocksDB db;
+
+ /** List of all column families. */
+ private volatile List<ColumnFamilyHandle> columnFamilyHandles;
+
+ /** Scheduled pool to schedule flushes. */
+ private final ScheduledExecutorService scheduledPool;
+
+ /** Thread pool to complete flush completion futures. */
+ final ExecutorService threadPool;
+
+ /** Supplier of delay values to batch independent flush requests. */
+ private final IntSupplier delaySupplier;
+
+ /** Flush completion callback. */
+ final Runnable onFlushCompleted;
+
+ /**
+ * Flush options to be used to asynchronously flush the Rocks DB memtable.
It needs to be cached, because
+ * {@link RocksDB#flush(FlushOptions)} contract requires this object to
not be GC-ed.
+ */
+ private final FlushOptions flushOptions = new
FlushOptions().setWaitForFlush(false);
+
+ /** Map with flush futures by sequence number at the time of the {@link
#awaitFlush(boolean)} call. */
+ final SortedMap<Long, CompletableFuture<Void>>
flushFuturesBySequenceNumber = new ConcurrentSkipListMap<>();
+
+ /** Latest known sequence number for persisted data. Not volatile,
protected by explicit synchronization. */
+ private long latestPersistedSequenceNumber;
+
+ /** Mutex for {@link #latestPersistedSequenceNumber} modifications. */
+ private final Object latestPersistedSequenceNumberMux = new Object();
+
+ /** Busy lock to stop synchronously. */
+ private final IgniteSpinBusyLock busyLock;
+
+ /**
+ * Instance of the latest scheduled flush closure.
+ *
+ * @see #scheduleFlush()
+ */
+ private volatile Runnable latestFlushClosure;
+
+ /**
+ * Constructor.
+ *
+ * @param busyLock Busy lock.
+ * @param scheduledPool Scheduled pool to schedule flushes.
+ * @param threadPool Thread pool to run flush completion closure, provided
by {@code onFlushCompleted} parameter.
+ * @param delaySupplier Supplier of delay values to batch independent
flush requests. When {@link #awaitFlush(boolean)} is called with
+ * {@code true} parameter, the flusher waits given number of
milliseconds (using {@code scheduledPool}) and then executes flush
+ * only if there were no other {@code awaitFlush(true)} calls.
Otherwise, it does nothing after the timeout. This guarantees that
+ * either the last one wins, or automatic flush wins if there's an endless stream of {@code awaitFlush(true)} calls with very small
+ * time intervals between them. Such behavior saves on unnecessary flushes when multiple await flush calls appear at
+ * roughly the same time from different threads. For example, several
partitions might be flushed at the same time, because they
+ * started at the same time and their flush frequency is also the
same.
+ * @param onFlushCompleted Flush completion callback. Executed on every
individual column family flush.
+ */
+ public RocksDbFlusher(
+ IgniteSpinBusyLock busyLock,
+ ScheduledExecutorService scheduledPool,
+ ExecutorService threadPool,
+ IntSupplier delaySupplier,
+ Runnable onFlushCompleted
+ ) {
+ this.busyLock = busyLock;
+ this.scheduledPool = scheduledPool;
+ this.threadPool = threadPool;
+ this.delaySupplier = delaySupplier;
+ this.onFlushCompleted = onFlushCompleted;
+ }
+
+ /**
+ * Returns an instance of {@link AbstractEventListener} to process actual
RocksDB events. Returned listener must be set into
+ * {@link Options#setListeners(List)} before the database is started. Otherwise, no events would occur.
+ */
+ public AbstractEventListener listener() {
+ return new RocksDbFlushListener(this);
+ }
+
+ /**
+ * Initializes the flusher with DB instance and a list of column families.
+ *
+ * @param db Rocks DB instance.
+ * @param columnFamilyHandles List of all column families. Column families
missing from this list may not have flush events processed.
+ */
+ @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+ public void init(RocksDB db, List<ColumnFamilyHandle> columnFamilyHandles)
{
+ this.db = db;
+ this.columnFamilyHandles = columnFamilyHandles;
+
+ synchronized (latestPersistedSequenceNumberMux) {
+ latestPersistedSequenceNumber = db.getLatestSequenceNumber();
+ }
+ }
+
+ /**
+ * Returns a future to wait next flush operation from the current point in
time. Uses {@link RocksDB#getLatestSequenceNumber()} to
+ * achieve this by fixing its value at the time of invocation. Storage is considered flushed when at least one persisted column
+ * family has its latest sequence number greater or equal to the one that
we fixed. This is enough to guarantee that all column families
+ * have up-to-date state as well, because flusher expects its users to also have {@link Options#setAtomicFlush(boolean)} option
+ * enabled.
+ *
+ * @param schedule {@code true} if {@link RocksDB#flush(FlushOptions)} should be explicitly triggered in the near future. Please refer
+ * to {@link RocksDbFlusher#RocksDbFlusher(IgniteSpinBusyLock,
ScheduledExecutorService, ExecutorService, IntSupplier, Runnable)}
+ * parameters description to see what's really happening in this case.
+ *
+ * @see #scheduleFlush()
+ */
+ public CompletableFuture<Void> awaitFlush(boolean schedule) {
+ CompletableFuture<Void> future;
+
+ long dbSequenceNumber = db.getLatestSequenceNumber();
+
+ synchronized (latestPersistedSequenceNumberMux) {
+ if (dbSequenceNumber <= latestPersistedSequenceNumber) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ future =
flushFuturesBySequenceNumber.computeIfAbsent(dbSequenceNumber, s -> new
CompletableFuture<>());
+ }
+
+ if (schedule) {
+ scheduleFlush();
+ }
+
+ return future;
+ }
+
+ /**
+ * Schedules a flush of the table. If run several times within a small
amount of time, only the last scheduled flush will be executed.
+ */
+ private void scheduleFlush() {
+ Runnable newClosure = new Runnable() {
+ @Override
+ public void run() {
+ if (latestFlushClosure != this) {
+ return;
+ }
+
+ if (!busyLock.enterBusy()) {
+ return;
+ }
+
+ try {
+ // Explicit list of CF handles is mandatory!
+ // Default flush is buggy and only invokes listener
methods for a single random CF.
+ db.flush(flushOptions, columnFamilyHandles);
+ } catch (RocksDBException e) {
+ LOG.error("Error occurred during the explicit flush", e);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+ };
+
+ latestFlushClosure = newClosure;
+
+ scheduledPool.schedule(newClosure, delaySupplier.getAsInt(),
TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Completes all futures in {@link #flushFuturesBySequenceNumber} up to a
given sequence number.
+ */
+ void completeFutures(long sequenceNumber) {
+ synchronized (latestPersistedSequenceNumberMux) {
+ if (sequenceNumber <= latestPersistedSequenceNumber) {
+ return;
+ }
+
+ latestPersistedSequenceNumber = sequenceNumber;
+ }
+
+ SortedMap<Long, CompletableFuture<Void>> futuresToComplete =
flushFuturesBySequenceNumber.headMap(sequenceNumber + 1);
+
+ for (CompletableFuture<Void> future : futuresToComplete.values()) {
+ future.complete(null);
+ }
+
+ futuresToComplete.clear();
+ }
+
+ /**
+ * Stops the flusher by cancelling all of its futures.
+ */
+ public void stop() {
+ for (CompletableFuture<Void> future :
flushFuturesBySequenceNumber.values()) {
+ future.cancel(false);
+ }
+
+ flushOptions.close();
+ }
+}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index 5b3d98d323..47f1f7bf23 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -27,22 +27,20 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.stream.Collectors;
import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableView;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.rocksdb.flush.RocksDbFlusher;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
@@ -84,18 +82,15 @@ public class RocksDbTableStorage implements MvTableStorage {
/** Data region for the table. */
private final RocksDbDataRegion dataRegion;
+ /** RocksDB flusher instance. */
+ private volatile RocksDbFlusher flusher;
+
/** Rocks DB instance. */
private volatile RocksDB db;
/** Write options for write operations. */
private final WriteOptions writeOptions = new
WriteOptions().setDisableWAL(true);
- /**
- * Flush options to be used to asynchronously flush the Rocks DB memtable.
It needs to be cached, because
- * {@link RocksDB#flush(FlushOptions)} contract requires this object to
not be GC-ed.
- */
- private final FlushOptions flushOptions = new
FlushOptions().setWaitForFlush(false);
-
/** Meta information. */
private volatile RocksDbMetaStorage meta;
@@ -105,31 +100,12 @@ public class RocksDbTableStorage implements
MvTableStorage {
/** Column Family handle for Hash Index data. */
private volatile ColumnFamily hashIndexCf;
- /** List of all existing Column Family handles. */
- private volatile List<ColumnFamilyHandle> allCfHandles;
-
/** Partition storages. */
private volatile AtomicReferenceArray<RocksDbMvPartitionStorage>
partitions;
/** Hash Index storages by Index IDs. */
private final ConcurrentMap<UUID, HashIndices> hashIndices = new
ConcurrentHashMap<>();
- /** Map with flush futures by sequence number at the time of the {@link
#awaitFlush(boolean)} call. */
- private final ConcurrentMap<Long, CompletableFuture<Void>>
flushFuturesBySequenceNumber = new ConcurrentHashMap<>();
-
- /** Latest known sequence number for persisted data. Not volatile,
protected by explicit synchronization. */
- private long latestPersistedSequenceNumber;
-
- /** Mutex for {@link #latestPersistedSequenceNumber} modifications. */
- private final Object latestPersistedSequenceNumberMux = new Object();
-
- /**
- * Instance of the latest scheduled flush closure.
- *
- * @see #scheduleFlush()
- */
- private volatile Runnable latestFlushClosure;
-
/** Busy lock to stop synchronously. */
final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -193,6 +169,14 @@ public class RocksDbTableStorage implements MvTableStorage
{
/** {@inheritDoc} */
@Override
public void start() throws StorageException {
+ flusher = new RocksDbFlusher(
+ busyLock,
+ engine.scheduledPool(),
+ engine().threadPool(),
+ engine.configuration().flushDelayMillis()::value,
+ this::refreshPersistedIndexes
+ );
+
try {
Files.createDirectories(tablePath);
} catch (IOException e) {
@@ -208,7 +192,7 @@ public class RocksDbTableStorage implements MvTableStorage {
.setCreateMissingColumnFamilies(true)
// Atomic flush must be enabled to guarantee consistency
between different column families when WAL is disabled.
.setAtomicFlush(true)
- .setListeners(List.of(new RocksDbFlushListener(this)))
+ .setListeners(List.of(flusher.listener()))
.setWriteBufferManager(dataRegion.writeBufferManager());
try {
@@ -243,12 +227,7 @@ public class RocksDbTableStorage implements MvTableStorage
{
assert partitionCf != null;
assert hashIndexCf != null;
- allCfHandles = List.copyOf(cfHandles);
-
- // Pointless synchronization, but without it there would be a
warning in the code.
- synchronized (latestPersistedSequenceNumberMux) {
- latestPersistedSequenceNumber = db.getLatestSequenceNumber();
- }
+ flusher.init(db, cfHandles);
} catch (RocksDBException e) {
throw new StorageException("Failed to initialize RocksDB
instance", e);
}
@@ -265,83 +244,40 @@ public class RocksDbTableStorage implements
MvTableStorage {
* achieve this.
*
* @param schedule {@code true} if {@link RocksDB#flush(FlushOptions)}
should be explicitly triggerred in the near future.
- *
- * @see #scheduleFlush()
*/
public CompletableFuture<Void> awaitFlush(boolean schedule) {
- CompletableFuture<Void> future;
-
- long dbSequenceNumber = db.getLatestSequenceNumber();
-
- synchronized (latestPersistedSequenceNumberMux) {
- if (dbSequenceNumber <= latestPersistedSequenceNumber) {
- return CompletableFuture.completedFuture(null);
- }
-
- future =
flushFuturesBySequenceNumber.computeIfAbsent(dbSequenceNumber, l -> new
CompletableFuture<>());
- }
-
- if (schedule) {
- scheduleFlush();
- }
-
- return future;
+ return flusher.awaitFlush(schedule);
}
- /**
- * Completes all futures in {@link #flushFuturesBySequenceNumber} up to a
given sequence number.
- */
- void completeFutures(long sequenceNumber) {
- synchronized (latestPersistedSequenceNumberMux) {
- if (sequenceNumber <= latestPersistedSequenceNumber) {
- return;
- }
-
- latestPersistedSequenceNumber = sequenceNumber;
+ private void refreshPersistedIndexes() {
+ if (!busyLock.enterBusy()) {
+ return;
}
- Set<Entry<Long, CompletableFuture<Void>>> entries =
flushFuturesBySequenceNumber.entrySet();
-
- for (Iterator<Entry<Long, CompletableFuture<Void>>> iterator =
entries.iterator(); iterator.hasNext(); ) {
- Entry<Long, CompletableFuture<Void>> entry = iterator.next();
-
- if (sequenceNumber >= entry.getKey()) {
- entry.getValue().complete(null);
-
- iterator.remove();
- }
- }
- }
+ try {
+ TableView tableCfgView = configuration().value();
- /**
- * Schedules a flush of the table. If run several times within a small
amount of time, only the last scheduled flush will be executed.
- */
- void scheduleFlush() {
- Runnable newClosure = new Runnable() {
- @Override
- public void run() {
- if (latestFlushClosure != this) {
- return;
- }
+ for (int partitionId = 0; partitionId < tableCfgView.partitions();
partitionId++) {
+ RocksDbMvPartitionStorage partition =
getMvPartition(partitionId);
- try {
- // Explicit list of CF handles is mandatory!
- // Default flush is buggy and only invokes listener
methods for a single random CF.
- db.flush(flushOptions, allCfHandles);
- } catch (RocksDBException e) {
- LOG.error("Error occurred during the explicit flush for
table '{}'", e, tableCfg.name());
+ if (partition != null) {
+ try {
+ partition.refreshPersistedIndex();
+ } catch (StorageException storageException) {
+ LOG.error(
+ "Filed to refresh persisted applied index
value for table {} partition {}",
+ storageException,
+ configuration().name().value(),
+ partitionId
+ );
+ }
}
}
- };
-
- latestFlushClosure = newClosure;
-
- int delay = engine.configuration().flushDelayMillis().value();
-
- engine.scheduledPool().schedule(newClosure, delay,
TimeUnit.MILLISECONDS);
+ } finally {
+ busyLock.leaveBusy();
+ }
}
-
/** {@inheritDoc} */
@Override
public void stop() throws StorageException {
@@ -351,18 +287,18 @@ public class RocksDbTableStorage implements
MvTableStorage {
busyLock.block();
- for (CompletableFuture<Void> future :
flushFuturesBySequenceNumber.values()) {
- future.cancel(false);
- }
-
List<AutoCloseable> resources = new ArrayList<>();
+ resources.add(flusher::stop);
+
+ resources.add(meta.columnFamily().handle());
+ resources.add(partitionCf.handle());
+ resources.add(hashIndexCf.handle());
+
resources.add(db);
resources.add(writeOptions);
- resources.add(flushOptions);
-
for (int i = 0; i < partitions.length(); i++) {
MvPartitionStorage partition = partitions.get(i);
diff --git
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
index e51591470b..06c64b71a9 100644
---
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
+++
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
@@ -116,7 +116,7 @@ public class RocksDbMvTableStorageTest extends
AbstractMvTableStorageTest {
CompletableFuture<Void> destroyFuture =
tableStorage.destroyPartition(42);
// Partition destruction doesn't enforce flush.
- ((RocksDbTableStorage) tableStorage).scheduleFlush();
+ ((RocksDbTableStorage) tableStorage).awaitFlush(true);
assertThat(destroyFuture, willCompleteSuccessfully());
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index a6a2722686..c488796387 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -751,7 +751,8 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
//TODO Revisit peers String representation:
https://issues.apache.org/jira/browse/IGNITE-17420
raftGroupOptions.snapshotStorageFactory(new
PartitionSnapshotStorageFactory(
- partitionStorage,
+ //TODO IGNITE-17302 Use minimum from mv storage and tx state storage.
+ partitionStorage::persistedIndex,
peers.stream().map(n -> new
Peer(n.address())).map(PeerId::fromPeer).map(Object::toString).collect(Collectors.toList()),
List.of()
));
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java
new file mode 100644
index 0000000000..48b5074d07
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.snapshot;
+
+/**
+ * Small abstraction over partition storages that includes only the methods
mandatory for the snapshot storage.
+ */
+public interface PartitionAccess {
+ /**
+ * Returns persisted RAFT index for the partition.
+ */
+ long persistedIndex();
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorage.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorage.java
index 61dcfe0cc0..d9257fdaa7 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorage.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorage.java
@@ -40,8 +40,8 @@ class PartitionSnapshotStorage implements SnapshotStorage {
/** Raft options. */
final RaftOptions raftOptions;
- /** Instance of partition storage. */
- final MvPartitionStorage partitionStorage;
+ /** Instance of partition. */
+ final PartitionAccess partition;
/** Snapshot meta, constructed from the storage data and reaft group
configuration. */
final SnapshotMeta snapshotMeta;
@@ -54,18 +54,18 @@ class PartitionSnapshotStorage implements SnapshotStorage {
*
* @param snapshotUri Snapshot URI.
* @param raftOptions RAFT options.
- * @param partitionStorage Partition storage.
+ * @param partition Partition.
* @param snapshotMeta Snapshot meta.
*/
public PartitionSnapshotStorage(
String snapshotUri,
RaftOptions raftOptions,
- MvPartitionStorage partitionStorage,
+ PartitionAccess partition,
SnapshotMeta snapshotMeta
) {
this.snapshotUri = snapshotUri;
this.raftOptions = raftOptions;
- this.partitionStorage = partitionStorage;
+ this.partition = partition;
this.snapshotMeta = snapshotMeta;
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
index 9e747eaae4..0fddc5f9d8 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
@@ -39,7 +39,7 @@ import
org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
*/
public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory
{
/** Partition storage. */
- private final MvPartitionStorage partitionStorage;
+ private final PartitionAccess partition;
/** List of peers. */
private final List<String> peers;
@@ -47,25 +47,25 @@ public class PartitionSnapshotStorageFactory implements
SnapshotStorageFactory {
/** List of learners. */
private final List<String> learners;
- /** RAFT log index read from {@link MvPartitionStorage#persistedIndex()}
during factory instantiation. */
+ /** RAFT log index read from {@link PartitionAccess#persistedIndex()}
during factory instantiation. */
private final long persistedRaftIndex;
/**
* Constructor.
*
- * @param partitionStorage MV partition storage.
+ * @param partition Partition access, used to read the persisted RAFT index.
* @param peers List of raft group peers to be used in snapshot meta.
* @param learners List of raft group learners to be used in snapshot meta.
*
* @see SnapshotMeta
*/
@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
- public PartitionSnapshotStorageFactory(MvPartitionStorage
partitionStorage, List<String> peers, List<String> learners) {
- this.partitionStorage = partitionStorage;
+ public PartitionSnapshotStorageFactory(PartitionAccess partition,
List<String> peers, List<String> learners) {
+ this.partition = partition;
this.peers = peers;
this.learners = learners;
- persistedRaftIndex = partitionStorage.persistedIndex();
+ persistedRaftIndex = partition.persistedIndex();
}
/** {@inheritDoc} */
@@ -80,6 +80,6 @@ public class PartitionSnapshotStorageFactory implements
SnapshotStorageFactory {
.learnersList(learners)
.build();
- return new PartitionSnapshotStorage(uri, raftOptions,
partitionStorage, snapshotMeta);
+ return new PartitionSnapshotStorage(uri, raftOptions, partition,
snapshotMeta);
}
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
index ba3e26e9f6..3fbf2725d6 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.tx.storage.state;
-import java.nio.file.Path;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.tx.TxMeta;
@@ -26,7 +25,6 @@ import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.ErrorGroups.Transactions;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInternalException;
-import org.jetbrains.annotations.NotNull;
/**
* Storage for transaction meta, {@link TxMeta}.
@@ -52,6 +50,25 @@ public interface TxStateStorage extends AutoCloseable {
*/
void stop() throws Exception;
+ /**
+ * Flushes current state of the data or <i>the state from the nearest
future</i> to the storage. It means that the future can be
+ * completed when {@link #persistedIndex()} is higher than {@link
#lastAppliedIndex()} at the moment of the method's call. This feature
+ * allows implementing a batch flush for several partitions at once.
+ *
+ * @return Future that's completed when flushing of the data is completed.
+ */
+ CompletableFuture<Void> flush();
+
+ /**
+ * Index of the highest write command applied to the storage. {@code 0} if
index is unknown.
+ */
+ long lastAppliedIndex();
+
+ /**
+ * {@link #lastAppliedIndex()} value consistent with the data already
persisted on the storage.
+ */
+ long persistedIndex();
+
/**
* Get tx meta by tx id.
*
@@ -78,11 +95,12 @@ public interface TxStateStorage extends AutoCloseable {
* @param txId Tx id.
* @param txStateExpected Tx state that is expected to be in the storage.
* @param txMeta Tx meta.
+ * @param commandIndex New value for {@link #lastAppliedIndex()}.
* @return Whether the CAS operation is successful.
* @throws IgniteInternalException with {@link
Transactions#TX_STATE_STORAGE_ERR} error code in case when
* the operation has failed.
*/
- boolean compareAndSet(UUID txId, @NotNull TxState txStateExpected,
@NotNull TxMeta txMeta);
+ boolean compareAndSet(UUID txId, TxState txStateExpected, TxMeta txMeta,
long commandIndex);
/**
* Remove the tx meta from the storage.
@@ -107,19 +125,4 @@ public interface TxStateStorage extends AutoCloseable {
* the operation has failed.
*/
void destroy();
-
- /**
- * Create a snapshot of the storage's current state in the specified
directory.
- *
- * @param snapshotPath Snapshot path.
- * @return Future which completes when the snapshot operation is complete.
- */
- CompletableFuture<Void> snapshot(Path snapshotPath);
-
- /**
- * Restore a storage from the snapshot.
- *
- * @param snapshotPath Snapshot path.
- */
- void restoreSnapshot(Path snapshotPath);
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
index 8139f82090..d31a142bce 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
@@ -18,29 +18,41 @@
package org.apache.ignite.internal.tx.storage.state.rocksdb;
import static java.util.Objects.requireNonNull;
-import static
org.apache.ignite.internal.rocksdb.snapshot.ColumnFamilyRange.fullRange;
+import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
import static org.apache.ignite.internal.util.ByteUtils.toBytes;
import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_STATE_STORAGE_CREATE_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_STATE_STORAGE_DESTROY_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_STATE_STORAGE_ERR;
+import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_STATE_STORAGE_STOPPED_ERR;
+import static org.rocksdb.ReadTier.PERSISTED_TIER;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.nio.file.Path;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
-import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.IntSupplier;
+import org.apache.ignite.internal.rocksdb.BusyRocksIteratorAdapter;
import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
-import org.apache.ignite.internal.rocksdb.snapshot.RocksSnapshotManager;
+import org.apache.ignite.internal.rocksdb.flush.RocksDbFlusher;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
+import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInternalException;
-import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
@@ -55,10 +67,20 @@ import org.rocksdb.WriteOptions;
/**
* Tx state storage implementation based on RocksDB.
*/
-public class TxStateRocksDbStorage implements TxStateStorage, AutoCloseable {
+public class TxStateRocksDbStorage implements TxStateStorage {
+ static {
+ RocksDB.loadLibrary();
+ }
+
+ /** Key for the applied index. */
+ private static final byte[] APPLIED_INDEX_KEY =
ArrayUtils.BYTE_EMPTY_ARRAY;
+
/** Database path. */
private final Path dbPath;
+ /** RocksDB flusher instance. */
+ private volatile RocksDbFlusher flusher;
+
/** RocksDB database. */
private volatile TransactionDB db;
@@ -69,41 +91,86 @@ public class TxStateRocksDbStorage implements
TxStateStorage, AutoCloseable {
/** Database options. */
private volatile TransactionDBOptions txDbOptions;
+ /** Write options. */
+ private final WriteOptions writeOptions = new
WriteOptions().setDisableWAL(true);
+
+ /** Read options for regular reads. */
+ private final ReadOptions readOptions = new ReadOptions();
+
+ /** Read options for reading persisted data. */
+ private final ReadOptions persistedTierReadOptions = new
ReadOptions().setReadTier(PERSISTED_TIER);
+
+ /** Scheduled pool for {@link RocksDbFlusher}. */
+ private final ScheduledExecutorService scheduledPool;
+
/** Thread-pool for snapshot operations execution. */
- private final ExecutorService snapshotExecutor;
+ private final ExecutorService threadPool;
+
+ /** Delay supplier for {@link RocksDbFlusher}. */
+ private final IntSupplier delaySupplier;
- /** Snapshot manager. */
- private volatile RocksSnapshotManager snapshotManager;
+ /** On-heap-cached last applied index value. */
+ private volatile long lastAppliedIndex;
- /** Snapshot restore lock. */
- private final Object snapshotRestoreLock = new Object();
+ /** The value of {@link #lastAppliedIndex} persisted to the device at this
moment. */
+ private volatile long persistedIndex;
/** Whether is started. */
private boolean isStarted;
+ /** Collection of opened RocksDB iterators. */
+ private final Set<RocksIterator> iterators = ConcurrentHashMap.newKeySet();
+
+ /** Busy lock to stop synchronously. */
+ final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ /** Prevents double stopping the component. */
+ private final AtomicBoolean stopGuard = new AtomicBoolean();
+
/**
* The constructor.
*
* @param dbPath Database path.
- * @param snapshotExecutor Snapshot thread pool.
+ * @param scheduledPool Scheduled thread pool.
+ * @param threadPool Thread pool.
+ * @param delaySupplier Supplier of delay values to batch independent
flush requests. Please refer to {@link RocksDbFlusher} for
+ * details.
*/
- public TxStateRocksDbStorage(Path dbPath, ExecutorService
snapshotExecutor) {
+ public TxStateRocksDbStorage(
+ Path dbPath,
+ ScheduledExecutorService scheduledPool,
+ ExecutorService threadPool,
+ IntSupplier delaySupplier
+ ) {
this.dbPath = dbPath;
- this.snapshotExecutor = snapshotExecutor;
+ this.threadPool = threadPool;
+ this.scheduledPool = scheduledPool;
+ this.delaySupplier = delaySupplier;
}
/** {@inheritDoc} */
@Override public void start() {
try {
- this.options = new Options().setCreateIfMissing(true);
+ flusher = new RocksDbFlusher(
+ busyLock,
+ scheduledPool,
+ threadPool,
+ delaySupplier,
+ this::refreshPersistedIndex
+ );
+
+ options = new Options()
+ .setCreateIfMissing(true)
+ .setAtomicFlush(true)
+ .setListeners(List.of(flusher.listener()));
- this.txDbOptions = new TransactionDBOptions();
+ txDbOptions = new TransactionDBOptions();
- this.db = TransactionDB.open(options, txDbOptions,
dbPath.toString());
+ db = TransactionDB.open(options, txDbOptions, dbPath.toString());
- ColumnFamily defaultCf = ColumnFamily.wrap(db,
db.getDefaultColumnFamily());
+ lastAppliedIndex = readLastAppliedIndex(readOptions);
- snapshotManager = new RocksSnapshotManager(db,
List.of(fullRange(defaultCf)), snapshotExecutor);
+ persistedIndex = lastAppliedIndex;
isStarted = true;
} catch (RocksDBException e) {
@@ -111,6 +178,36 @@ public class TxStateRocksDbStorage implements
TxStateStorage, AutoCloseable {
}
}
+ private void refreshPersistedIndex() {
+ if (!busyLock.enterBusy()) {
+ return;
+ }
+
+ try {
+ persistedIndex = readLastAppliedIndex(persistedTierReadOptions);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Reads the value of {@link #lastAppliedIndex} from the storage.
+ *
+ * @param readOptions Read options to be used for reading.
+ * @return The value of last applied index.
+ */
+ private long readLastAppliedIndex(ReadOptions readOptions) {
+ byte[] appliedIndexBytes;
+
+ try {
+ appliedIndexBytes = db.get(readOptions, APPLIED_INDEX_KEY);
+ } catch (RocksDBException e) {
+ throw new IgniteInternalException(TX_STATE_STORAGE_ERR, "Failed to
read applied index value from transaction state storage", e);
+ }
+
+ return appliedIndexBytes == null ? 0 : bytesToLong(appliedIndexBytes);
+ }
+
/** {@inheritDoc} */
@Override public boolean isStarted() {
return isStarted;
@@ -118,7 +215,22 @@ public class TxStateRocksDbStorage implements
TxStateStorage, AutoCloseable {
/** {@inheritDoc} */
@Override public void stop() throws Exception {
- IgniteUtils.closeAll(options, txDbOptions, db);
+ if (!stopGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ busyLock.block();
+
+ List<AutoCloseable> closeables = new ArrayList<>(iterators);
+
+ closeables.add(persistedTierReadOptions);
+ closeables.add(readOptions);
+ closeables.add(writeOptions);
+ closeables.add(options);
+ closeables.add(txDbOptions);
+ closeables.add(db);
+
+ IgniteUtils.closeAll(closeables);
db = null;
options = null;
@@ -127,81 +239,157 @@ public class TxStateRocksDbStorage implements
TxStateStorage, AutoCloseable {
isStarted = false;
}
+ @Override
+ public CompletableFuture<Void> flush() {
+ return flusher.awaitFlush(true);
+ }
+
+ @Override
+ public long lastAppliedIndex() {
+ return lastAppliedIndex;
+ }
+
+ @Override
+ public long persistedIndex() {
+ return persistedIndex;
+ }
+
/** {@inheritDoc} */
@Override public TxMeta get(UUID txId) {
+ if (!busyLock.enterBusy()) {
+ throwStorageStoppedException();
+ }
+
try {
- byte[] txMetaBytes = db.get(toBytes(txId));
+ byte[] txMetaBytes = db.get(uuidToBytes(txId));
return txMetaBytes == null ? null : (TxMeta)
fromBytes(txMetaBytes);
} catch (RocksDBException e) {
throw new IgniteInternalException(TX_STATE_STORAGE_ERR, "Failed to
get a value from the transaction state storage", e);
+ } finally {
+ busyLock.leaveBusy();
}
}
/** {@inheritDoc} */
@Override public void put(UUID txId, TxMeta txMeta) {
+ if (!busyLock.enterBusy()) {
+ throwStorageStoppedException();
+ }
+
try {
- db.put(toBytes(txId), toBytes(txMeta));
+ db.put(uuidToBytes(txId), toBytes(txMeta));
} catch (RocksDBException e) {
throw new IgniteInternalException(TX_STATE_STORAGE_ERR, "Failed to
put a value into the transaction state storage", e);
+ } finally {
+ busyLock.leaveBusy();
}
}
/** {@inheritDoc} */
- @Override public boolean compareAndSet(UUID txId, @NotNull TxState
txStateExpected, @NotNull TxMeta txMeta) {
+ @Override public boolean compareAndSet(UUID txId, TxState txStateExpected,
TxMeta txMeta, long commandIndex) {
requireNonNull(txStateExpected);
requireNonNull(txMeta);
- byte[] txIdBytes = toBytes(txId);
+ if (!busyLock.enterBusy()) {
+ throwStorageStoppedException();
+ }
+
+ byte[] txIdBytes = uuidToBytes(txId);
- try (Transaction rocksTx = db.beginTransaction(new WriteOptions())) {
- byte[] txMetaExistingBytes = rocksTx.get(new ReadOptions(),
toBytes(txId));
- TxMeta txMetaExisting = (TxMeta) fromBytes(txMetaExistingBytes);
+ try (Transaction rocksTx = db.beginTransaction(writeOptions)) {
+ byte[] txMetaExistingBytes = rocksTx.get(readOptions,
uuidToBytes(txId));
+ TxMeta txMetaExisting = fromBytes(txMetaExistingBytes);
+
+ boolean result;
if (txMetaExisting.txState() == txStateExpected) {
rocksTx.put(txIdBytes, toBytes(txMeta));
- rocksTx.commit();
-
- return true;
+ result = true;
} else {
- rocksTx.rollback();
-
- return false;
+ result = false;
}
+
+ rocksTx.put(APPLIED_INDEX_KEY, longToBytes(commandIndex));
+
+ rocksTx.commit();
+
+ return result;
} catch (RocksDBException e) {
throw new IgniteInternalException(
TX_STATE_STORAGE_ERR,
"Failed perform CAS operation over a value in transaction
state storage",
e
);
+ } finally {
+ busyLock.leaveBusy();
}
}
/** {@inheritDoc} */
@Override public void remove(UUID txId) {
+ if (!busyLock.enterBusy()) {
+ throwStorageStoppedException();
+ }
+
try {
- db.delete(toBytes(txId));
+ db.delete(uuidToBytes(txId));
} catch (RocksDBException e) {
throw new IgniteInternalException(TX_STATE_STORAGE_ERR, "Failed to
remove a value from the transaction state storage", e);
+ } finally {
+ busyLock.leaveBusy();
}
}
/** {@inheritDoc} */
@Override public Cursor<IgniteBiTuple<UUID, TxMeta>> scan() {
- RocksIterator rocksIterator = db.newIterator();
- rocksIterator.seekToFirst();
+ if (!busyLock.enterBusy()) {
+ throwStorageStoppedException();
+ }
+
+ try {
+ RocksIterator rocksIterator = db.newIterator();
- RocksIteratorAdapter<IgniteBiTuple<UUID, TxMeta>> iteratorAdapter =
new RocksIteratorAdapter<>(rocksIterator) {
- @Override protected IgniteBiTuple<UUID, TxMeta> decodeEntry(byte[]
keyBytes, byte[] valueBytes) {
- UUID key = (UUID) fromBytes(keyBytes);
- TxMeta txMeta = (TxMeta) fromBytes(valueBytes);
+ iterators.add(rocksIterator);
- return new IgniteBiTuple<>(key, txMeta);
+ try {
+ // Skip the applied index entry, which is stored under the empty key.
+ rocksIterator.seek(new byte[1]);
+ } catch (Exception e) {
+ // Unlikely, but close the iterator so it does not leak if seek fails.
+ iterators.remove(rocksIterator);
+
+ rocksIterator.close();
+
+ throw e;
}
- };
- return Cursor.fromIterator(iteratorAdapter);
+ RocksIteratorAdapter<IgniteBiTuple<UUID, TxMeta>> iteratorAdapter
= new BusyRocksIteratorAdapter<>(busyLock, rocksIterator) {
+ @Override protected IgniteBiTuple<UUID, TxMeta>
decodeEntry(byte[] keyBytes, byte[] valueBytes) {
+ UUID key = bytesToUuid(keyBytes);
+ TxMeta txMeta = fromBytes(valueBytes);
+
+ return new IgniteBiTuple<>(key, txMeta);
+ }
+
+ @Override
+ protected void handleBusy() {
+ throwStorageStoppedException();
+ }
+
+ @Override
+ public void close() throws Exception {
+ iterators.remove(rocksIterator);
+
+ super.close();
+ }
+ };
+
+ return Cursor.fromIterator(iteratorAdapter);
+ } finally {
+ busyLock.leaveBusy();
+ }
}
/** {@inheritDoc} */
@@ -215,20 +403,22 @@ public class TxStateRocksDbStorage implements
TxStateStorage, AutoCloseable {
}
}
- /** {@inheritDoc} */
- @Override public CompletableFuture<Void> snapshot(Path snapshotPath) {
- return snapshotManager.createSnapshot(snapshotPath);
+ private static void throwStorageStoppedException() {
+ throw new IgniteInternalException(TX_STATE_STORAGE_STOPPED_ERR,
"Transaction state storage is stopped");
}
- /** {@inheritDoc} */
- @Override public void restoreSnapshot(Path snapshotPath) {
- synchronized (snapshotRestoreLock) {
- destroy();
+ private byte[] uuidToBytes(UUID uuid) {
+ return ByteBuffer.allocate(2 * Long.BYTES).order(ByteOrder.BIG_ENDIAN)
+ .putLong(uuid.getMostSignificantBits())
+ .putLong(uuid.getLeastSignificantBits())
+ .array();
+ }
- start();
+ private UUID bytesToUuid(byte[] bytes) {
+ long msb = bytesToLong(bytes, 0);
+ long lsb = bytesToLong(bytes, Long.BYTES);
- snapshotManager.restoreSnapshot(snapshotPath);
- }
+ return new UUID(msb, lsb);
}
/** {@inheritDoc} */
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/TxStateStorageTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/TxStateStorageTest.java
index 5506960e6f..a3dc9c10cc 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/TxStateStorageTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/TxStateStorageTest.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.tx.storage.state;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static java.util.stream.Collectors.toList;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -24,16 +26,12 @@ import static
org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import java.io.File;
-import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.Executors;
import java.util.stream.IntStream;
import org.apache.ignite.hlc.HybridTimestamp;
import org.apache.ignite.internal.testframework.WorkDirectory;
@@ -124,8 +122,8 @@ public class TxStateStorageTest {
assertTxMetaEquals(storage.get(txId), txMeta0);
- assertFalse(storage.compareAndSet(txId, txMeta1.txState(),
txMeta2));
- assertTrue(storage.compareAndSet(txId, txMeta0.txState(),
txMeta2));
+ assertFalse(storage.compareAndSet(txId, txMeta1.txState(),
txMeta2, 1));
+ assertTrue(storage.compareAndSet(txId, txMeta0.txState(), txMeta2,
2));
assertTxMetaEquals(storage.get(txId), txMeta2);
}
@@ -143,6 +141,7 @@ public class TxStateStorageTest {
TxMeta txMeta = new TxMeta(TxState.PENDING,
generateEnlistedPartitions(i), generateTimestamp(txId));
txs.put(txId, txMeta);
storage.put(txId, txMeta);
+ storage.compareAndSet(txId, TxState.PENDING, txMeta, i);
}
try (Cursor<IgniteBiTuple<UUID, TxMeta>> scanCursor =
storage.scan()) {
@@ -163,65 +162,6 @@ public class TxStateStorageTest {
}
}
- @Test
- public void testSnapshot() throws Exception {
- try (TxStateStorage storage = createStorage()) {
- storage.start();
-
- List<UUID> inSnapshot = new ArrayList<>();
-
- for (int i = 0; i < 100; i++) {
- UUID txId = UUID.randomUUID();
-
- storage.put(txId, new TxMeta(TxState.COMMITED, new
ArrayList<>(), generateTimestamp(txId)));
-
- inSnapshot.add(txId);
- }
-
- Path snapshotDirPath = workDir.resolve("snapshot");
- Files.createDirectories(snapshotDirPath);
-
- try {
- storage.snapshot(snapshotDirPath).join();
-
- List<UUID> notInSnapshot = new ArrayList<>();
-
- for (int i = 0; i < 100; i++) {
- UUID txId = UUID.randomUUID();
-
- storage.put(txId, new TxMeta(TxState.COMMITED, new
ArrayList<>(), generateTimestamp(txId)));
-
- notInSnapshot.add(txId);
- }
-
- for (int i = 0; i < 100; i++) {
- UUID txId = notInSnapshot.get(i);
-
- assertTxMetaEquals(new TxMeta(TxState.COMMITED, new
ArrayList<>(), generateTimestamp(txId)), storage.get(txId));
- }
-
- storage.restoreSnapshot(snapshotDirPath);
-
- for (int i = 0; i < 100; i++) {
- UUID txId = inSnapshot.get(i);
-
- assertTxMetaEquals(new TxMeta(TxState.COMMITED, new
ArrayList<>(), generateTimestamp(txId)), storage.get(txId));
- }
-
- for (int i = 0; i < 100; i++) {
- UUID txId = notInSnapshot.get(i);
-
- assertNull(storage.get(txId));
- }
- } finally {
- Files.walk(snapshotDirPath)
- .sorted(Comparator.reverseOrder())
- .map(Path::toFile)
- .forEach(File::delete);
- }
- }
- }
-
private static void assertTxMetaEquals(TxMeta txMeta0, TxMeta txMeta1) {
assertEquals(txMeta0.txState(), txMeta1.txState());
assertEquals(txMeta0.commitTimestamp(), txMeta1.commitTimestamp());
@@ -229,6 +169,6 @@ public class TxStateStorageTest {
}
private TxStateStorage createStorage() {
- return new TxStateRocksDbStorage(workDir,
Executors.newSingleThreadExecutor());
+ return new TxStateRocksDbStorage(workDir,
newSingleThreadScheduledExecutor(), newSingleThreadExecutor(), () -> 100);
}
}