This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch ignite-25904 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 464e5e782b194b76fe96aa182e08f6d3d424653a Author: Kirill Tkalenko <[email protected]> AuthorDate: Tue Jul 15 12:22:15 2025 +0300 IGNITE-25904 wip --- .../rebalance/ItRebalanceDistributedTest.java | 1 + .../server/persistence/RocksDbKeyValueStorage.java | 5 ++++ .../partition/replicator/fixtures/Node.java | 1 + .../ItZonePartitionRaftListenerRecoveryTest.java | 1 + .../replicator/ZoneResourcesManagerTest.java | 10 ++++++- .../storage/impl/DefaultLogStorageFactory.java | 6 ++++- .../rocksdb/LoggingRocksDbFlushListener.java | 31 ++++++++++++++++------ .../rocksdb/flush/RocksDbFlushListener.java | 5 ++-- .../internal/rocksdb/flush/RocksDbFlusher.java | 4 ++- .../internal/rocksdb/flush/RocksDbFlusherTest.java | 1 + .../runner/app/ItIgniteNodeRestartTest.java | 1 + .../org/apache/ignite/internal/app/IgniteImpl.java | 1 + .../storage/rocksdb/RocksDbStorageEngine.java | 5 +++- .../instance/SharedRocksDbInstanceCreator.java | 6 ++++- .../instance/SharedRocksDbInstanceTest.java | 6 +++-- .../distributed/TableManagerRecoveryTest.java | 1 + .../table/distributed/TableManagerTest.java | 1 + .../state/rocksdb/TxStateRocksDbSharedStorage.java | 10 ++++++- .../RocksDbTxStatePartitionStorageTest.java | 1 + .../TxStateMetaRocksDbPartitionStorageTest.java | 1 + 20 files changed, 80 insertions(+), 18 deletions(-) diff --git a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java index a6b17cf0b21..8e49a2550c9 100644 --- a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java +++ b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java @@ -1574,6 +1574,7 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { 
MinimumRequiredTimeCollectorService minTimeCollectorService = new MinimumRequiredTimeCollectorServiceImpl(); sharedTxStateStorage = new TxStateRocksDbSharedStorage( + name, storagePath.resolve("tx-state"), threadPoolsManager.commonScheduler(), threadPoolsManager.tableIoExecutor(), diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java index 56672807692..53fa1275b59 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java @@ -218,6 +218,9 @@ public class RocksDbKeyValueStorage extends AbstractKeyValueStorage { /** Path to the rocksdb database. */ private final Path dbPath; + /** Node name. */ + private final String nodeName; + /** RockDB options. 
*/ private volatile DBOptions options; @@ -308,6 +311,7 @@ public class RocksDbKeyValueStorage extends AbstractKeyValueStorage { this.dbPath = dbPath; this.scheduledExecutor = scheduledExecutor; + this.nodeName = nodeName; executor = Executors.newFixedThreadPool( 2, @@ -400,6 +404,7 @@ public class RocksDbKeyValueStorage extends AbstractKeyValueStorage { flusher = new RocksDbFlusher( "rocksdb metastorage kv storage", + nodeName, busyLock, scheduledExecutor, executor, diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java index 28be8051d00..45c81be29ef 100644 --- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java +++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java @@ -713,6 +713,7 @@ public class Node { ); sharedTxStateStorage = new TxStateRocksDbSharedStorage( + name, storagePath.resolve("tx-state"), threadPoolsManager.commonScheduler(), threadPoolsManager.tableIoExecutor(), diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java index 738c80b8be1..67f627fc167 100644 --- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java +++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java @@ -270,6 +270,7 @@ class ItZonePartitionRaftListenerRecoveryTest extends 
IgniteAbstractTest { components.add(raftManager); var sharedRockDbStorage = new TxStateRocksDbSharedStorage( + clusterService.nodeName(), workDir.resolve("tx"), scheduledExecutorService, executor, diff --git a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManagerTest.java b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManagerTest.java index aea5c1c25ea..991dda265ef 100644 --- a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManagerTest.java +++ b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManagerTest.java @@ -75,7 +75,15 @@ class ZoneResourcesManagerTest extends IgniteAbstractTest { @InjectExecutorService ScheduledExecutorService scheduler, @InjectExecutorService ExecutorService executor ) { - sharedStorage = new TxStateRocksDbSharedStorage(workDir, scheduler, executor, logSyncer, mock(FailureProcessor.class), () -> 0); + sharedStorage = new TxStateRocksDbSharedStorage( + "test", + workDir, + scheduler, + executor, + logSyncer, + mock(FailureProcessor.class), + () -> 0 + ); manager = new ZoneResourcesManager( sharedStorage, diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java index 45f98c47f34..045240c9793 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java @@ -68,6 +68,9 @@ public class DefaultLogStorageFactory implements LogStorageFactory { /** Name of the log factory, will be used in logs. */ private final String factoryName; + /** Node name. 
*/ + private final String nodeName; + /** Path to the log storage. */ private final Path logPath; @@ -126,6 +129,7 @@ public class DefaultLogStorageFactory implements LogStorageFactory { this.factoryName = factoryName; this.logPath = logPath; this.fsync = fsync; + this.nodeName = nodeName; executorService = Executors.newSingleThreadExecutor( IgniteThreadFactory.create(nodeName, "raft-shared-log-storage-pool", LOG) @@ -159,7 +163,7 @@ public class DefaultLogStorageFactory implements LogStorageFactory { this.cfOption = createColumnFamilyOptions(); - this.flushListener = new LoggingRocksDbFlushListener(factoryName); + this.flushListener = new LoggingRocksDbFlushListener(factoryName, nodeName); List<ColumnFamilyDescriptor> columnFamilyDescriptors = List.of( // Column family to store configuration log entry. diff --git a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/LoggingRocksDbFlushListener.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/LoggingRocksDbFlushListener.java index 118a3e5f2cf..472c23e22da 100644 --- a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/LoggingRocksDbFlushListener.java +++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/LoggingRocksDbFlushListener.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.rocksdb; +import static java.util.stream.Collectors.toList; import static org.rocksdb.AbstractEventListener.EnabledEventCallback.ON_COMPACTION_BEGIN; import static org.rocksdb.AbstractEventListener.EnabledEventCallback.ON_COMPACTION_COMPLETED; import static org.rocksdb.AbstractEventListener.EnabledEventCallback.ON_FLUSH_BEGIN; @@ -25,7 +26,6 @@ import static org.rocksdb.AbstractEventListener.EnabledEventCallback.ON_FLUSH_CO import java.nio.file.Paths; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; import org.apache.ignite.internal.logger.IgniteLogger; import 
org.apache.ignite.internal.logger.Loggers; import org.rocksdb.AbstractEventListener; @@ -43,6 +43,9 @@ public class LoggingRocksDbFlushListener extends AbstractEventListener { /** Listener name, for logs. */ private final String name; + /** Node name, for logs. */ + private final String nodeName; + /** * Type of last processed flush event. Real amount of events doesn't matter in atomic flush mode. All "completed" events go after all * "begin" events, and vice versa. @@ -62,18 +65,23 @@ public class LoggingRocksDbFlushListener extends AbstractEventListener { * Constructor. * * @param name Listener name, for logs. + * @param nodeName Node name, for logs. */ - public LoggingRocksDbFlushListener(String name) { + public LoggingRocksDbFlushListener(String name, String nodeName) { super(ON_FLUSH_BEGIN, ON_FLUSH_COMPLETED, ON_COMPACTION_BEGIN, ON_COMPACTION_COMPLETED); this.name = name; + this.nodeName = nodeName; } @Override public void onFlushBegin(RocksDB db, FlushJobInfo flushJobInfo) { if (lastFlushEventType.compareAndSet(ON_FLUSH_COMPLETED, ON_FLUSH_BEGIN)) { if (LOG.isInfoEnabled()) { - LOG.info("Starting rocksdb flush process [name='{}', reason={}]", name, flushJobInfo.getFlushReason()); + LOG.info( + "Starting rocksdb flush process [name='{}', nodeName='{}', reason={}]", + name, nodeName, flushJobInfo.getFlushReason() + ); lastFlushStartTimeNanos = System.nanoTime(); } @@ -88,7 +96,10 @@ public class LoggingRocksDbFlushListener extends AbstractEventListener { if (LOG.isInfoEnabled()) { long duration = System.nanoTime() - lastFlushStartTimeNanos; - LOG.info("Finishing rocksdb flush process [name='{}', duration={}ms]", name, TimeUnit.NANOSECONDS.toMillis(duration)); + LOG.info( + "Finishing rocksdb flush process [name='{}', nodeName='{}', duration={}ms]", + name, nodeName, TimeUnit.NANOSECONDS.toMillis(duration) + ); } onFlushCompletedCallback(db, flushJobInfo); @@ -107,12 +118,13 @@ public class LoggingRocksDbFlushListener extends AbstractEventListener { public 
void onCompactionBegin(RocksDB db, CompactionJobInfo compactionJobInfo) { if (lastCompactionEventType.compareAndSet(ON_COMPACTION_COMPLETED, ON_COMPACTION_BEGIN)) { if (LOG.isInfoEnabled()) { - LOG.info("Starting rocksdb compaction process [name='{}', reason={}, input={}, output={}]", + LOG.info("Starting rocksdb compaction process [name='{}', nodeName='{}', reason={}, input={}, output={}]", name, + nodeName, compactionJobInfo.compactionReason(), // Extract file names from full paths. - compactionJobInfo.inputFiles().stream().map(path -> Paths.get(path).getFileName()).collect(Collectors.toList()), - compactionJobInfo.outputFiles().stream().map(path -> Paths.get(path).getFileName()).collect(Collectors.toList()) + compactionJobInfo.inputFiles().stream().map(path -> Paths.get(path).getFileName()).collect(toList()), + compactionJobInfo.outputFiles().stream().map(path -> Paths.get(path).getFileName()).collect(toList()) ); lastCompactionStartTimeNanos = System.nanoTime(); @@ -126,7 +138,10 @@ public class LoggingRocksDbFlushListener extends AbstractEventListener { if (LOG.isInfoEnabled()) { long duration = System.nanoTime() - lastCompactionStartTimeNanos; - LOG.info("Finishing rocksdb compaction process [name='{}', duration={}ms]", name, TimeUnit.NANOSECONDS.toMillis(duration)); + LOG.info( + "Finishing rocksdb compaction process [name='{}', nodeName='{}', duration={}ms]", + name, nodeName, TimeUnit.NANOSECONDS.toMillis(duration) + ); } } } diff --git a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlushListener.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlushListener.java index 4a04981eaa6..0d46f6bb408 100644 --- a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlushListener.java +++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlushListener.java @@ -48,11 +48,12 @@ class RocksDbFlushListener extends 
LoggingRocksDbFlushListener { * Constructor. * * @param name Listener name, for logs. + * @param nodeName Node name, for logs. * @param flusher Flusher instance to delegate events processing to. * @param logSyncer Write-ahead log synchronizer. */ - RocksDbFlushListener(String name, RocksDbFlusher flusher, LogSyncer logSyncer, FailureProcessor failureProcessor) { - super(name); + RocksDbFlushListener(String name, String nodeName, RocksDbFlusher flusher, LogSyncer logSyncer, FailureProcessor failureProcessor) { + super(name, nodeName); this.flusher = flusher; this.logSyncer = logSyncer; diff --git a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java index 39e877feae7..d50b93f9bce 100644 --- a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java +++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java @@ -106,6 +106,7 @@ public class RocksDbFlusher { * Constructor. * * @param name RocksDB instance name, for logging purposes. + * @param nodeName Node name. * @param busyLock Busy lock. * @param scheduledPool Scheduled pool the schedule flushes. * @param threadPool Thread pool to execute flush and to run flush completion closure, provided by {@code onFlushCompleted} parameter. 
@@ -120,6 +121,7 @@ public class RocksDbFlusher { */ public RocksDbFlusher( String name, + String nodeName, IgniteSpinBusyLock busyLock, ScheduledExecutorService scheduledPool, Executor threadPool, @@ -134,7 +136,7 @@ public class RocksDbFlusher { this.delaySupplier = delaySupplier; this.onFlushCompleted = onFlushCompleted; this.failureProcessor = failureProcessor; - this.flushListener = new RocksDbFlushListener(name, this, logSyncer, failureProcessor); + this.flushListener = new RocksDbFlushListener(name, nodeName, this, logSyncer, failureProcessor); } /** diff --git a/modules/rocksdb-common/src/test/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusherTest.java b/modules/rocksdb-common/src/test/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusherTest.java index 74a00e9b0a1..cee3307f6f9 100644 --- a/modules/rocksdb-common/src/test/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusherTest.java +++ b/modules/rocksdb-common/src/test/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusherTest.java @@ -71,6 +71,7 @@ class RocksDbFlusherTest extends IgniteAbstractTest { flusher = new RocksDbFlusher( "RocksDbFlusherTest", + "test", new IgniteSpinBusyLock(), sameThreadExecutor, Runnable::run, diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index 88391dfb7b2..26e675e1232 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -734,6 +734,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { MinimumRequiredTimeCollectorService minTimeCollectorService = new MinimumRequiredTimeCollectorServiceImpl(); var sharedTxStateStorage = new TxStateRocksDbSharedStorage( + name, 
storagePath.resolve("tx-state"), threadPoolsManager.commonScheduler(), threadPoolsManager.tableIoExecutor(), diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 252f09c3dd2..561e12aa78e 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -1045,6 +1045,7 @@ public class IgniteImpl implements Ignite { ); sharedTxStateStorage = new TxStateRocksDbSharedStorage( + name, storagePath.resolve(TX_STATE_DIR), threadPoolsManager.commonScheduler(), threadPoolsManager.tableIoExecutor(), diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java index 508d4b7b0ae..9d74d2e5b59 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java @@ -96,6 +96,8 @@ public class RocksDbStorageEngine implements StorageEngine { private final FailureProcessor failureProcessor; + private final String nodeName; + /** * Mapping from the storage profile name to the shared RocksDB instance. 
*/ @@ -127,6 +129,7 @@ public class RocksDbStorageEngine implements StorageEngine { this.logSyncer = logSyncer; this.scheduledPool = scheduledPool; this.failureProcessor = failureProcessor; + this.nodeName = nodeName; threadPool = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors(), @@ -223,7 +226,7 @@ public class RocksDbStorageEngine implements StorageEngine { Path dbPath = storagePath.resolve("rocksdb-" + profileName); try { - return new SharedRocksDbInstanceCreator(failureProcessor).create(this, profile, dbPath); + return new SharedRocksDbInstanceCreator(failureProcessor, nodeName).create(this, profile, dbPath); } catch (Exception e) { throw new StorageException("Failed to create new RocksDB instance", e); } diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceCreator.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceCreator.java index b022a5c7c0b..7b81cd12540 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceCreator.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceCreator.java @@ -58,11 +58,14 @@ import org.rocksdb.RocksDBException; public class SharedRocksDbInstanceCreator { private final FailureProcessor failureProcessor; + private final String nodeName; + /** List of resources that must be closed if DB creation failed in the process. 
*/ private final List<AutoCloseable> resources = new ArrayList<>(); - public SharedRocksDbInstanceCreator(FailureProcessor failureProcessor) { + public SharedRocksDbInstanceCreator(FailureProcessor failureProcessor, String nodeName) { this.failureProcessor = failureProcessor; + this.nodeName = nodeName; } /** @@ -80,6 +83,7 @@ public class SharedRocksDbInstanceCreator { var flusher = new RocksDbFlusher( "rocksdb storage profile [" + profile.name() + "]", + nodeName, busyLock, engine.scheduledPool(), engine.threadPool(), diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java index bb96a12c67a..df13504456e 100644 --- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java +++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java @@ -72,6 +72,8 @@ import org.rocksdb.RocksDBException; @ExtendWith(ExecutorServiceExtension.class) @ExtendWith(ConfigurationExtension.class) class SharedRocksDbInstanceTest extends IgniteAbstractTest { + private static final String NODE_NAME = "test"; + private RocksDbStorageEngine engine; private RocksDbStorageProfile storageProfile; @@ -87,7 +89,7 @@ class SharedRocksDbInstanceTest extends IgniteAbstractTest { ScheduledExecutorService scheduledExecutor ) throws Exception { engine = new RocksDbStorageEngine( - "test", + NODE_NAME, storageConfiguration, workDir, mock(LogSyncer.class), @@ -116,7 +118,7 @@ class SharedRocksDbInstanceTest extends IgniteAbstractTest { } private SharedRocksDbInstance createDb() throws Exception { - return new SharedRocksDbInstanceCreator(mock(FailureProcessor.class)).create(engine, storageProfile, workDir); + return new SharedRocksDbInstanceCreator(mock(FailureProcessor.class), 
NODE_NAME).create(engine, storageProfile, workDir); } @Test diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java index 73f9af4beba..d79f253d179 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java @@ -419,6 +419,7 @@ public class TableManagerRecoveryTest extends IgniteAbstractTest { FailureProcessor failureProcessor = mock(FailureProcessor.class); sharedTxStateStorage = new TxStateRocksDbSharedStorage( + node.name(), workDir.resolve("tx-state"), scheduledExecutor, partitionOperationsExecutor, diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java index 175d814a51e..7f903146df0 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java @@ -873,6 +873,7 @@ public class TableManagerTest extends IgniteAbstractTest { var failureProcessor = new NoOpFailureManager(); var sharedTxStateStorage = new TxStateRocksDbSharedStorage( + NODE_NAME, workDir.resolve("tx-state"), scheduledExecutor, partitionOperationsExecutor, diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbSharedStorage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbSharedStorage.java index bd8851de00a..1e17c1ed4fb 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbSharedStorage.java +++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbSharedStorage.java @@ -105,6 +105,8 @@ public class TxStateRocksDbSharedStorage implements IgniteComponent { private final FailureProcessor failureProcessor; + private final String nodeName; + private volatile ColumnFamily txStateColumnFamily; private volatile ColumnFamily txStateMetaColumnFamily; @@ -112,6 +114,7 @@ public class TxStateRocksDbSharedStorage implements IgniteComponent { /** * Constructor. * + * @param nodeName Node name. * @param dbPath Database path. * @param scheduledExecutor Scheduled executor. Needed only for asynchronous start of scheduled operations without performing * blocking, long or IO operations. @@ -121,18 +124,20 @@ public class TxStateRocksDbSharedStorage implements IgniteComponent { * @see RocksDbFlusher */ public TxStateRocksDbSharedStorage( + String nodeName, Path dbPath, ScheduledExecutorService scheduledExecutor, ExecutorService threadPool, LogSyncer logSyncer, FailureProcessor failureProcessor ) { - this(dbPath, scheduledExecutor, threadPool, logSyncer, failureProcessor, TX_STATE_STORAGE_FLUSH_DELAY_SUPPLIER); + this(nodeName, dbPath, scheduledExecutor, threadPool, logSyncer, failureProcessor, TX_STATE_STORAGE_FLUSH_DELAY_SUPPLIER); } /** * Constructor. * + * @param nodeName Node name. * @param dbPath Database path. * @param scheduledExecutor Scheduled executor. Needed only for asynchronous start of scheduled operations without performing * blocking, long or IO operations. 
@@ -143,6 +148,7 @@ public class TxStateRocksDbSharedStorage implements IgniteComponent { * @see RocksDbFlusher */ public TxStateRocksDbSharedStorage( + String nodeName, Path dbPath, ScheduledExecutorService scheduledExecutor, ExecutorService threadPool, @@ -156,6 +162,7 @@ public class TxStateRocksDbSharedStorage implements IgniteComponent { this.flushDelaySupplier = flushDelaySupplier; this.logSyncer = logSyncer; this.failureProcessor = failureProcessor; + this.nodeName = nodeName; } /** @@ -190,6 +197,7 @@ public class TxStateRocksDbSharedStorage implements IgniteComponent { flusher = new RocksDbFlusher( "tx state storage", + nodeName, busyLock, scheduledExecutor, threadPool, diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/RocksDbTxStatePartitionStorageTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/RocksDbTxStatePartitionStorageTest.java index 2c1fd629b25..cd241dd95b3 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/RocksDbTxStatePartitionStorageTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/RocksDbTxStatePartitionStorageTest.java @@ -73,6 +73,7 @@ public class RocksDbTxStatePartitionStorageTest extends AbstractTxStatePartition @BeforeEach protected void beforeTest() { sharedStorage = new TxStateRocksDbSharedStorage( + "test", workDir, scheduledExecutor, executor, diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateMetaRocksDbPartitionStorageTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateMetaRocksDbPartitionStorageTest.java index 3589aea54e2..0bf7b92b87d 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateMetaRocksDbPartitionStorageTest.java +++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateMetaRocksDbPartitionStorageTest.java @@ -55,6 +55,7 @@ class TxStateMetaRocksDbPartitionStorageTest extends IgniteAbstractTest { @InjectExecutorService ExecutorService executor ) { sharedStorage = new TxStateRocksDbSharedStorage( + "test", workDir, scheduledExecutor, executor,
