This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 46a4462d45 IGNITE-23549 Flush Metastorage underlying storage to disk
after taking snapshot (#4679)
46a4462d45 is described below
commit 46a4462d45175f65d299b60f9662c64bb93ad507
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Wed Nov 6 11:36:55 2024 +0300
IGNITE-23549 Flush Metastorage underlying storage to disk after taking
snapshot (#4679)
---
.../ignite/internal/components/LogSyncer.java | 1 +
.../{LogSyncer.java => NoOpLogSyncer.java} | 14 ++--
.../metastorage/server/KeyValueStorage.java | 7 ++
.../server/persistence/RocksDbKeyValueStorage.java | 74 ++++++++++++++++++++--
.../server/RocksDbKeyValueStorageTest.java | 15 +++++
.../server/SimpleInMemoryKeyValueStorage.java | 5 ++
.../replicator/ItReplicaLifecycleTest.java | 7 +-
.../internal/rocksdb/flush/RocksDbFlusher.java | 16 ++++-
.../runner/app/ItIgniteNodeRestartTest.java | 7 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 16 ++---
.../rebalance/ItRebalanceDistributedTest.java | 7 +-
11 files changed, 127 insertions(+), 42 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/components/LogSyncer.java
b/modules/core/src/main/java/org/apache/ignite/internal/components/LogSyncer.java
index 612e6244b2..98f450e689 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/components/LogSyncer.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/components/LogSyncer.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.components;
/** Interface to synchronize write-ahead log. Operates only for persistent log
storages. */
+@FunctionalInterface
public interface LogSyncer {
/**
* Synchronizes write-ahead log.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/components/LogSyncer.java
b/modules/core/src/main/java/org/apache/ignite/internal/components/NoOpLogSyncer.java
similarity index 75%
copy from
modules/core/src/main/java/org/apache/ignite/internal/components/LogSyncer.java
copy to
modules/core/src/main/java/org/apache/ignite/internal/components/NoOpLogSyncer.java
index 612e6244b2..617450c3aa 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/components/LogSyncer.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/components/NoOpLogSyncer.java
@@ -17,12 +17,10 @@
package org.apache.ignite.internal.components;
-/** Interface to synchronize write-ahead log. Operates only for persistent log
storages. */
-public interface LogSyncer {
- /**
- * Synchronizes write-ahead log.
- *
- * @throws Exception if an error occurs whilst syncing.
- */
- void sync() throws Exception;
+/** Implementation that does nothing. */
+public class NoOpLogSyncer implements LogSyncer {
+ @Override
+ public void sync() {
+ // No-op.
+ }
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index ec580af62d..3ff427de74 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -589,4 +589,11 @@ public interface KeyValueStorage extends ManuallyCloseable
{
* @see #getCompactionRevision()
*/
Revisions revisions();
+
+ /**
+ * Flushes current state of the data or <i>the state from the nearest
future</i> to the storage.
+ *
+ * @return Future that's completed when flushing of the data is completed.
+ */
+ CompletableFuture<Void> flush();
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index d6a30e2dd8..810e8c5401 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -46,6 +46,8 @@ import static
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWrit
import static
org.apache.ignite.internal.rocksdb.snapshot.ColumnFamilyRange.fullRange;
import static org.apache.ignite.internal.util.ArrayUtils.LONG_EMPTY_ARRAY;
import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import static org.apache.ignite.lang.ErrorGroups.MetaStorage.COMPACTION_ERR;
import static org.apache.ignite.lang.ErrorGroups.MetaStorage.OP_EXECUTION_ERR;
@@ -68,8 +70,11 @@ import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.components.NoOpLogSyncer;
import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.metastorage.CommandId;
@@ -99,11 +104,13 @@ import org.apache.ignite.internal.raft.IndexWithTerm;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
import org.apache.ignite.internal.rocksdb.RocksUtils;
+import org.apache.ignite.internal.rocksdb.flush.RocksDbFlusher;
import org.apache.ignite.internal.rocksdb.snapshot.RocksSnapshotManager;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -174,12 +181,18 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
/** Batch size (number of keys) for storage compaction. The value is
arbitrary. */
private static final int COMPACT_BATCH_SIZE = 10;
+ /** Key value storage flush delay in mills. Value is taken from the
example of default values of other components. */
+ private static final int KV_STORAGE_FLUSH_DELAY = 100;
+
static {
RocksDB.loadLibrary();
}
- /** Thread-pool for snapshot operations execution. */
- private final ExecutorService snapshotExecutor;
+ /** Executor for storage operations. */
+ private final ExecutorService executor;
+
+ /** Scheduled executor for storage operations. */
+ private final ScheduledExecutorService scheduledExecutor;
/** Path to the rocksdb database. */
private final Path dbPath;
@@ -258,6 +271,13 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
*/
private WriteOptions checksumWriteOptions;
+ /** Multi-threaded access is guarded by {@link #rwLock}. */
+ private RocksDbFlusher flusher;
+
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ private final AtomicBoolean closeGuard = new AtomicBoolean();
+
/**
* Constructor.
*
@@ -280,11 +300,23 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
this.dbPath = dbPath;
- snapshotExecutor = Executors.newFixedThreadPool(2,
NamedThreadFactory.create(nodeName, "metastorage-snapshot-executor", log));
+ executor = Executors.newFixedThreadPool(
+ 2,
+ NamedThreadFactory.create(nodeName,
"metastorage-rocksdb-kv-storage-executor", log)
+ );
+
+ // TODO: IGNITE-23615 Use a common pool, e.g.
ThreadPoolsManager#commonScheduler
+ scheduledExecutor = Executors.newSingleThreadScheduledExecutor(
+ NamedThreadFactory.create(nodeName,
"metastorage-rocksdb-kv-storage-scheduler", log)
+ );
}
@Override
public void start() {
+ inBusyLock(busyLock, this::startBusy);
+ }
+
+ private void startBusy() {
rwLock.writeLock().lock();
try {
@@ -293,6 +325,7 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
createDb();
} catch (IOException | RocksDBException e) {
closeRocksResources();
+
throw new MetaStorageException(STARTING_STORAGE_ERR, "Failed to
start the storage", e);
} finally {
rwLock.writeLock().unlock();
@@ -346,6 +379,7 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
DBOptions options = new DBOptions()
.setAtomicFlush(true)
.setCreateMissingColumnFamilies(true)
+ .setListeners(List.of(flusher.listener()))
.setCreateIfMissing(true);
rocksResources.add(options);
@@ -368,6 +402,17 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
var handles = new ArrayList<ColumnFamilyHandle>(descriptors.size());
+ flusher = new RocksDbFlusher(
+ "rocksdb metastorage kv storage",
+ busyLock,
+ scheduledExecutor,
+ executor,
+ () -> KV_STORAGE_FLUSH_DELAY,
+ // It is expected that the metastorage command raft log works
with fsync=true.
+ new NoOpLogSyncer(),
+ () -> {}
+ );
+
options = createDbOptions();
db = RocksDB.open(options, dbPath.toAbsolutePath().toString(),
descriptors, handles);
@@ -386,9 +431,11 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
snapshotManager = new RocksSnapshotManager(db,
List.of(fullRange(data), fullRange(index),
fullRange(tsToRevision), fullRange(revisionToTs),
fullRange(revisionToChecksum)),
- snapshotExecutor
+ executor
);
+ flusher.init(db, handles);
+
byte[] revision = data.get(REVISION_KEY);
if (revision != null) {
@@ -438,11 +485,19 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
@Override
public void close() throws Exception {
+ if (!closeGuard.compareAndSet(false, true)) {
+ return;
+ }
+
stopCompaction();
+ busyLock.block();
+
watchProcessor.close();
+ flusher.stop();
- IgniteUtils.shutdownAndAwaitTermination(snapshotExecutor, 10,
TimeUnit.SECONDS);
+ IgniteUtils.shutdownAndAwaitTermination(executor, 10,
TimeUnit.SECONDS);
+ IgniteUtils.shutdownAndAwaitTermination(scheduledExecutor, 10,
TimeUnit.SECONDS);
rwLock.writeLock().lock();
try {
@@ -463,7 +518,9 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
rwLock.writeLock().lock();
try {
- return snapshotManager.createSnapshot(snapshotPath);
+ return snapshotManager
+ .createSnapshot(snapshotPath)
+ .thenCompose(unused -> flush());
} finally {
rwLock.writeLock().unlock();
}
@@ -1601,4 +1658,9 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
private void addIndexAndTermToWriteBatch(WriteBatch batch,
KeyValueUpdateContext context) throws RocksDBException {
data.put(batch, INDEX_AND_TERM_KEY, longsToBytes(0, context.index,
context.term));
}
+
+ @Override
+ public CompletableFuture<Void> flush() {
+ return inBusyLockAsync(busyLock, () -> flusher.awaitFlush(true));
+ }
}
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
index b40028ed13..0f0d7b5320 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
@@ -22,6 +22,7 @@ import static
org.apache.ignite.internal.metastorage.dsl.Operations.put;
import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
import static
org.apache.ignite.internal.metastorage.server.ExistenceCondition.Type.NOT_EXISTS;
import static
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler.IDEMPOTENT_COMMAND_PREFIX;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.ByteUtils.intToBytes;
import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -390,4 +391,18 @@ public class RocksDbKeyValueStorageTest extends
BasicOperationsKeyValueStorageTe
assertThat(checksumAndRevisions.minChecksummedRevision(), is(2L));
assertThat(checksumAndRevisions.maxChecksummedRevision(), is(2L));
}
+
+ @Test
+ void testFlush() throws Exception {
+ byte[] key = key(1);
+ byte[] value = keyValue(1, 1);
+
+ putToMs(key, value);
+
+ assertThat(storage.flush(), willCompleteSuccessfully());
+
+ restartStorage();
+
+ assertArrayEquals(value, storage.get(key).value());
+ }
}
diff --git
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 78955c6d81..4d7e306981 100644
---
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -916,4 +916,9 @@ public class SimpleInMemoryKeyValueStorage extends
AbstractKeyValueStorage {
}
};
}
+
+ @Override
+ public CompletableFuture<Void> flush() {
+ return nullCompletedFuture();
+ }
}
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
index 79e9048505..aedfe8b895 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
@@ -96,7 +96,6 @@ import
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorag
import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
-import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.configuration.ClusterConfiguration;
import org.apache.ignite.internal.configuration.ComponentWorkingDir;
import org.apache.ignite.internal.configuration.ConfigurationManager;
@@ -1194,8 +1193,6 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
Path storagePath = dir.resolve("storage");
- LogSyncer logSyncer = partitionsLogStorageFactory;
-
dataStorageMgr = new DataStorageManager(
dataStorageModules.createStorageEngines(
name,
@@ -1203,7 +1200,7 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
dir.resolve("storage"),
null,
failureManager,
- logSyncer,
+ partitionsLogStorageFactory,
hybridClock
),
storageConfiguration
@@ -1331,7 +1328,7 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
lowWatermark,
transactionInflights,
indexMetaStorage,
- logSyncer,
+ partitionsLogStorageFactory,
partitionReplicaLifecycleManager,
minTimeCollectorService
) {
diff --git
a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
index 40cf25ea72..e4a1b8c4d2 100644
---
a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
+++
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.rocksdb.flush;
+import static java.util.concurrent.CompletableFuture.runAsync;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import java.util.ArrayList;
@@ -28,6 +29,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;
+import java.util.function.Supplier;
import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -279,6 +281,18 @@ public class RocksDbFlusher {
* @return Future that completes when the {@code onFlushCompleted}
callback finishes.
*/
CompletableFuture<Void> onFlushCompleted() {
- return CompletableFuture.runAsync(onFlushCompleted, threadPool);
+ return inBusyLockSafeAsync(() -> runAsync(onFlushCompleted,
threadPool));
+ }
+
+ private CompletableFuture<Void>
inBusyLockSafeAsync(Supplier<CompletableFuture<Void>> supplier) {
+ if (!busyLock.enterBusy()) {
+ return nullCompletedFuture();
+ }
+
+ try {
+ return supplier.get();
+ } finally {
+ busyLock.leaveBusy();
+ }
}
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 2da26a44a1..dd4d023b6b 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -105,7 +105,6 @@ import
org.apache.ignite.internal.cluster.management.raft.RocksDbClusterStateSto
import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
-import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.configuration.ComponentWorkingDir;
import org.apache.ignite.internal.configuration.ConfigurationManager;
import org.apache.ignite.internal.configuration.ConfigurationModules;
@@ -639,8 +638,6 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
Path storagePath = getPartitionsStorePath(dir);
- LogSyncer logSyncer = partitionsLogStorageFactory;
-
DataStorageManager dataStorageManager = new DataStorageManager(
dataStorageModules.createStorageEngines(
name,
@@ -648,7 +645,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
storagePath,
null,
failureProcessor,
- logSyncer,
+ partitionsLogStorageFactory,
hybridClock
),
storageConfiguration
@@ -729,7 +726,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
lowWatermark,
transactionInflights,
indexMetaStorage,
- logSyncer,
+ partitionsLogStorageFactory,
new PartitionReplicaLifecycleManager(
catalogManager,
replicaMgr,
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 4a9624d5f1..9c8564fd28 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -88,7 +88,6 @@ import
org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
-import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.components.LongJvmPauseDetector;
import org.apache.ignite.internal.compute.AntiHijackIgniteCompute;
import org.apache.ignite.internal.compute.ComputeComponent;
@@ -584,13 +583,11 @@ public class IgniteImpl implements Ignite {
ComponentWorkingDir partitionsWorkDir =
partitionsPath(systemConfiguration, workDir);
- boolean raftUseFsync = raftConfiguration.fsync().value();
-
partitionsLogStorageFactory = SharedLogStorageFactoryUtils.create(
"table data log",
clusterSvc.nodeName(),
partitionsWorkDir.raftLogPath(),
- raftUseFsync
+ raftConfiguration.fsync().value()
);
raftMgr = new Loza(
@@ -682,6 +679,7 @@ public class IgniteImpl implements Ignite {
"meta-storage log",
clusterSvc.nodeName(),
metastorageWorkDir.raftLogPath(),
+ // If it changes, then it will be necessary to set LogSyncer
to RocksDbKeyValueStorage.
true
);
@@ -832,19 +830,13 @@ public class IgniteImpl implements Ignite {
GcConfiguration gcConfig =
clusterConfigRegistry.getConfiguration(GcExtensionConfiguration.KEY).gc();
- LogSyncer logSyncer = () -> {
- partitionsLogStorageFactory.sync();
- cmgLogStorageFactory.sync();
- msLogStorageFactory.sync();
- };
-
Map<String, StorageEngine> storageEngines =
dataStorageModules.createStorageEngines(
name,
nodeConfigRegistry,
storagePath,
longJvmPauseDetector,
failureManager,
- logSyncer,
+ partitionsLogStorageFactory,
clock
);
@@ -1012,7 +1004,7 @@ public class IgniteImpl implements Ignite {
lowWatermark,
transactionInflights,
indexMetaStorage,
- logSyncer,
+ partitionsLogStorageFactory,
partitionReplicaLifecycleManager,
minTimeCollectorService
);
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 73139386b1..3f6e67d09a 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -110,7 +110,6 @@ import
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorag
import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
-import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.configuration.ClusterConfiguration;
import org.apache.ignite.internal.configuration.ComponentWorkingDir;
import org.apache.ignite.internal.configuration.ConfigurationManager;
@@ -1313,8 +1312,6 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
Path storagePath = dir.resolve("storage");
- LogSyncer logSyncer = logStorageFactory;
-
dataStorageMgr = new DataStorageManager(
dataStorageModules.createStorageEngines(
name,
@@ -1322,7 +1319,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
dir.resolve("storage"),
null,
failureManager,
- logSyncer,
+ logStorageFactory,
hybridClock
),
storageConfiguration
@@ -1437,7 +1434,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
lowWatermark,
transactionInflights,
indexMetaStorage,
- logSyncer,
+ logStorageFactory,
new PartitionReplicaLifecycleManager(
catalogManager,
replicaManager,