This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new b061ffd003a [To dev/1.3] Add historical transfer summary logs (#17763)
b061ffd003a is described below
commit b061ffd003a6254a9f05a7b1cb4212033c7ea95e
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 27 15:47:38 2026 +0800
[To dev/1.3] Add historical transfer summary logs (#17763)
Backport 18ea0e9aadc2c39a1fcbf8e34c559ae142b71221 (#17717) to dev/1.3.
---
.../runtime/heartbeat/PipeHeartbeatParser.java | 13 ++
.../agent/task/connection/PipeEventCollector.java | 9 ++
.../event/common/terminate/PipeTerminateEvent.java | 179 +++++++++++++++++++++
.../PipeHistoricalDataRegionTsFileSource.java | 16 ++
.../pipe/receiver/PipeReceiverStatusHandler.java | 29 ++++
5 files changed, 246 insertions(+)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
index 6dc11ddd3f3..8cafa2094c4 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
@@ -154,11 +154,24 @@ public class PipeHeartbeatParser {
if (Boolean.TRUE.equals(isPipeCompletedFromAgent)) {
temporaryMeta.markDataNodeCompleted(nodeId);
+ LOGGER.info(
+ "Detected historical pipe completion report from DataNode {} for
pipe {}. remainingEventCount: {}, remainingTime: {}, completedDataNodes: {}",
+ nodeId,
+ staticMeta.getPipeName(),
+ pipeHeartbeat.getRemainingEventCount(staticMeta),
+ pipeHeartbeat.getRemainingTime(staticMeta),
+ temporaryMeta.getCompletedDataNodeIds());
final Set<Integer> uncompletedDataNodeIds =
configManager.getNodeManager().getRegisteredDataNodeLocations().keySet();
uncompletedDataNodeIds.removeAll(temporaryMeta.getCompletedDataNodeIds());
if (uncompletedDataNodeIds.isEmpty()) {
+ LOGGER.info(
+ "All DataNodes reported historical pipe {} completed.
globalRemainingEventCount: {}, globalRemainingTime: {}, staticMeta: {}",
+ staticMeta.getPipeName(),
+ temporaryMeta.getGlobalRemainingEvents(),
+ temporaryMeta.getGlobalRemainingTime(),
+ staticMeta);
pipeTaskInfo.get().removePipeMeta(staticMeta.getPipeName());
LOGGER.info(
"Detected completion of pipe {}, static meta: {}, remove it.",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index 387d4ff7ec6..95e8196ad38 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.source.schemaregion.IoTDBSchemaRegionSource;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
@@ -131,12 +132,20 @@ public class PipeEventCollector implements EventCollector
{
if (skipParsing || !forceTabletFormat &&
canSkipParsing4TsFileEvent(sourceEvent)) {
collectEvent(sourceEvent);
+ if (sourceEvent.isGeneratedByHistoricalExtractor()) {
+ PipeTerminateEvent.markHistoricalTsFileUnsplit(
+ sourceEvent.getPipeName(), sourceEvent.getCreationTime(),
regionId);
+ }
return;
}
try {
sourceEvent.consumeTabletInsertionEventsWithRetry(
this::collectParsedRawTableEvent,
"PipeEventCollector::parseAndCollectEvent");
+ if (sourceEvent.isGeneratedByHistoricalExtractor()) {
+ PipeTerminateEvent.markHistoricalTsFileSplit(
+ sourceEvent.getPipeName(), sourceEvent.getCreationTime(),
regionId);
+ }
} finally {
sourceEvent.close();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
index 4bf79a3df03..f57c016e246 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
@@ -32,9 +32,16 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.agent.task.PipeDataNodeTask;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
/**
* The {@link PipeTerminateEvent} is an {@link EnrichedEvent} that controls
the termination of pipe,
@@ -44,6 +51,8 @@ import java.util.concurrent.TimeUnit;
*/
public class PipeTerminateEvent extends EnrichedEvent {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTerminateEvent.class);
+
private final int dataRegionId;
private final boolean shouldMark;
@@ -57,6 +66,9 @@ public class PipeTerminateEvent extends EnrichedEvent {
// Do not use call run policy to avoid deadlock
private static final ExecutorService terminateExecutor =
createTerminateExecutor();
+ private static final ConcurrentMap<HistoricalTransferKey,
HistoricalTransferSummaryCounter>
+ HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP = new ConcurrentHashMap<>();
+
private static ExecutorService createTerminateExecutor() {
final WrappedThreadPoolExecutor executor =
new WrappedThreadPoolExecutor(
@@ -128,6 +140,18 @@ public class PipeTerminateEvent extends EnrichedEvent {
}
public void markCompleted() {
+ final HistoricalTransferSummary summary =
+ snapshotAndClearHistoricalTransferSummary(pipeName, creationTime,
dataRegionId);
+ if (Objects.nonNull(summary)) {
+ LOGGER.info(
+ "Pipe {}@{}: terminate event committed for historical transfer.
creationTime: {}, shouldMark: {}. {}",
+ pipeName,
+ dataRegionId,
+ creationTime,
+ shouldMark,
+ summary.toReportMessage());
+ }
+
// To avoid deadlock
if (shouldMark) {
terminateExecutor.submit(
@@ -142,4 +166,159 @@ public class PipeTerminateEvent extends EnrichedEvent {
+ " - "
+ super.toString();
}
+
+ /**
+ * Registers (or re-seeds) the historical transfer summary of one pipe on one data region. The
+ * extracted TsFile / deletion counts are set from the arguments and the skipped, split and
+ * unsplit counters are reset to zero.
+ */
+ public static void initializeHistoricalTransferSummary(
+ final String pipeName,
+ final long creationTime,
+ final int dataRegionId,
+ final long extractedHistoricalTsFileCount,
+ final long extractedHistoricalDeletionCount) {
+ final HistoricalTransferKey key = new HistoricalTransferKey(pipeName, creationTime, dataRegionId);
+ final HistoricalTransferSummaryCounter counter =
+ HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP.computeIfAbsent(
+ key, ignored -> new HistoricalTransferSummaryCounter());
+ counter.initialize(extractedHistoricalTsFileCount, extractedHistoricalDeletionCount);
+ }
+
+ /**
+ * Increments the skipped-TsFile counter of the given pipe / data region summary.
+ *
+ * <p>Only an already-registered summary is updated. Entries are created by
+ * initializeHistoricalTransferSummary and removed either when the terminate event completes or
+ * when the source is closed before sending the terminate signal. Re-creating the entry here
+ * (e.g. for an event still in flight after that removal) would leave a counter in the static
+ * map that nothing ever removes, so such late updates are dropped instead.
+ */
+ public static void markHistoricalTsFileSkipped(
+ final String pipeName, final long creationTime, final int dataRegionId) {
+ final HistoricalTransferSummaryCounter counter =
+ HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP.get(
+ new HistoricalTransferKey(pipeName, creationTime, dataRegionId));
+ if (Objects.nonNull(counter)) {
+ counter.skippedHistoricalTsFileCount.incrementAndGet();
+ }
+ }
+
+ /**
+ * Increments the split-TsFile counter (TsFile parsed into tablets) of the given pipe / data
+ * region summary.
+ *
+ * <p>Only an already-registered summary is updated. Entries are created by
+ * initializeHistoricalTransferSummary and removed either when the terminate event completes or
+ * when the source is closed before sending the terminate signal. Re-creating the entry here
+ * (e.g. for an event still being parsed after that removal) would leave a counter in the static
+ * map that nothing ever removes, so such late updates are dropped instead.
+ */
+ public static void markHistoricalTsFileSplit(
+ final String pipeName, final long creationTime, final int dataRegionId) {
+ final HistoricalTransferSummaryCounter counter =
+ HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP.get(
+ new HistoricalTransferKey(pipeName, creationTime, dataRegionId));
+ if (Objects.nonNull(counter)) {
+ counter.splitHistoricalTsFileCount.incrementAndGet();
+ }
+ }
+
+ /**
+ * Increments the unsplit-TsFile counter (TsFile forwarded without parsing) of the given pipe /
+ * data region summary.
+ *
+ * <p>Only an already-registered summary is updated. Entries are created by
+ * initializeHistoricalTransferSummary and removed either when the terminate event completes or
+ * when the source is closed before sending the terminate signal. Re-creating the entry here
+ * (e.g. for an event still in flight after that removal) would leave a counter in the static
+ * map that nothing ever removes, so such late updates are dropped instead.
+ */
+ public static void markHistoricalTsFileUnsplit(
+ final String pipeName, final long creationTime, final int dataRegionId) {
+ final HistoricalTransferSummaryCounter counter =
+ HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP.get(
+ new HistoricalTransferKey(pipeName, creationTime, dataRegionId));
+ if (Objects.nonNull(counter)) {
+ counter.unsplitHistoricalTsFileCount.incrementAndGet();
+ }
+ }
+
+ /**
+ * Returns a point-in-time copy of the counters of the given pipe / data region without removing
+ * them, or {@code null} when no summary is registered for that key.
+ */
+ public static HistoricalTransferSummary snapshotHistoricalTransferSummary(
+ final String pipeName, final long creationTime, final int dataRegionId) {
+ final HistoricalTransferKey key = new HistoricalTransferKey(pipeName, creationTime, dataRegionId);
+ final HistoricalTransferSummaryCounter registered = HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP.get(key);
+ if (Objects.isNull(registered)) {
+ return null;
+ }
+ return registered.snapshot();
+ }
+
+ /** Discards the summary registered for the given pipe / data region; a no-op if there is none. */
+ public static void clearHistoricalTransferSummary(
+ final String pipeName, final long creationTime, final int dataRegionId) {
+ final HistoricalTransferKey key = new HistoricalTransferKey(pipeName, creationTime, dataRegionId);
+ HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP.remove(key);
+ }
+
+ /**
+ * Atomically removes the summary of the given pipe / data region from the registry and returns a
+ * copy of its final counters, or {@code null} when nothing was registered.
+ */
+ private static HistoricalTransferSummary snapshotAndClearHistoricalTransferSummary(
+ final String pipeName, final long creationTime, final int dataRegionId) {
+ final HistoricalTransferKey key = new HistoricalTransferKey(pipeName, creationTime, dataRegionId);
+ final HistoricalTransferSummaryCounter removed = HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP.remove(key);
+ if (Objects.isNull(removed)) {
+ return null;
+ }
+ return removed.snapshot();
+ }
+
+ private static HistoricalTransferSummaryCounter
getOrCreateHistoricalTransferSummaryCounter(
+ final String pipeName, final long creationTime, final int dataRegionId) {
+ return HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP.computeIfAbsent(
+ new HistoricalTransferKey(pipeName, creationTime, dataRegionId),
+ ignored -> new HistoricalTransferSummaryCounter());
+ }
+
+ /** Immutable point-in-time copy of a historical transfer summary, rendered into log messages. */
+ public static final class HistoricalTransferSummary {
+
+ private final long extractedHistoricalTsFileCount;
+ private final long skippedHistoricalTsFileCount;
+ private final long splitHistoricalTsFileCount;
+ private final long unsplitHistoricalTsFileCount;
+ private final long extractedHistoricalDeletionCount;
+
+ private HistoricalTransferSummary(
+ final long extractedTsFiles,
+ final long skippedTsFiles,
+ final long splitTsFiles,
+ final long unsplitTsFiles,
+ final long extractedDeletions) {
+ extractedHistoricalTsFileCount = extractedTsFiles;
+ skippedHistoricalTsFileCount = skippedTsFiles;
+ splitHistoricalTsFileCount = splitTsFiles;
+ unsplitHistoricalTsFileCount = unsplitTsFiles;
+ extractedHistoricalDeletionCount = extractedDeletions;
+ }
+
+ /** Renders all counters as a single comma-separated {@code name=value} line. */
+ public String toReportMessage() {
+ return new StringBuilder("historical summary: extractedTsFileCount=")
+ .append(extractedHistoricalTsFileCount)
+ .append(", skippedTsFileCount=")
+ .append(skippedHistoricalTsFileCount)
+ .append(", splitTsFileCount=")
+ .append(splitHistoricalTsFileCount)
+ .append(", unsplitTsFileCount=")
+ .append(unsplitHistoricalTsFileCount)
+ .append(", deletionCount=")
+ .append(extractedHistoricalDeletionCount)
+ .toString();
+ }
+ }
+
+ /**
+ * Live counters of one pipe / data region historical transfer. Each field is an AtomicLong, so
+ * individual increments are atomic; initialize and snapshot touch the fields one at a time and are
+ * not atomic as a whole.
+ */
+ private static final class HistoricalTransferSummaryCounter {
+
+ private final AtomicLong extractedHistoricalTsFileCount = new AtomicLong();
+ private final AtomicLong skippedHistoricalTsFileCount = new AtomicLong();
+ private final AtomicLong splitHistoricalTsFileCount = new AtomicLong();
+ private final AtomicLong unsplitHistoricalTsFileCount = new AtomicLong();
+ private final AtomicLong extractedHistoricalDeletionCount = new AtomicLong();
+
+ /** Seeds the extracted counts and zeroes the skipped / split / unsplit progress counters. */
+ private void initialize(final long extractedTsFiles, final long extractedDeletions) {
+ extractedHistoricalTsFileCount.set(extractedTsFiles);
+ skippedHistoricalTsFileCount.set(0);
+ splitHistoricalTsFileCount.set(0);
+ unsplitHistoricalTsFileCount.set(0);
+ extractedHistoricalDeletionCount.set(extractedDeletions);
+ }
+
+ /** Copies the current counter values into an immutable summary. */
+ private HistoricalTransferSummary snapshot() {
+ final long extracted = extractedHistoricalTsFileCount.get();
+ final long skipped = skippedHistoricalTsFileCount.get();
+ final long split = splitHistoricalTsFileCount.get();
+ final long unsplit = unsplitHistoricalTsFileCount.get();
+ final long deletions = extractedHistoricalDeletionCount.get();
+ return new HistoricalTransferSummary(extracted, skipped, split, unsplit, deletions);
+ }
+ }
+
+ private static final class HistoricalTransferKey {
+
+ private final String pipeName;
+ private final long creationTime;
+ private final int dataRegionId;
+
+ private HistoricalTransferKey(
+ final String pipeName, final long creationTime, final int
dataRegionId) {
+ this.pipeName = pipeName;
+ this.creationTime = creationTime;
+ this.dataRegionId = dataRegionId;
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof HistoricalTransferKey)) {
+ return false;
+ }
+ final HistoricalTransferKey that = (HistoricalTransferKey) obj;
+ return creationTime == that.creationTime
+ && dataRegionId == that.dataRegionId
+ && Objects.equals(pipeName, that.pipeName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(pipeName, creationTime, dataRegionId);
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
index fd960584c9f..54597cb1bcd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
@@ -441,6 +441,8 @@ public class PipeHistoricalDataRegionTsFileSource
implements PipeHistoricalDataR
? Long.compare(o1.getFileStartTime(),
o2.getFileStartTime())
:
o1.getMaxProgressIndex().topologicalCompareTo(o2.getMaxProgressIndex()));
pendingQueue = new ArrayDeque<>(originalResourceList);
+ PipeTerminateEvent.initializeHistoricalTransferSummary(
+ pipeName, creationTime, dataRegionId,
filteredTsFileResources.size(), 0);
LOGGER.info(
"Pipe {}@{}: finish to extract historical TsFile, extracted
sequence file count {}/{}, "
@@ -537,6 +539,17 @@ public class PipeHistoricalDataRegionTsFileSource
implements PipeHistoricalDataR
final TsFileResource resource = pendingQueue.poll();
if (resource == null) {
+ final PipeTerminateEvent.HistoricalTransferSummary
historicalTransferSummary =
+ PipeTerminateEvent.snapshotHistoricalTransferSummary(
+ pipeName, creationTime, dataRegionId);
+ if (Objects.nonNull(historicalTransferSummary)) {
+ LOGGER.info(
+ "Pipe {}@{}: historical source has supplied all events, emitting
terminate event. {}",
+ pipeName,
+ dataRegionId,
+ historicalTransferSummary.toReportMessage());
+ }
+
final PipeTerminateEvent terminateEvent =
new PipeTerminateEvent(
pipeName,
@@ -632,6 +645,9 @@ public class PipeHistoricalDataRegionTsFileSource
implements PipeHistoricalDataR
@Override
public synchronized void close() {
+ if (!isTerminateSignalSent) {
+ PipeTerminateEvent.clearHistoricalTransferSummary(pipeName,
creationTime, dataRegionId);
+ }
if (Objects.nonNull(pendingQueue)) {
pendingQueue.forEach(
resource -> {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
index d48a9fdbcb2..604ad044dd6 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -46,6 +46,7 @@ public class PipeReceiverStatusHandler {
private static final String NO_PERMISSION = "No permission";
private static final String UNCLASSIFIED_EXCEPTION = "Unclassified
exception";
private static final String NO_PERMISSION_STR = "No permissions for this
operation";
+ private static final int MAX_RECORD_MESSAGE_LENGTH_IN_LOG = 2048;
private final boolean isRetryAllowedWhenConflictOccurs;
private final long retryMaxMillisWhenConflictOccurs;
@@ -134,6 +135,7 @@ public class PipeReceiverStatusHandler {
"User conflict exception: will be ignored because retry is not
allowed. event: {}. status: {}",
shouldRecordIgnoredDataWhenConflictOccurs ? recordMessage : "not
recorded",
status);
+ logDiscardedUserConflictData("retry is not allowed", recordMessage,
status);
return;
}
@@ -147,6 +149,7 @@ public class PipeReceiverStatusHandler {
"User conflict exception: retry timeout. will be ignored.
event: {}. status: {}",
shouldRecordIgnoredDataWhenConflictOccurs ? recordMessage :
"not recorded",
status);
+ logDiscardedUserConflictData("retry timeout", recordMessage,
status);
resetExceptionStatus();
return;
}
@@ -252,6 +255,32 @@ public class PipeReceiverStatusHandler {
return noPermission ? NO_PERMISSION : UNCLASSIFIED_EXCEPTION;
}
+ /**
+ * Emits a WARN log describing user-conflict data that the receiver is discarding.
+ *
+ * <p>The data itself is printed (summarized to one line and length-capped) only when
+ * {@code shouldRecordIgnoredDataWhenConflictOccurs} is enabled. This matches the callers' own
+ * warn logs, which print "not recorded" otherwise. Logging it unconditionally would write
+ * discarded user data into the logs even when recording it has been turned off.
+ *
+ * @param reason why the data is discarded, e.g. "retry timeout"
+ * @param recordMessage description of the discarded data; may be {@code null} or empty
+ * @param status the receiver status that caused the discard
+ */
+ private void logDiscardedUserConflictData(
+ final String reason, final String recordMessage, final TSStatus status) {
+ if (!LOGGER.isWarnEnabled()) {
+ return;
+ }
+
+ LOGGER.warn(
+ "User conflict exception: discarded data info because {}. data: {}. receiver message: {}. status: {}",
+ reason,
+ shouldRecordIgnoredDataWhenConflictOccurs
+ ? summarizeRecordMessage(recordMessage)
+ : "not recorded",
+ status.getMessage(),
+ status);
+ }
+
+ /**
+ * Flattens a record message onto one line (CR/LF become spaces, then trimmed) and caps it at
+ * MAX_RECORD_MESSAGE_LENGTH_IN_LOG characters, appending "...(truncated)" when cut. Returns
+ * "<empty>" for a null or empty message.
+ */
+ private String summarizeRecordMessage(final String recordMessage) {
+ if (recordMessage == null || recordMessage.isEmpty()) {
+ return "<empty>";
+ }
+
+ final String singleLine = recordMessage.replace('\r', ' ').replace('\n', ' ').trim();
+ if (singleLine.length() > MAX_RECORD_MESSAGE_LENGTH_IN_LOG) {
+ return singleLine.substring(0, MAX_RECORD_MESSAGE_LENGTH_IN_LOG) + "...(truncated)";
+ }
+ return singleLine;
+ }
+
private void recordExceptionStatusIfNecessary(final String message) {
if (!Objects.equals(exceptionRecordedMessage.get(), message)) {
exceptionFirstEncounteredTime.set(System.currentTimeMillis());