This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new b061ffd003a [To dev/1.3] Add historical transfer summary logs (#17763)
b061ffd003a is described below

commit b061ffd003a6254a9f05a7b1cb4212033c7ea95e
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 27 15:47:38 2026 +0800

    [To dev/1.3] Add historical transfer summary logs (#17763)
    
    Backport 18ea0e9aadc2c39a1fcbf8e34c559ae142b71221 (#17717) to dev/1.3.
---
 .../runtime/heartbeat/PipeHeartbeatParser.java     |  13 ++
 .../agent/task/connection/PipeEventCollector.java  |   9 ++
 .../event/common/terminate/PipeTerminateEvent.java | 179 +++++++++++++++++++++
 .../PipeHistoricalDataRegionTsFileSource.java      |  16 ++
 .../pipe/receiver/PipeReceiverStatusHandler.java   |  29 ++++
 5 files changed, 246 insertions(+)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
index 6dc11ddd3f3..8cafa2094c4 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
@@ -154,11 +154,24 @@ public class PipeHeartbeatParser {
       if (Boolean.TRUE.equals(isPipeCompletedFromAgent)) {
 
         temporaryMeta.markDataNodeCompleted(nodeId);
+        LOGGER.info(
+            "Detected historical pipe completion report from DataNode {} for 
pipe {}. remainingEventCount: {}, remainingTime: {}, completedDataNodes: {}",
+            nodeId,
+            staticMeta.getPipeName(),
+            pipeHeartbeat.getRemainingEventCount(staticMeta),
+            pipeHeartbeat.getRemainingTime(staticMeta),
+            temporaryMeta.getCompletedDataNodeIds());
 
         final Set<Integer> uncompletedDataNodeIds =
             
configManager.getNodeManager().getRegisteredDataNodeLocations().keySet();
         
uncompletedDataNodeIds.removeAll(temporaryMeta.getCompletedDataNodeIds());
         if (uncompletedDataNodeIds.isEmpty()) {
+          LOGGER.info(
+              "All DataNodes reported historical pipe {} completed. 
globalRemainingEventCount: {}, globalRemainingTime: {}, staticMeta: {}",
+              staticMeta.getPipeName(),
+              temporaryMeta.getGlobalRemainingEvents(),
+              temporaryMeta.getGlobalRemainingTime(),
+              staticMeta);
           pipeTaskInfo.get().removePipeMeta(staticMeta.getPipeName());
           LOGGER.info(
               "Detected completion of pipe {}, static meta: {}, remove it.",
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index 387d4ff7ec6..95e8196ad38 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.source.schemaregion.IoTDBSchemaRegionSource;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
@@ -131,12 +132,20 @@ public class PipeEventCollector implements EventCollector 
{
 
     if (skipParsing || !forceTabletFormat && 
canSkipParsing4TsFileEvent(sourceEvent)) {
       collectEvent(sourceEvent);
+      if (sourceEvent.isGeneratedByHistoricalExtractor()) {
+        PipeTerminateEvent.markHistoricalTsFileUnsplit(
+            sourceEvent.getPipeName(), sourceEvent.getCreationTime(), 
regionId);
+      }
       return;
     }
 
     try {
       sourceEvent.consumeTabletInsertionEventsWithRetry(
           this::collectParsedRawTableEvent, 
"PipeEventCollector::parseAndCollectEvent");
+      if (sourceEvent.isGeneratedByHistoricalExtractor()) {
+        PipeTerminateEvent.markHistoricalTsFileSplit(
+            sourceEvent.getPipeName(), sourceEvent.getCreationTime(), 
regionId);
+      }
     } finally {
       sourceEvent.close();
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
index 4bf79a3df03..f57c016e246 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
@@ -32,9 +32,16 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.agent.task.PipeDataNodeTask;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * The {@link PipeTerminateEvent} is an {@link EnrichedEvent} that controls 
the termination of pipe,
@@ -44,6 +51,8 @@ import java.util.concurrent.TimeUnit;
  */
 public class PipeTerminateEvent extends EnrichedEvent {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTerminateEvent.class);
+
   private final int dataRegionId;
 
   private final boolean shouldMark;
@@ -57,6 +66,9 @@ public class PipeTerminateEvent extends EnrichedEvent {
   // Do not use call run policy to avoid deadlock
   private static final ExecutorService terminateExecutor = 
createTerminateExecutor();
 
+  private static final ConcurrentMap<HistoricalTransferKey, 
HistoricalTransferSummaryCounter>
+      HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP = new ConcurrentHashMap<>();
+
   private static ExecutorService createTerminateExecutor() {
     final WrappedThreadPoolExecutor executor =
         new WrappedThreadPoolExecutor(
@@ -128,6 +140,18 @@ public class PipeTerminateEvent extends EnrichedEvent {
   }
 
   public void markCompleted() {
+    final HistoricalTransferSummary summary =
+        snapshotAndClearHistoricalTransferSummary(pipeName, creationTime, 
dataRegionId);
+    if (Objects.nonNull(summary)) {
+      LOGGER.info(
+          "Pipe {}@{}: terminate event committed for historical transfer. 
creationTime: {}, shouldMark: {}. {}",
+          pipeName,
+          dataRegionId,
+          creationTime,
+          shouldMark,
+          summary.toReportMessage());
+    }
+
     // To avoid deadlock
     if (shouldMark) {
       terminateExecutor.submit(
@@ -142,4 +166,159 @@ public class PipeTerminateEvent extends EnrichedEvent {
         + " - "
         + super.toString();
   }
+
+  public static void initializeHistoricalTransferSummary(
+      final String pipeName,
+      final long creationTime,
+      final int dataRegionId,
+      final long extractedHistoricalTsFileCount,
+      final long extractedHistoricalDeletionCount) {
+    HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP
+        .computeIfAbsent(
+            new HistoricalTransferKey(pipeName, creationTime, dataRegionId),
+            ignored -> new HistoricalTransferSummaryCounter())
+        .initialize(extractedHistoricalTsFileCount, 
extractedHistoricalDeletionCount);
+  }
+
+  public static void markHistoricalTsFileSkipped(
+      final String pipeName, final long creationTime, final int dataRegionId) {
+    getOrCreateHistoricalTransferSummaryCounter(pipeName, creationTime, 
dataRegionId)
+        .skippedHistoricalTsFileCount
+        .incrementAndGet();
+  }
+
+  public static void markHistoricalTsFileSplit(
+      final String pipeName, final long creationTime, final int dataRegionId) {
+    getOrCreateHistoricalTransferSummaryCounter(pipeName, creationTime, 
dataRegionId)
+        .splitHistoricalTsFileCount
+        .incrementAndGet();
+  }
+
+  public static void markHistoricalTsFileUnsplit(
+      final String pipeName, final long creationTime, final int dataRegionId) {
+    getOrCreateHistoricalTransferSummaryCounter(pipeName, creationTime, 
dataRegionId)
+        .unsplitHistoricalTsFileCount
+        .incrementAndGet();
+  }
+
+  public static HistoricalTransferSummary snapshotHistoricalTransferSummary(
+      final String pipeName, final long creationTime, final int dataRegionId) {
+    final HistoricalTransferSummaryCounter counter =
+        HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP.get(
+            new HistoricalTransferKey(pipeName, creationTime, dataRegionId));
+    return Objects.nonNull(counter) ? counter.snapshot() : null;
+  }
+
+  public static void clearHistoricalTransferSummary(
+      final String pipeName, final long creationTime, final int dataRegionId) {
+    HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP.remove(
+        new HistoricalTransferKey(pipeName, creationTime, dataRegionId));
+  }
+
+  private static HistoricalTransferSummary 
snapshotAndClearHistoricalTransferSummary(
+      final String pipeName, final long creationTime, final int dataRegionId) {
+    final HistoricalTransferSummaryCounter counter =
+        HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP.remove(
+            new HistoricalTransferKey(pipeName, creationTime, dataRegionId));
+    return Objects.nonNull(counter) ? counter.snapshot() : null;
+  }
+
+  private static HistoricalTransferSummaryCounter 
getOrCreateHistoricalTransferSummaryCounter(
+      final String pipeName, final long creationTime, final int dataRegionId) {
+    return HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP.computeIfAbsent(
+        new HistoricalTransferKey(pipeName, creationTime, dataRegionId),
+        ignored -> new HistoricalTransferSummaryCounter());
+  }
+
+  public static final class HistoricalTransferSummary {
+
+    private final long extractedHistoricalTsFileCount;
+    private final long skippedHistoricalTsFileCount;
+    private final long splitHistoricalTsFileCount;
+    private final long unsplitHistoricalTsFileCount;
+    private final long extractedHistoricalDeletionCount;
+
+    private HistoricalTransferSummary(
+        final long extractedHistoricalTsFileCount,
+        final long skippedHistoricalTsFileCount,
+        final long splitHistoricalTsFileCount,
+        final long unsplitHistoricalTsFileCount,
+        final long extractedHistoricalDeletionCount) {
+      this.extractedHistoricalTsFileCount = extractedHistoricalTsFileCount;
+      this.skippedHistoricalTsFileCount = skippedHistoricalTsFileCount;
+      this.splitHistoricalTsFileCount = splitHistoricalTsFileCount;
+      this.unsplitHistoricalTsFileCount = unsplitHistoricalTsFileCount;
+      this.extractedHistoricalDeletionCount = extractedHistoricalDeletionCount;
+    }
+
+    public String toReportMessage() {
+      return String.format(
+          "historical summary: extractedTsFileCount=%s, skippedTsFileCount=%s, 
splitTsFileCount=%s, unsplitTsFileCount=%s, deletionCount=%s",
+          extractedHistoricalTsFileCount,
+          skippedHistoricalTsFileCount,
+          splitHistoricalTsFileCount,
+          unsplitHistoricalTsFileCount,
+          extractedHistoricalDeletionCount);
+    }
+  }
+
+  private static final class HistoricalTransferSummaryCounter {
+
+    private final AtomicLong extractedHistoricalTsFileCount = new 
AtomicLong(0);
+    private final AtomicLong skippedHistoricalTsFileCount = new AtomicLong(0);
+    private final AtomicLong splitHistoricalTsFileCount = new AtomicLong(0);
+    private final AtomicLong unsplitHistoricalTsFileCount = new AtomicLong(0);
+    private final AtomicLong extractedHistoricalDeletionCount = new 
AtomicLong(0);
+
+    private void initialize(
+        final long extractedHistoricalTsFileCount, final long 
extractedHistoricalDeletionCount) {
+      this.extractedHistoricalTsFileCount.set(extractedHistoricalTsFileCount);
+      this.skippedHistoricalTsFileCount.set(0);
+      this.splitHistoricalTsFileCount.set(0);
+      this.unsplitHistoricalTsFileCount.set(0);
+      
this.extractedHistoricalDeletionCount.set(extractedHistoricalDeletionCount);
+    }
+
+    private HistoricalTransferSummary snapshot() {
+      return new HistoricalTransferSummary(
+          extractedHistoricalTsFileCount.get(),
+          skippedHistoricalTsFileCount.get(),
+          splitHistoricalTsFileCount.get(),
+          unsplitHistoricalTsFileCount.get(),
+          extractedHistoricalDeletionCount.get());
+    }
+  }
+
+  private static final class HistoricalTransferKey {
+
+    private final String pipeName;
+    private final long creationTime;
+    private final int dataRegionId;
+
+    private HistoricalTransferKey(
+        final String pipeName, final long creationTime, final int 
dataRegionId) {
+      this.pipeName = pipeName;
+      this.creationTime = creationTime;
+      this.dataRegionId = dataRegionId;
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (!(obj instanceof HistoricalTransferKey)) {
+        return false;
+      }
+      final HistoricalTransferKey that = (HistoricalTransferKey) obj;
+      return creationTime == that.creationTime
+          && dataRegionId == that.dataRegionId
+          && Objects.equals(pipeName, that.pipeName);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(pipeName, creationTime, dataRegionId);
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
index fd960584c9f..54597cb1bcd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
@@ -441,6 +441,8 @@ public class PipeHistoricalDataRegionTsFileSource 
implements PipeHistoricalDataR
                     ? Long.compare(o1.getFileStartTime(), 
o2.getFileStartTime())
                     : 
o1.getMaxProgressIndex().topologicalCompareTo(o2.getMaxProgressIndex()));
         pendingQueue = new ArrayDeque<>(originalResourceList);
+        PipeTerminateEvent.initializeHistoricalTransferSummary(
+            pipeName, creationTime, dataRegionId, 
filteredTsFileResources.size(), 0);
 
         LOGGER.info(
             "Pipe {}@{}: finish to extract historical TsFile, extracted 
sequence file count {}/{}, "
@@ -537,6 +539,17 @@ public class PipeHistoricalDataRegionTsFileSource 
implements PipeHistoricalDataR
     final TsFileResource resource = pendingQueue.poll();
 
     if (resource == null) {
+      final PipeTerminateEvent.HistoricalTransferSummary 
historicalTransferSummary =
+          PipeTerminateEvent.snapshotHistoricalTransferSummary(
+              pipeName, creationTime, dataRegionId);
+      if (Objects.nonNull(historicalTransferSummary)) {
+        LOGGER.info(
+            "Pipe {}@{}: historical source has supplied all events, emitting 
terminate event. {}",
+            pipeName,
+            dataRegionId,
+            historicalTransferSummary.toReportMessage());
+      }
+
       final PipeTerminateEvent terminateEvent =
           new PipeTerminateEvent(
               pipeName,
@@ -632,6 +645,9 @@ public class PipeHistoricalDataRegionTsFileSource 
implements PipeHistoricalDataR
 
   @Override
   public synchronized void close() {
+    if (!isTerminateSignalSent) {
+      PipeTerminateEvent.clearHistoricalTransferSummary(pipeName, 
creationTime, dataRegionId);
+    }
     if (Objects.nonNull(pendingQueue)) {
       pendingQueue.forEach(
           resource -> {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
index d48a9fdbcb2..604ad044dd6 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -46,6 +46,7 @@ public class PipeReceiverStatusHandler {
   private static final String NO_PERMISSION = "No permission";
   private static final String UNCLASSIFIED_EXCEPTION = "Unclassified 
exception";
   private static final String NO_PERMISSION_STR = "No permissions for this 
operation";
+  private static final int MAX_RECORD_MESSAGE_LENGTH_IN_LOG = 2048;
 
   private final boolean isRetryAllowedWhenConflictOccurs;
   private final long retryMaxMillisWhenConflictOccurs;
@@ -134,6 +135,7 @@ public class PipeReceiverStatusHandler {
               "User conflict exception: will be ignored because retry is not 
allowed. event: {}. status: {}",
               shouldRecordIgnoredDataWhenConflictOccurs ? recordMessage : "not 
recorded",
               status);
+          logDiscardedUserConflictData("retry is not allowed", recordMessage, 
status);
           return;
         }
 
@@ -147,6 +149,7 @@ public class PipeReceiverStatusHandler {
                 "User conflict exception: retry timeout. will be ignored. 
event: {}. status: {}",
                 shouldRecordIgnoredDataWhenConflictOccurs ? recordMessage : 
"not recorded",
                 status);
+            logDiscardedUserConflictData("retry timeout", recordMessage, 
status);
             resetExceptionStatus();
             return;
           }
@@ -252,6 +255,32 @@ public class PipeReceiverStatusHandler {
     return noPermission ? NO_PERMISSION : UNCLASSIFIED_EXCEPTION;
   }
 
+  private void logDiscardedUserConflictData(
+      final String reason, final String recordMessage, final TSStatus status) {
+    if (!LOGGER.isWarnEnabled()) {
+      return;
+    }
+
+    LOGGER.warn(
+        "User conflict exception: discarded data info because {}. data: {}. 
receiver message: {}. status: {}",
+        reason,
+        summarizeRecordMessage(recordMessage),
+        status.getMessage(),
+        status);
+  }
+
+  private String summarizeRecordMessage(final String recordMessage) {
+    if (Objects.isNull(recordMessage) || recordMessage.isEmpty()) {
+      return "<empty>";
+    }
+
+    final String normalizedRecordMessage =
+        recordMessage.replace('\r', ' ').replace('\n', ' ').trim();
+    return normalizedRecordMessage.length() <= MAX_RECORD_MESSAGE_LENGTH_IN_LOG
+        ? normalizedRecordMessage
+        : normalizedRecordMessage.substring(0, 
MAX_RECORD_MESSAGE_LENGTH_IN_LOG) + "...(truncated)";
+  }
+
   private void recordExceptionStatusIfNecessary(final String message) {
     if (!Objects.equals(exceptionRecordedMessage.get(), message)) {
       exceptionFirstEncounteredTime.set(System.currentTimeMillis());

Reply via email to