This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 1f269c38cae IGNITE-27008 Improve index build log (#7064)
1f269c38cae is described below
commit 1f269c38cae82c546f256fefe725e7587c20ab33
Author: Vladimir Dmitrienko <[email protected]>
AuthorDate: Thu Nov 27 10:51:35 2025 +0400
IGNITE-27008 Improve index build log (#7064)
---
.../ignite/internal/index/IndexBuildTask.java | 61 ++++++---
.../IndexBuildTaskStatisticsLoggingListener.java | 148 +++++++++++++++++++++
.../apache/ignite/internal/index/IndexBuilder.java | 12 +-
...ndexBuildTaskStatisticsLoggingListenerTest.java | 84 ++++++++++++
4 files changed, 283 insertions(+), 22 deletions(-)
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
index 6a655e5d25d..e2494a5e368 100644
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
@@ -26,6 +26,7 @@ import static java.util.stream.Collectors.toUnmodifiableSet;
import static
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toZonePartitionIdMessage;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapRootCause;
import java.util.ArrayList;
@@ -99,7 +100,9 @@ class IndexBuildTask {
private final InternalClusterNode node;
- private final List<IndexBuildCompletionListener> listeners;
+ private final List<IndexBuildCompletionListener> buildCompletionListeners;
+
+ private final IndexBuildTaskStatisticsLoggingListener
statisticsLoggingListener;
private final long enlistmentConsistencyToken;
@@ -125,7 +128,7 @@ class IndexBuildTask {
IgniteSpinBusyLock busyLock,
int batchSize,
InternalClusterNode node,
- List<IndexBuildCompletionListener> listeners,
+ List<IndexBuildCompletionListener> buildCompletionListeners,
long enlistmentConsistencyToken,
boolean afterDisasterRecovery,
HybridTimestamp initialOperationTimestamp
@@ -142,7 +145,8 @@ class IndexBuildTask {
this.batchSize = batchSize;
this.node = node;
// We do not intentionally make a copy of the list, we want to see
changes in the passed list.
- this.listeners = listeners;
+ this.buildCompletionListeners = buildCompletionListeners;
+ this.statisticsLoggingListener = new
IndexBuildTaskStatisticsLoggingListener(taskId, afterDisasterRecovery);
this.enlistmentConsistencyToken = enlistmentConsistencyToken;
this.afterDisasterRecovery = afterDisasterRecovery;
this.initialOperationTimestamp = initialOperationTimestamp;
@@ -156,24 +160,36 @@ class IndexBuildTask {
return;
}
- LOG.info("Start building the index: [{}]", createCommonIndexInfo());
+ String indexInfo = createCommonIndexInfo();
+ if (afterDisasterRecovery) {
+ LOG.warn("Start building the index due to disaster recovery of an
AVAILABLE index. This shouldn't normally occur [{}]",
+ indexInfo
+ );
+ } else {
+ LOG.info("Start building the index [{}]", indexInfo);
+ }
try {
+ statisticsLoggingListener.onIndexBuildStarted();
+
supplyAsync(partitionStorage::highestRowId, executor)
.thenApplyAsync(this::handleNextBatch, executor)
.thenCompose(Function.identity())
.whenComplete((unused, throwable) -> {
if (throwable != null) {
if (ignorable(throwable)) {
- LOG.debug("Index build error: [{}]",
throwable, createCommonIndexInfo());
+ LOG.info("Ignorable index build error [{},
error={}]", indexInfo, unwrapCause(throwable));
} else {
- String errorMessage = String.format("Index
build error: [%s]", createCommonIndexInfo());
- failureProcessor.process(new
FailureContext(throwable, errorMessage));
+ String message = String.format("Index build
error [%s, error=%s]", indexInfo, unwrapCause(throwable));
+
+ failureProcessor.process(new
FailureContext(throwable, message));
}
taskFuture.completeExceptionally(throwable);
+
statisticsLoggingListener.onIndexBuildFailure(throwable);
} else {
taskFuture.complete(null);
+ statisticsLoggingListener.onIndexBuildSuccess();
}
});
} catch (Throwable t) {
@@ -221,9 +237,7 @@ class IndexBuildTask {
try {
return createBatchToIndex(highestRowId)
- .thenCompose(batch -> {
- return replicaService.invoke(node,
createBuildIndexReplicaRequest(batch, initialOperationTimestamp));
- })
+ .thenCompose(this::processBatch)
.handleAsync((unused, throwable) -> {
if (throwable != null) {
Throwable cause = unwrapRootCause(throwable);
@@ -234,9 +248,9 @@ class IndexBuildTask {
}
} else if (indexStorage.getNextRowIdToBuild() == null)
{
// Index has been built.
- LOG.info("Index build completed: [{}]",
createCommonIndexInfo());
+ LOG.info("Index build completed [{}]",
createCommonIndexInfo());
- notifyListeners(taskId);
+ notifyBuildCompletionListeners(taskId);
return
CompletableFutures.<Void>nullCompletedFuture();
}
@@ -301,7 +315,22 @@ class IndexBuildTask {
ZonePartitionId commitGroupId = new
ZonePartitionId(commitPartitionId.commitZoneId,
commitPartitionId.commitPartitionId);
- return
finalTransactionStateResolver.resolveFinalTxState(transactionId, commitGroupId);
+ return
finalTransactionStateResolver.resolveFinalTxState(transactionId, commitGroupId)
+ .thenApply(statisticsLoggingListener::onWriteIntentResolved);
+ }
+ /**
+ * Sends one batch to the replica and records statistics about the call in {@code statisticsLoggingListener}.
+ *
+ * <p>Every invocation is counted as either a successful or a failed Raft call. The size of {@code batch.rowIds} is added to the
+ * indexed-row total only when the invocation completes normally; if it fails, the returned future completes exceptionally
+ * and no rows are counted for this batch.
+ */
+ private CompletableFuture<Void> processBatch(BatchToIndex batch) {
+ BuildIndexReplicaRequest request =
createBuildIndexReplicaRequest(batch, initialOperationTimestamp);
+
+ return replicaService.invoke(node, request)
+ .whenComplete((unused, throwable) -> {
+ if (throwable == null) {
+ statisticsLoggingListener.onRaftCallSuccess();
+ } else {
+ statisticsLoggingListener.onRaftCallFailure();
+ }
+ })
+ .thenAccept(unused ->
statisticsLoggingListener.onBatchProcessed(batch.rowIds.size()));
}
private BuildIndexReplicaRequest
createBuildIndexReplicaRequest(BatchToIndex batch, HybridTimestamp
initialOperationTimestamp) {
@@ -333,13 +362,13 @@ class IndexBuildTask {
private String createCommonIndexInfo() {
return IgniteStringFormatter.format(
- "zoneId = {}, tableId={}, partitionId={}, indexId={}",
+ "zoneId={}, tableId={}, partitionId={}, indexId={}",
taskId.getZoneId(), taskId.getTableId(),
taskId.getPartitionId(), taskId.getIndexId()
);
}
- private void notifyListeners(IndexBuildTaskId taskId) {
- for (IndexBuildCompletionListener listener : listeners) {
+ private void notifyBuildCompletionListeners(IndexBuildTaskId taskId) {
+ for (IndexBuildCompletionListener listener : buildCompletionListeners)
{
if (afterDisasterRecovery) {
listener.onBuildCompletionAfterDisasterRecovery(taskId.getIndexId(),
taskId.getTableId(), taskId.getPartitionId());
} else {
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTaskStatisticsLoggingListener.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTaskStatisticsLoggingListener.java
new file mode 100644
index 00000000000..3dba8180b9a
--- /dev/null
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTaskStatisticsLoggingListener.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.tx.TxState;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Listener that collects {@link IndexBuildTask} statistics during execution and logs the aggregated results when the
+ * index build completes.
+ *
+ * <p>The {@code on*} callbacks only update in-memory counters. Nothing is logged until {@link #onIndexBuildSuccess()} or
+ * {@link #onIndexBuildFailure(Throwable)} is called, at which point a single INFO line "Index build statistics [...]" is
+ * written with the status, task ID, build reason, elapsed time in milliseconds and all counters.
+ *
+ * <p>{@link #onIndexBuildStarted()} must be called before the success/failure callbacks: the elapsed time is measured from
+ * it, and an {@code assert} checks that the start time is no longer zero.
+ *
+ * <p>All counters are held in atomic variables and a {@link ConcurrentHashMap}; presumably the callbacks are invoked from
+ * different threads of the build pipeline (verify against the caller).
+ *
+ * <p>NOTE(review): {@link ConcurrentHashMap} rejects {@code null} keys, so {@link #onWriteIntentResolved(TxState)} would
+ * throw on a {@code null} state. Confirm that callers never pass {@code null}.
+ */
+class IndexBuildTaskStatisticsLoggingListener {
+ private static final IgniteLogger LOG =
Loggers.forClass(IndexBuildTaskStatisticsLoggingListener.class);
+ /** Identifier of the index build task; printed as {@code taskId} in the statistics line. */
+ private final IndexBuildTaskId taskId;
+ /** Selects the logged build reason: {@code DISASTER_RECOVERY} when {@code true}, {@code BUILD} otherwise. */
+ private final boolean afterDisasterRecovery;
+ /** {@link System#nanoTime()} value captured by {@link #onIndexBuildStarted()}; zero until then. */
+ private final AtomicLong startTime = new AtomicLong();
+ /** Number of {@link #onRaftCallSuccess()} invocations. */
+ private final AtomicInteger successfulRaftCallCount = new AtomicInteger(0);
+ /** Number of {@link #onRaftCallFailure()} invocations. */
+ private final AtomicInteger failedRaftCallCount = new AtomicInteger(0);
+ /** Sum of the row counts passed to {@link #onBatchProcessed(int)}. */
+ private final AtomicLong rowIndexedCount = new AtomicLong(0);
+ /** Number of {@link #onWriteIntentResolved(TxState)} invocations, grouped by the resolved transaction state. */
+ private final ConcurrentMap<TxState, AtomicInteger>
resolvedWriteIntentCount = new ConcurrentHashMap<>();
+ /** Creates a listener for the given task; {@code afterDisasterRecovery} only affects the logged build reason. */
+ IndexBuildTaskStatisticsLoggingListener(IndexBuildTaskId taskId, boolean
afterDisasterRecovery) {
+ this.taskId = taskId;
+ this.afterDisasterRecovery = afterDisasterRecovery;
+ }
+ /** Remembers the current {@link System#nanoTime()} value as the start of the build. */
+ void onIndexBuildStarted() {
+ startTime.set(System.nanoTime());
+ }
+ /** Counts one resolved write intent for {@code txState} and returns {@code txState} unchanged. */
+ TxState onWriteIntentResolved(TxState txState) {
+ resolvedWriteIntentCount.computeIfAbsent(txState, unused -> new
AtomicInteger(0)).incrementAndGet();
+
+ return txState;
+ }
+ /** Increments the counter of successful Raft calls. */
+ void onRaftCallSuccess() {
+ successfulRaftCallCount.incrementAndGet();
+ }
+ /** Increments the counter of failed Raft calls. */
+ void onRaftCallFailure() {
+ failedRaftCallCount.incrementAndGet();
+ }
+ /** Adds {@code rowCount} to the total number of indexed rows. */
+ void onBatchProcessed(int rowCount) {
+ rowIndexedCount.addAndGet(rowCount);
+ }
+ /** Logs the collected statistics with status {@code SUCCESS}. */
+ void onIndexBuildSuccess() {
+ logStatistics(null);
+ }
+ /** Logs the collected statistics with status {@code FAILURE} and the result of {@code unwrapCause(throwable)} as the error. */
+ void onIndexBuildFailure(Throwable throwable) {
+ logStatistics(throwable);
+ }
+ /** Writes one INFO line with the elapsed time and all counters; a {@code null} throwable means the build succeeded. */
+ private void logStatistics(@Nullable Throwable throwable) {
+ long time = getBuildTime();
+ String status = throwable == null
+ ? "SUCCESS"
+ : String.format("FAILURE [error=%s]", unwrapCause(throwable));
+ String buildReason = afterDisasterRecovery ? "DISASTER_RECOVERY" :
"BUILD";
+
+ LOG.info(
+ "Index build statistics ["
+ + "status={}, "
+ + "taskId={}, "
+ + "buildReason={}, "
+ + "time={}ms, "
+ + "rowsIndexed={}, "
+ + "successfulRaftCalls={}, "
+ + "failedRaftCalls={}, "
+ + "resolvedWriteIntents={}]",
+ status,
+ taskId,
+ buildReason,
+ time,
+ rowIndexedCount,
+ successfulRaftCallCount,
+ failedRaftCallCount,
+ resolvedWriteIntentCount
+ );
+ }
+ /** Returns the milliseconds elapsed since {@link #onIndexBuildStarted()}; asserts that the start time has been set. */
+ private long getBuildTime() {
+ assert startTime.get() != 0 : "startTime was not initialized before
measuring index build time";
+
+ return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
startTime.get());
+ }
+ /** Exposes the start time holder to tests. */
+ @TestOnly
+ AtomicLong startTime() {
+ return startTime;
+ }
+ /** Exposes the per-state write intent counters to tests (the live map, not a copy). */
+ @TestOnly
+ Map<TxState, AtomicInteger> resolvedWriteIntentCount() {
+ return resolvedWriteIntentCount;
+ }
+ /** Exposes the indexed-row counter to tests. */
+ @TestOnly
+ AtomicLong rowIndexedCount() {
+ return rowIndexedCount;
+ }
+ /** Exposes the successful Raft call counter to tests. */
+ @TestOnly
+ AtomicInteger successfulRaftCallCount() {
+ return successfulRaftCallCount;
+ }
+ /** Exposes the failed Raft call counter to tests. */
+ @TestOnly
+ AtomicInteger failedRaftCallCount() {
+ return failedRaftCallCount;
+ }
+}
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
index c3b04039f78..0a43be55365 100644
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
@@ -81,7 +81,7 @@ class IndexBuilder implements ManuallyCloseable {
private final AtomicBoolean closeGuard = new AtomicBoolean();
- private final List<IndexBuildCompletionListener> listeners = new
CopyOnWriteArrayList<>();
+ private final List<IndexBuildCompletionListener> buildCompletionListeners
= new CopyOnWriteArrayList<>();
/** Constructor. */
IndexBuilder(
@@ -133,7 +133,7 @@ class IndexBuilder implements ManuallyCloseable {
) {
inBusyLockSafe(busyLock, () -> {
if (indexStorage.getNextRowIdToBuild() == null) {
- for (IndexBuildCompletionListener listener : listeners) {
+ for (IndexBuildCompletionListener listener :
buildCompletionListeners) {
listener.onBuildCompletion(indexId, tableId, partitionId);
}
@@ -154,7 +154,7 @@ class IndexBuilder implements ManuallyCloseable {
busyLock,
BATCH_SIZE,
node,
- listeners,
+ buildCompletionListeners,
enlistmentConsistencyToken,
false,
initialOperationTimestamp
@@ -218,7 +218,7 @@ class IndexBuilder implements ManuallyCloseable {
busyLock,
BATCH_SIZE,
node,
- listeners,
+ buildCompletionListeners,
enlistmentConsistencyToken,
true,
initialOperationTimestamp
@@ -280,12 +280,12 @@ class IndexBuilder implements ManuallyCloseable {
/** Adds a listener. */
public void listen(IndexBuildCompletionListener listener) {
- listeners.add(listener);
+ buildCompletionListeners.add(listener);
}
/** Removes a listener. */
public void stopListen(IndexBuildCompletionListener listener) {
- listeners.remove(listener);
+ buildCompletionListeners.remove(listener);
}
private void putAndStartTaskIfAbsent(IndexBuildTaskId taskId,
IndexBuildTask task) {
diff --git
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildTaskStatisticsLoggingListenerTest.java
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildTaskStatisticsLoggingListenerTest.java
new file mode 100644
index 00000000000..cb26404e0df
--- /dev/null
+++
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildTaskStatisticsLoggingListenerTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.ignite.internal.tx.TxState;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for {@link IndexBuildTaskStatisticsLoggingListener}: verifies that each callback updates the matching counter and
+ * that logging the final statistics does not throw.
+ */
+class IndexBuildTaskStatisticsLoggingListenerTest {
+
+    private final IndexBuildTaskId taskId = new IndexBuildTaskId(1, 1, 1, 1);
+
+    private final IndexBuildTaskStatisticsLoggingListener listener = new IndexBuildTaskStatisticsLoggingListener(
+            taskId, false
+    );
+
+    /** Logging the statistics of a successful build must not throw once the build has been started. */
+    @Test
+    void testOnIndexBuildSuccessPasses() {
+        listener.onIndexBuildStarted();
+
+        assertDoesNotThrow(listener::onIndexBuildSuccess);
+    }
+
+    /** Logging the statistics of a failed build must not throw once the build has been started. */
+    @Test
+    void testOnIndexBuildFailurePasses() {
+        listener.onIndexBuildStarted();
+
+        assertDoesNotThrow(() -> listener.onIndexBuildFailure(new RuntimeException("Index build exception")));
+    }
+
+    /** Starting the build must move the start time away from its initial value of zero. */
+    @Test
+    void testOnIndexBuildStartedSetsStartTime() {
+        listener.onIndexBuildStarted();
+
+        // System.nanoTime() has an arbitrary origin and may legitimately be negative, so only compare against the
+        // "not started" value (zero), which is also what the listener itself asserts on.
+        assertTrue(listener.startTime().get() != 0);
+    }
+
+    /** Row counts of consecutive batches are summed. */
+    @Test
+    void testOnBatchProcessedAccumulatesRowCount() {
+        listener.onBatchProcessed(10);
+        listener.onBatchProcessed(5);
+
+        assertEquals(15, listener.rowIndexedCount().get());
+    }
+
+    /** Successful and failed Raft calls are counted independently. */
+    @Test
+    void testOnRaftCallSuccessAndFailureAccumulateCounts() {
+        listener.onRaftCallSuccess();
+        listener.onRaftCallSuccess();
+        listener.onRaftCallFailure();
+
+        assertEquals(2, listener.successfulRaftCallCount().get());
+        assertEquals(1, listener.failedRaftCallCount().get());
+    }
+
+    /** Resolved write intents are counted per transaction state. */
+    @Test
+    void testOnWriteIntentResolvedAccumulatesCounts() {
+        listener.onWriteIntentResolved(TxState.COMMITTED);
+        listener.onWriteIntentResolved(TxState.COMMITTED);
+        listener.onWriteIntentResolved(TxState.ABANDONED);
+
+        assertEquals(2, listener.resolvedWriteIntentCount().size());
+        assertEquals(2, listener.resolvedWriteIntentCount().get(TxState.COMMITTED).get());
+        assertEquals(1, listener.resolvedWriteIntentCount().get(TxState.ABANDONED).get());
+    }
+}