This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new b1afaa6b869 IGNITE-27149 Add additional logging for incoming snapshot
(#7260)
b1afaa6b869 is described below
commit b1afaa6b86971b40fcd6d1240602ffc0dfdc4540
Author: Ivan Zlenko <[email protected]>
AuthorDate: Thu Dec 18 14:37:15 2025 +0500
IGNITE-27149 Add additional logging for incoming snapshot (#7260)
---
.../raft/snapshot/PartitionSnapshotStorage.java | 2 +-
.../snapshot/incoming/IncomingSnapshotCopier.java | 208 ++++++++++++++++++---
.../snapshot/incoming/IncomingSnapshotReader.java | 2 +-
.../snapshot/incoming/IncomingSnapshotStats.java | 163 ++++++++++++++++
.../internal/storage/impl/TestMvTableStorage.java | 8 +
.../pagememory/AbstractPageMemoryTableStorage.java | 8 +
.../storage/rocksdb/RocksDbTableStorage.java | 8 +
.../rocksdb/TxStateRocksDbPartitionStorage.java | 6 +
8 files changed, 376 insertions(+), 29 deletions(-)
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
index c7b8588f170..6cf34dde4a1 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
@@ -147,7 +147,7 @@ public class PartitionSnapshotStorage {
this.logStorage = logStorage;
}
- public PartitionKey partitionKey() {
+ public ZonePartitionKey partitionKey() {
return partitionKey;
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java
index f1657a9d785..50bc86dbc4f 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java
@@ -73,6 +73,7 @@ import
org.apache.ignite.internal.partition.replicator.raft.snapshot.LogStorageA
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.SnapshotUri;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
import org.apache.ignite.internal.raft.RaftGroupConfigurationSerializer;
import org.apache.ignite.internal.schema.BinaryRow;
@@ -134,6 +135,8 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
@Nullable
private volatile CompletableFuture<Void> joinFuture;
+ private final IncomingSnapshotStats snapshotStats = new
IncomingSnapshotStats();
+
/**
* Constructor.
*
@@ -158,7 +161,11 @@ public class IncomingSnapshotCopier extends SnapshotCopier
{
@Override
public void start() {
- LOG.info("Copier is started for the partition [{}]",
createPartitionInfo());
+ snapshotStats.onSnapshotInstallationStart();
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Rebalance is started [snapshotId={}, {}]",
snapshotUri.snapshotId, createPartitionInfo());
+ }
InternalClusterNode snapshotSender =
getSnapshotSender(snapshotUri.nodeName);
@@ -218,11 +225,35 @@ public class IncomingSnapshotCopier extends
SnapshotCopier {
}
private CompletableFuture<?>
waitForMetadataWithTimeout(PartitionSnapshotMeta snapshotMeta) {
+ snapshotStats.onWaitingCatalogPhaseStart();
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info(
+ "Waiting for catalog version [snapshotId={}, {},
catalogVersion={}]",
+ snapshotUri.snapshotId,
+ createPartitionInfo(),
+ snapshotMeta.requiredCatalogVersion()
+ );
+ }
+
CompletableFuture<?> metadataReadyFuture =
partitionSnapshotStorage.catalogService()
.catalogReadyFuture(snapshotMeta.requiredCatalogVersion());
CompletableFuture<?> readinessTimeoutFuture =
completeOnMetadataReadinessTimeout();
- return anyOf(metadataReadyFuture, readinessTimeoutFuture);
+ return anyOf(metadataReadyFuture, readinessTimeoutFuture)
+ .whenComplete((ignored, throwable) -> {
+ snapshotStats.onWaitingCatalogPhaseEnd();
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info(
+ "Finished waiting for the catalog readiness
[snapshotId={}, {}, waitingTime={}ms, result={}]",
+ snapshotUri.snapshotId,
+ createPartitionInfo(),
+
snapshotStats.totalWaitingCatalogPhaseDuration(),
+ metadataIsSufficientlyComplete(snapshotMeta) ?
"success" : "timeout"
+ );
+ }
+ });
}
private CompletableFuture<?> completeOnMetadataReadinessTimeout() {
@@ -268,7 +299,9 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
busyLock.block();
- LOG.info("Copier is canceled for partition [{}]",
createPartitionInfo());
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Rebalance is canceled [snapshotId={}, {}]",
snapshotUri.snapshotId, createPartitionInfo());
+ }
// Cancel all futures that might be upstream wrt joinFuture.
List<CompletableFuture<?>> futuresToCancel =
Stream.of(snapshotMetaFuture, rebalanceFuture)
@@ -317,6 +350,12 @@ public class IncomingSnapshotCopier extends SnapshotCopier
{
return nullCompletedFuture();
}
+ snapshotStats.onLoadSnapshotPhaseStart();
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Start loading snapshot meta [snapshotId={}, {}]",
snapshotUri.snapshotId, createPartitionInfo());
+ }
+
try {
return partitionSnapshotStorage.messagingService().invoke(
snapshotSender,
@@ -325,7 +364,17 @@ public class IncomingSnapshotCopier extends SnapshotCopier
{
).thenApply(response -> {
PartitionSnapshotMeta snapshotMeta = ((SnapshotMetaResponse)
response).meta();
- LOG.info("Copier has loaded the snapshot meta for the
partition [{}, meta={}]", createPartitionInfo(), snapshotMeta);
+ snapshotStats.onLoadSnapshotPhaseEnd();
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info(
+ "Snapshot meta has been loaded [snapshotId={}, {},
meta={}, loadingTime={}ms]",
+ snapshotUri.snapshotId,
+ createPartitionInfo(),
+ snapshotMeta,
+ snapshotStats.totalLoadSnapshotPhaseDuration()
+ );
+ }
return snapshotMeta;
});
@@ -364,6 +413,16 @@ public class IncomingSnapshotCopier extends SnapshotCopier
{
return nullCompletedFuture();
}
+ snapshotStats.onLoadMvDataPhaseStart();
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info(
+ "Start loading multi-versioned data [snapshotId={}, {}]",
+ snapshotUri.snapshotId,
+ createPartitionInfo()
+ );
+ }
+
try {
return partitionSnapshotStorage.messagingService().invoke(
snapshotSender,
@@ -390,20 +449,34 @@ public class IncomingSnapshotCopier extends
SnapshotCopier {
}
}
+
snapshotStats.onMvBatchProcessing(snapshotMvDataResponse.rows().size());
+
if (snapshotMvDataResponse.finish()) {
- LOG.info(
- "Copier has finished loading multi-versioned data
[{}, rows={}]",
- createPartitionInfo(),
- snapshotMvDataResponse.rows().size()
- );
+ snapshotStats.onLoadMvDataPhaseEnd();
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info(
+ "Multi-versioned data has been loaded
[snapshotId={}, {}, totalRows={}, totalBatches={},"
+ + " mvDataLoadingTime={}ms]",
+ snapshotUri.snapshotId,
+ createPartitionInfo(),
+ snapshotMvDataResponse.rows().size(),
+ snapshotStats.totalMvDataRows(),
+ snapshotStats.totalMvDataBatches(),
+ snapshotStats.loadMvDataPhaseDuration()
+ );
+ }
return nullCompletedFuture();
} else {
- LOG.info(
- "Copier has loaded a portion of multi-versioned
data [{}, rows={}]",
- createPartitionInfo(),
- snapshotMvDataResponse.rows().size()
- );
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "A portion of multi-versioned data has been
loaded [snapshotId={}, {}, rows={}]",
+ snapshotUri.snapshotId,
+ createPartitionInfo(),
+ snapshotMvDataResponse.rows().size()
+ );
+ }
// Let's upload the rest.
return loadSnapshotMvData(snapshotContext, snapshotSender);
@@ -422,6 +495,16 @@ public class IncomingSnapshotCopier extends SnapshotCopier
{
return nullCompletedFuture();
}
+ snapshotStats.onLoadTxMetasPhaseStart();
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info(
+ "Start loading transaction meta data [snapshotId={}, {}]",
+ snapshotUri.snapshotId,
+ createPartitionInfo()
+ );
+ }
+
try {
return partitionSnapshotStorage.messagingService().invoke(
snapshotSender,
@@ -450,20 +533,34 @@ public class IncomingSnapshotCopier extends
SnapshotCopier {
}
}
+
snapshotStats.onTxMetasBatchProcessing(snapshotTxDataResponse.txMeta().size());
+
if (snapshotTxDataResponse.finish()) {
- LOG.info(
- "Copier has finished loading transaction meta [{},
metas={}]",
- createPartitionInfo(),
- snapshotTxDataResponse.txMeta().size()
- );
+ snapshotStats.onLoadTxMetasPhaseEnd();
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info(
+ "Transaction meta has been loaded
[snapshotId={}, {}, totalMetas={}, totalBatches={},"
+ + " metaLoadingTime={}ms]",
+ snapshotUri.snapshotId,
+ createPartitionInfo(),
+ snapshotTxDataResponse.txMeta().size(),
+ snapshotStats.totalTxMetas(),
+ snapshotStats.totalTxMetasBatches(),
+ snapshotStats.loadTxMetasPhaseDuration()
+ );
+ }
return nullCompletedFuture();
} else {
- LOG.info(
- "Copier has loaded a portion of transaction meta
[{}, metas={}]",
- createPartitionInfo(),
- snapshotTxDataResponse.txMeta().size()
- );
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "A portion of transaction meta has been loaded
[snapshotId={}, {}, metas={}]",
+ snapshotUri.snapshotId,
+ createPartitionInfo(),
+ snapshotTxDataResponse.txMeta().size()
+ );
+ }
// Let's upload the rest.
return loadSnapshotTxData(snapshotSender);
@@ -481,6 +578,8 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
* successful.
*/
private CompletableFuture<Void> completeRebalance(SnapshotContext
snapshotContext, @Nullable Throwable throwable) {
+ snapshotStats.onSnapshotInstallationEnd();
+
if (!busyLock.enterBusy()) {
if (isOk()) {
setError(RaftError.ECANCELED, "Copier is cancelled");
@@ -508,7 +607,12 @@ public class IncomingSnapshotCopier extends SnapshotCopier
{
}
if (LOG.isInfoEnabled()) {
- LOG.info("Copier completes the rebalancing of the partition:
[{}, meta={}]", createPartitionInfo(), snapshotContext.meta);
+ LOG.info(
+ "Rebalance is done [{}, meta={}, rebalanceTime={}ms]",
+ createPartitionInfo(),
+ snapshotContext.meta,
+ snapshotStats.totalSnapshotInstallationDuration()
+ );
}
MvPartitionMeta snapshotMeta = mvPartitionMeta(snapshotContext);
@@ -577,7 +681,9 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
}
private String createPartitionInfo() {
- return partitionSnapshotStorage.partitionKey().toString();
+ ZonePartitionKey partitionKey =
partitionSnapshotStorage.partitionKey();
+
+ return "zoneId=" + partitionKey.zoneId() + ", partitionId=" +
partitionKey.partitionId();
}
private void writeVersion(SnapshotContext snapshotContext, ResponseEntry
entry, int entryIndex) {
@@ -623,9 +729,20 @@ public class IncomingSnapshotCopier extends SnapshotCopier
{
return;
}
+ snapshotStats.onSetRowIdToBuildPhaseStart();
+
try {
Map<Integer, UUID> nextRowUuidToBuildByIndexId =
snapshotContext.meta.nextRowIdToBuildByIndexId();
+ if (LOG.isInfoEnabled()) {
+ LOG.info(
+ "Setting next row ID for index building
[snapshotId={}, {}, indexIdToRowId={}]",
+ snapshotUri.snapshotId,
+ createPartitionInfo(),
+ nextRowUuidToBuildByIndexId
+ );
+ }
+
if (nullOrEmpty(nextRowUuidToBuildByIndexId)) {
return;
}
@@ -653,6 +770,17 @@ public class IncomingSnapshotCopier extends SnapshotCopier
{
partitionAccess.setNextRowIdToBuildIndex(e.getValue());
}
}
+
+ snapshotStats.onSetRowIdToBuildPhaseEnd();
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Finished setting next row ID for index building
[snapshotId={}, {}, totalTime={}ms]",
+ snapshotUri.snapshotId,
+ createPartitionInfo(),
+ snapshotStats.totalSetRowIdToBuildPhaseDuration()
+ );
+ }
+
} finally {
busyLock.leaveBusy();
}
@@ -688,11 +816,33 @@ public class IncomingSnapshotCopier extends
SnapshotCopier {
return nullCompletedFuture();
}
+ snapshotStats.onPreparingStoragePhaseStart();
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info(
+ "Preparing storages for snapshot installation
[snapshotId={}, {}]",
+ snapshotUri.snapshotId,
+ createPartitionInfo()
+ );
+ }
+
try {
return allOf(
aggregateFutureFromPartitions(PartitionMvStorageAccess::startRebalance,
snapshotContext),
partitionSnapshotStorage.txState().startRebalance()
- ).thenComposeAsync(unused ->
startRebalanceForReplicationLogStorages(snapshotContext), executor);
+ ).thenComposeAsync(unused ->
startRebalanceForReplicationLogStorages(snapshotContext), executor)
+ .whenComplete((ignore, throwable) -> {
+ snapshotStats.onPreparingStoragePhaseEnd();
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info(
+ "Storages are prepared to load data
[snapshotId={}, {}, preparationTime={}ms]",
+ snapshotUri.snapshotId,
+ createPartitionInfo(),
+
snapshotStats.totalPreparingStoragePhaseDuration()
+ );
+ }
+ });
} finally {
busyLock.leaveBusy();
}
@@ -763,6 +913,10 @@ public class IncomingSnapshotCopier extends SnapshotCopier
{
private void
startRebalanceForReplicationLogStorage(ReplicationLogStorageKey key) throws
IgniteInternalException {
try {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Start rebalance for the replication log storage
[snapshotId={}, {}]", snapshotUri.snapshotId, key);
+ }
+
LogStorageAccess logStorage =
partitionSnapshotStorage.logStorage();
logStorage.destroy(key.replicationGroupId(), key.isVolatile());
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotReader.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotReader.java
index 0a836e8dea9..ae85c46cdc4 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotReader.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotReader.java
@@ -41,7 +41,7 @@ class IncomingSnapshotReader extends SnapshotReader {
}
@Override
- public SnapshotMeta load() {
+ public @Nullable SnapshotMeta load() {
return snapshotMeta;
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotStats.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotStats.java
new file mode 100644
index 00000000000..fe06d54762c
--- /dev/null
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotStats.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.raft.snapshot.incoming;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import org.apache.ignite.internal.metrics.StopWatchTimer;
+
+/**
+ * Statistical information related to {@link IncomingSnapshotCopier}.
+ *
+ * <p>Collects counters and phase durations (in milliseconds) for the incoming
snapshot installation process.</p>
+ */
+class IncomingSnapshotStats {
+ private long totalMvRows;
+
+ private int totalMvBatches;
+
+ private long totalTxMetas;
+
+ private int totalTxMetasBatches;
+
+ private final StopWatchTimer loadSnapshotMetaTimer = new StopWatchTimer();
+
+ private final StopWatchTimer waitingCatalogTimer = new StopWatchTimer();
+
+ private final StopWatchTimer preparingStoragesTimer = new StopWatchTimer();
+
+ private final StopWatchTimer loadMvDataTimer = new StopWatchTimer();
+
+ private final StopWatchTimer loadTxDataTimer = new StopWatchTimer();
+
+ private final StopWatchTimer setRowIdToBuildIndexTimer = new
StopWatchTimer();
+
+ private final StopWatchTimer totalSnapshotInstallationTimer = new
StopWatchTimer();
+
+ void onMvBatchProcessing(long rows) {
+ totalMvRows += rows;
+
+ totalMvBatches++;
+ }
+
+ void onLoadMvDataPhaseStart() {
+ loadMvDataTimer.start();
+ }
+
+ void onLoadMvDataPhaseEnd() {
+ loadMvDataTimer.end();
+ }
+
+ long totalMvDataRows() {
+ return totalMvRows;
+ }
+
+ long totalMvDataBatches() {
+ return totalMvBatches;
+ }
+
+ long loadMvDataPhaseDuration() {
+ return loadMvDataTimer.duration(MILLISECONDS);
+ }
+
+ void onTxMetasBatchProcessing(long metas) {
+ totalTxMetas += metas;
+
+ totalTxMetasBatches++;
+ }
+
+ void onLoadTxMetasPhaseStart() {
+ loadTxDataTimer.start();
+ }
+
+ void onLoadTxMetasPhaseEnd() {
+ loadTxDataTimer.end();
+ }
+
+ long totalTxMetas() {
+ return totalTxMetas;
+ }
+
+ long totalTxMetasBatches() {
+ return totalTxMetasBatches;
+ }
+
+ long loadTxMetasPhaseDuration() {
+ return loadTxDataTimer.duration(MILLISECONDS);
+ }
+
+ void onSnapshotInstallationStart() {
+ totalSnapshotInstallationTimer.start();
+ }
+
+ void onSnapshotInstallationEnd() {
+ totalSnapshotInstallationTimer.end();
+ }
+
+ long totalSnapshotInstallationDuration() {
+ return totalSnapshotInstallationTimer.duration(MILLISECONDS);
+ }
+
+ void onLoadSnapshotPhaseStart() {
+ loadSnapshotMetaTimer.start();
+ }
+
+ void onLoadSnapshotPhaseEnd() {
+ loadSnapshotMetaTimer.end();
+ }
+
+ long totalLoadSnapshotPhaseDuration() {
+ return loadSnapshotMetaTimer.duration(MILLISECONDS);
+ }
+
+ void onWaitingCatalogPhaseStart() {
+ waitingCatalogTimer.start();
+ }
+
+ void onWaitingCatalogPhaseEnd() {
+ waitingCatalogTimer.end();
+ }
+
+ long totalWaitingCatalogPhaseDuration() {
+ return waitingCatalogTimer.duration(MILLISECONDS);
+ }
+
+ void onPreparingStoragePhaseStart() {
+ preparingStoragesTimer.start();
+ }
+
+ void onPreparingStoragePhaseEnd() {
+ preparingStoragesTimer.end();
+ }
+
+ long totalPreparingStoragePhaseDuration() {
+ return preparingStoragesTimer.duration(MILLISECONDS);
+ }
+
+ void onSetRowIdToBuildPhaseStart() {
+ setRowIdToBuildIndexTimer.start();
+ }
+
+ void onSetRowIdToBuildPhaseEnd() {
+ setRowIdToBuildIndexTimer.end();
+ }
+
+ long totalSetRowIdToBuildPhaseDuration() {
+ return setRowIdToBuildIndexTimer.duration(MILLISECONDS);
+ }
+}
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
index b189150bb63..8af7750764d 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
@@ -35,6 +35,8 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.engine.MvPartitionMeta;
@@ -57,6 +59,8 @@ import
org.mockito.exceptions.misusing.UnfinishedStubbingException;
* Test table storage implementation.
*/
public class TestMvTableStorage implements MvTableStorage {
+ private static final IgniteLogger LOG =
Loggers.forClass(TestMvTableStorage.class);
+
private static volatile TestMvPartitionStorageFactory
partitionStorageFactory = TestMvPartitionStorageFactory.DEFAULT;
private final MvPartitionStorages<TestMvPartitionStorage>
mvPartitionStorages;
@@ -290,6 +294,10 @@ public class TestMvTableStorage implements MvTableStorage {
}
private CompletableFuture<Void> startRebalancePartitionBusy(int
partitionId) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Starting rebalance for partition [tableId={},
partitionId={}]", tableDescriptor.getId(), partitionId);
+ }
+
return mvPartitionStorages.startRebalance(partitionId,
mvPartitionStorage -> {
mvPartitionStorage.startRebalance();
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
index 8f3a1011219..7d7d096b705 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
@@ -32,6 +32,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.pagememory.DataRegion;
import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.pagememory.freelist.FreeList;
@@ -62,6 +64,8 @@ import org.jetbrains.annotations.Nullable;
* Abstract table storage implementation based on {@link PageMemory}.
*/
public abstract class AbstractPageMemoryTableStorage<T extends
AbstractPageMemoryMvPartitionStorage> implements MvTableStorage {
+ private static final IgniteLogger LOG =
Loggers.forClass(AbstractPageMemoryTableStorage.class);
+
final MvPartitionStorages<T> mvPartitionStorages;
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -292,6 +296,10 @@ public abstract class AbstractPageMemoryTableStorage<T
extends AbstractPageMemor
@Override
public CompletableFuture<Void> startRebalancePartition(int partitionId) {
return busy(() -> mvPartitionStorages.startRebalance(partitionId,
mvPartitionStorage -> {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Starting rebalance for partition [tableId={},
partitionId={}]", getTableId(), partitionId);
+ }
+
mvPartitionStorage.startRebalance();
return clearStorageAndUpdateDataStructures(
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index 0cca89f01f9..6cb2c177d62 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -35,6 +35,8 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.StorageRebalanceException;
@@ -61,6 +63,8 @@ import org.rocksdb.WriteBatch;
* Table storage implementation based on {@link RocksDB} instance.
*/
public class RocksDbTableStorage implements MvTableStorage {
+ private static final IgniteLogger LOG =
Loggers.forClass(RocksDbTableStorage.class);
+
private final SharedRocksDbInstance rocksDb;
/** Partition storages. */
@@ -307,6 +311,10 @@ public class RocksDbTableStorage implements MvTableStorage
{
@Override
public CompletableFuture<Void> startRebalancePartition(int partitionId) {
return busy(() -> mvPartitionStorages.startRebalance(partitionId,
mvPartitionStorage -> {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Starting rebalance for partition [tableId={},
partitionId={}]", getTableId(), partitionId);
+ }
+
try (WriteBatch writeBatch = new WriteBatch()) {
mvPartitionStorage.startRebalance(writeBatch);
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbPartitionStorage.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbPartitionStorage.java
index 87485c3773b..e0849f0d98b 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbPartitionStorage.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbPartitionStorage.java
@@ -36,6 +36,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.ignite.internal.lang.IgniteBiTuple;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
import org.apache.ignite.internal.rocksdb.RocksUtils;
@@ -64,6 +66,8 @@ import org.rocksdb.WriteBatch;
* Tx state storage implementation based on RocksDB.
*/
public class TxStateRocksDbPartitionStorage implements TxStatePartitionStorage
{
+ private static final IgniteLogger LOG =
Loggers.forClass(TxStateRocksDbPartitionStorage.class);
+
/** Prefix length for the payload. Consists of tableId/zoneId (4 bytes)
and partitionId (2 bytes), both in Big Endian. */
public static final int PREFIX_SIZE_BYTES = ZONE_PREFIX_SIZE_BYTES +
Short.BYTES;
@@ -420,6 +424,8 @@ public class TxStateRocksDbPartitionStorage implements
TxStatePartitionStorage {
@Override
public CompletableFuture<Void> startRebalance() {
+ LOG.info("Starting rebalance for transaction state storage [zoneId={},
partitionId={}]", zoneId, partitionId);
+
transitionFromRunningStateTo(StorageState.REBALANCE);
busyLock.block();