This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch log-opt
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7b60882550c5d2175aa7f3f8e15e55e8296f10ff
Author: Caideyipi <[email protected]>
AuthorDate: Tue May 19 11:39:13 2026 +0800

    add
---
 .../runtime/heartbeat/PipeHeartbeatParser.java     |  13 ++
 .../agent/task/connection/PipeEventCollector.java  |   9 ++
 .../subtask/processor/PipeProcessorSubtask.java    |   7 +
 .../event/common/terminate/PipeTerminateEvent.java | 179 +++++++++++++++++++++
 ...istoricalDataRegionTsFileAndDeletionSource.java |  26 +++
 .../pipe/receiver/PipeReceiverStatusHandler.java   |  29 ++++
 6 files changed, 263 insertions(+)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
index 423353dd1fb..597120ffe1c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
@@ -157,11 +157,24 @@ public class PipeHeartbeatParser {
       if (Boolean.TRUE.equals(isPipeCompletedFromAgent)) {
 
         temporaryMeta.markDataNodeCompleted(nodeId);
+        LOGGER.info(
+            "Detected historical pipe completion report from DataNode {} for 
pipe {}. remainingEventCount: {}, remainingTime: {}, completedDataNodes: {}",
+            nodeId,
+            staticMeta.getPipeName(),
+            pipeHeartbeat.getRemainingEventCount(staticMeta),
+            pipeHeartbeat.getRemainingTime(staticMeta),
+            temporaryMeta.getCompletedDataNodeIds());
 
         final Set<Integer> uncompletedDataNodeIds =
             
configManager.getNodeManager().getRegisteredDataNodeLocations().keySet();
         
uncompletedDataNodeIds.removeAll(temporaryMeta.getCompletedDataNodeIds());
         if (uncompletedDataNodeIds.isEmpty()) {
+          LOGGER.info(
+              "All DataNodes reported historical pipe {} completed. 
globalRemainingEventCount: {}, globalRemainingTime: {}, staticMeta: {}",
+              staticMeta.getPipeName(),
+              temporaryMeta.getGlobalRemainingEvents(),
+              temporaryMeta.getGlobalRemainingTime(),
+              staticMeta);
           pipeTaskInfo.get().removePipeMeta(staticMeta.getPipeName());
           LOGGER.info(
               
ManagerMessages.DETECTED_COMPLETION_OF_PIPE_STATIC_META_REMOVE_IT,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index ad44b78042a..7e6b0aa7781 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -32,6 +32,7 @@ import 
org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.source.schemaregion.IoTDBSchemaRegionSource;
 import 
org.apache.iotdb.db.pipe.source.schemaregion.PipePlanTreePrivilegeParseVisitor;
@@ -138,12 +139,20 @@ public class PipeEventCollector implements EventCollector 
{
 
     if (skipParsing || !forceTabletFormat && 
canSkipParsing4TsFileEvent(sourceEvent)) {
       collectEvent(sourceEvent);
+      if (sourceEvent.isGeneratedByHistoricalExtractor()) {
+        PipeTerminateEvent.markHistoricalTsFileUnsplit(
+            sourceEvent.getPipeName(), sourceEvent.getCreationTime(), 
regionId);
+      }
       return;
     }
 
     try {
       sourceEvent.consumeTabletInsertionEventsWithRetry(
           this::collectParsedRawTableEvent, 
"PipeEventCollector::parseAndCollectEvent");
+      if (sourceEvent.isGeneratedByHistoricalExtractor()) {
+        PipeTerminateEvent.markHistoricalTsFileSplit(
+            sourceEvent.getPipeName(), sourceEvent.getCreationTime(), 
regionId);
+      }
     } finally {
       sourceEvent.close();
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index c40b37b31e5..fe9737b0d7d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -36,6 +36,7 @@ import 
org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector;
 import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.metric.processor.PipeProcessorMetrics;
@@ -181,6 +182,12 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
                     }
                   },
                   "PipeProcessorSubtask::executeOnce");
+              if (tsFileInsertionEvent.isGeneratedByHistoricalExtractor()) {
+                PipeTerminateEvent.markHistoricalTsFileSplit(
+                    tsFileInsertionEvent.getPipeName(),
+                    tsFileInsertionEvent.getCreationTime(),
+                    regionId);
+              }
               if (ex.get() != null) {
                 throw ex.get();
               }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
index 64209242cc4..12385c52b06 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
@@ -33,9 +33,16 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.agent.task.PipeDataNodeTask;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * The {@link PipeTerminateEvent} is an {@link EnrichedEvent} that controls 
the termination of pipe,
@@ -45,6 +52,8 @@ import java.util.concurrent.TimeUnit;
  */
 public class PipeTerminateEvent extends EnrichedEvent {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTerminateEvent.class);
+
   private final int dataRegionId;
 
   private final boolean shouldMark;
@@ -58,6 +67,9 @@ public class PipeTerminateEvent extends EnrichedEvent {
   // Do not use call run policy to avoid deadlock
   private static final ExecutorService terminateExecutor = 
createTerminateExecutor();
 
+  private static final ConcurrentMap<HistoricalTransferKey, 
HistoricalTransferSummaryCounter>
+      HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP = new ConcurrentHashMap<>();
+
   private static ExecutorService createTerminateExecutor() {
     final WrappedThreadPoolExecutor executor =
         new WrappedThreadPoolExecutor(
@@ -145,6 +157,18 @@ public class PipeTerminateEvent extends EnrichedEvent {
   }
 
   public void markCompleted() {
+    final HistoricalTransferSummary summary =
+        snapshotAndClearHistoricalTransferSummary(pipeName, creationTime, 
dataRegionId);
+    if (Objects.nonNull(summary)) {
+      LOGGER.info(
+          "Pipe {}@{}: terminate event committed for historical transfer. 
creationTime: {}, shouldMark: {}. {}",
+          pipeName,
+          dataRegionId,
+          creationTime,
+          shouldMark,
+          summary.toReportMessage());
+    }
+
     // To avoid deadlock
     if (shouldMark) {
       terminateExecutor.submit(
@@ -159,4 +183,159 @@ public class PipeTerminateEvent extends EnrichedEvent {
         + " - "
         + super.toString();
   }
+
+  /**
+   * Registers, or resets, the historical transfer counter of one pipe on one data region.
+   *
+   * <p>The counter keyed by {@code (pipeName, creationTime, dataRegionId)} is created when it is
+   * absent. The two extracted totals are then stored in it, and its skipped / split / unsplit
+   * tallies are set back to zero.
+   *
+   * @param pipeName name of the pipe, first component of the key
+   * @param creationTime creation time of the pipe, second component of the key
+   * @param dataRegionId id of the data region, third component of the key
+   * @param extractedHistoricalTsFileCount number of historical TsFiles to record
+   * @param extractedHistoricalDeletionCount number of historical deletions to record
+   */
+  public static void initializeHistoricalTransferSummary(
+      final String pipeName,
+      final long creationTime,
+      final int dataRegionId,
+      final long extractedHistoricalTsFileCount,
+      final long extractedHistoricalDeletionCount) {
+    final HistoricalTransferKey key =
+        new HistoricalTransferKey(pipeName, creationTime, dataRegionId);
+    final HistoricalTransferSummaryCounter counter =
+        HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP.computeIfAbsent(
+            key, unused -> new HistoricalTransferSummaryCounter());
+    counter.initialize(extractedHistoricalTsFileCount, extractedHistoricalDeletionCount);
+  }
+
+  /**
+   * Adds one to the skipped-TsFile tally of the given pipe on the given data region.
+   *
+   * <p>The counter is looked up through {@code getOrCreateHistoricalTransferSummaryCounter}, so a
+   * call made while no counter is registered for the key creates one.
+   *
+   * @param pipeName name of the pipe
+   * @param creationTime creation time of the pipe
+   * @param dataRegionId id of the data region
+   */
+  public static void markHistoricalTsFileSkipped(
+      final String pipeName, final long creationTime, final int dataRegionId) {
+    final HistoricalTransferSummaryCounter counter =
+        getOrCreateHistoricalTransferSummaryCounter(pipeName, creationTime, dataRegionId);
+    counter.skippedHistoricalTsFileCount.incrementAndGet();
+  }
+
+  /**
+   * Adds one to the split-TsFile tally of the given pipe on the given data region.
+   *
+   * <p>The counter is looked up through {@code getOrCreateHistoricalTransferSummaryCounter}, so a
+   * call made while no counter is registered for the key creates one.
+   *
+   * @param pipeName name of the pipe
+   * @param creationTime creation time of the pipe
+   * @param dataRegionId id of the data region
+   */
+  public static void markHistoricalTsFileSplit(
+      final String pipeName, final long creationTime, final int dataRegionId) {
+    final HistoricalTransferSummaryCounter counter =
+        getOrCreateHistoricalTransferSummaryCounter(pipeName, creationTime, dataRegionId);
+    counter.splitHistoricalTsFileCount.incrementAndGet();
+  }
+
+  /**
+   * Adds one to the unsplit-TsFile tally of the given pipe on the given data region.
+   *
+   * <p>The counter is looked up through {@code getOrCreateHistoricalTransferSummaryCounter}, so a
+   * call made while no counter is registered for the key creates one.
+   *
+   * @param pipeName name of the pipe
+   * @param creationTime creation time of the pipe
+   * @param dataRegionId id of the data region
+   */
+  public static void markHistoricalTsFileUnsplit(
+      final String pipeName, final long creationTime, final int dataRegionId) {
+    final HistoricalTransferSummaryCounter counter =
+        getOrCreateHistoricalTransferSummaryCounter(pipeName, creationTime, dataRegionId);
+    counter.unsplitHistoricalTsFileCount.incrementAndGet();
+  }
+
+  /**
+   * Reads the current counts of the given pipe on the given data region without removing them.
+   *
+   * @param pipeName name of the pipe
+   * @param creationTime creation time of the pipe
+   * @param dataRegionId id of the data region
+   * @return a snapshot of the registered counter, or {@code null} when no counter is registered
+   *     for the key
+   */
+  public static HistoricalTransferSummary snapshotHistoricalTransferSummary(
+      final String pipeName, final long creationTime, final int dataRegionId) {
+    final HistoricalTransferKey key =
+        new HistoricalTransferKey(pipeName, creationTime, dataRegionId);
+    final HistoricalTransferSummaryCounter counter =
+        HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP.get(key);
+    if (Objects.isNull(counter)) {
+      return null;
+    }
+    return counter.snapshot();
+  }
+
+  /**
+   * Removes the counter of the given pipe on the given data region, if one is registered.
+   *
+   * @param pipeName name of the pipe
+   * @param creationTime creation time of the pipe
+   * @param dataRegionId id of the data region
+   */
+  public static void clearHistoricalTransferSummary(
+      final String pipeName, final long creationTime, final int dataRegionId) {
+    final HistoricalTransferKey key =
+        new HistoricalTransferKey(pipeName, creationTime, dataRegionId);
+    HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP.remove(key);
+  }
+
+  /**
+   * Removes the counter of the given pipe on the given data region and returns its final counts.
+   *
+   * @param pipeName name of the pipe
+   * @param creationTime creation time of the pipe
+   * @param dataRegionId id of the data region
+   * @return a snapshot of the removed counter, or {@code null} when nothing was registered for
+   *     the key
+   */
+  private static HistoricalTransferSummary snapshotAndClearHistoricalTransferSummary(
+      final String pipeName, final long creationTime, final int dataRegionId) {
+    final HistoricalTransferKey key =
+        new HistoricalTransferKey(pipeName, creationTime, dataRegionId);
+    final HistoricalTransferSummaryCounter removed =
+        HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP.remove(key);
+    if (Objects.isNull(removed)) {
+      return null;
+    }
+    return removed.snapshot();
+  }
+
+  /**
+   * Returns the counter registered for the given key, registering a fresh one when absent.
+   *
+   * <p>NOTE(review): a call that arrives after the entry was removed registers a new entry again;
+   * confirm that every caller is followed by a later removal.
+   *
+   * @param pipeName name of the pipe
+   * @param creationTime creation time of the pipe
+   * @param dataRegionId id of the data region
+   * @return the existing or newly created counter, never {@code null}
+   */
+  private static HistoricalTransferSummaryCounter getOrCreateHistoricalTransferSummaryCounter(
+      final String pipeName, final long creationTime, final int dataRegionId) {
+    final HistoricalTransferKey key =
+        new HistoricalTransferKey(pipeName, creationTime, dataRegionId);
+    return HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP.computeIfAbsent(
+        key, unused -> new HistoricalTransferSummaryCounter());
+  }
+
+  /**
+   * Immutable set of the five historical transfer counts taken from a counter at one moment.
+   *
+   * <p>Instances are only created inside the enclosing class (the constructor is private) and
+   * expose their content through {@link #toReportMessage()}.
+   */
+  public static final class HistoricalTransferSummary {
+
+    private final long extractedHistoricalTsFileCount;
+    private final long skippedHistoricalTsFileCount;
+    private final long splitHistoricalTsFileCount;
+    private final long unsplitHistoricalTsFileCount;
+    private final long extractedHistoricalDeletionCount;
+
+    private HistoricalTransferSummary(
+        final long extractedHistoricalTsFileCount,
+        final long skippedHistoricalTsFileCount,
+        final long splitHistoricalTsFileCount,
+        final long unsplitHistoricalTsFileCount,
+        final long extractedHistoricalDeletionCount) {
+      this.extractedHistoricalTsFileCount = extractedHistoricalTsFileCount;
+      this.skippedHistoricalTsFileCount = skippedHistoricalTsFileCount;
+      this.splitHistoricalTsFileCount = splitHistoricalTsFileCount;
+      this.unsplitHistoricalTsFileCount = unsplitHistoricalTsFileCount;
+      this.extractedHistoricalDeletionCount = extractedHistoricalDeletionCount;
+    }
+
+    /**
+     * Renders the five counts as a single-line, human-readable fragment for log messages.
+     *
+     * @return the text {@code historical summary: extractedTsFileCount=..., ...}
+     */
+    public String toReportMessage() {
+      // The format string is concatenated from short pieces so that no literal has to span a
+      // line break; the resulting text is unchanged.
+      return String.format(
+          "historical summary: extractedTsFileCount=%s, "
+              + "skippedTsFileCount=%s, "
+              + "splitTsFileCount=%s, unsplitTsFileCount=%s, deletionCount=%s",
+          extractedHistoricalTsFileCount,
+          skippedHistoricalTsFileCount,
+          splitHistoricalTsFileCount,
+          unsplitHistoricalTsFileCount,
+          extractedHistoricalDeletionCount);
+    }
+  }
+
+  /**
+   * Mutable tallies of one historical transfer, each held in its own {@link AtomicLong}.
+   *
+   * <p>Each tally is updated and read individually; {@link #initialize} and {@link #snapshot}
+   * touch the five values one after another rather than as one atomic unit.
+   */
+  private static final class HistoricalTransferSummaryCounter {
+
+    private final AtomicLong extractedHistoricalTsFileCount = new AtomicLong();
+    private final AtomicLong skippedHistoricalTsFileCount = new AtomicLong();
+    private final AtomicLong splitHistoricalTsFileCount = new AtomicLong();
+    private final AtomicLong unsplitHistoricalTsFileCount = new AtomicLong();
+    private final AtomicLong extractedHistoricalDeletionCount = new AtomicLong();
+
+    /**
+     * Stores the two extracted totals and zeroes the skipped, split and unsplit tallies.
+     *
+     * @param tsFileCount value for the extracted TsFile total
+     * @param deletionCount value for the extracted deletion total
+     */
+    private void initialize(final long tsFileCount, final long deletionCount) {
+      extractedHistoricalTsFileCount.set(tsFileCount);
+      skippedHistoricalTsFileCount.set(0);
+      splitHistoricalTsFileCount.set(0);
+      unsplitHistoricalTsFileCount.set(0);
+      extractedHistoricalDeletionCount.set(deletionCount);
+    }
+
+    /**
+     * Copies the current value of every tally into an immutable summary.
+     *
+     * @return a new {@link HistoricalTransferSummary} holding the values read
+     */
+    private HistoricalTransferSummary snapshot() {
+      final long extracted = extractedHistoricalTsFileCount.get();
+      final long skipped = skippedHistoricalTsFileCount.get();
+      final long split = splitHistoricalTsFileCount.get();
+      final long unsplit = unsplitHistoricalTsFileCount.get();
+      final long deletions = extractedHistoricalDeletionCount.get();
+      return new HistoricalTransferSummary(extracted, skipped, split, unsplit, deletions);
+    }
+  }
+
+  /**
+   * Map key made of a pipe name, a pipe creation time and a data region id.
+   *
+   * <p>Two keys are equal when all three components are equal; {@link #hashCode()} is derived
+   * from the same three components.
+   */
+  private static final class HistoricalTransferKey {
+
+    private final String pipeName;
+    private final long creationTime;
+    private final int dataRegionId;
+
+    private HistoricalTransferKey(
+        final String pipeName, final long creationTime, final int dataRegionId) {
+      this.pipeName = pipeName;
+      this.creationTime = creationTime;
+      this.dataRegionId = dataRegionId;
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+      if (obj == this) {
+        return true;
+      }
+      if (obj instanceof HistoricalTransferKey) {
+        final HistoricalTransferKey other = (HistoricalTransferKey) obj;
+        return Objects.equals(pipeName, other.pipeName)
+            && creationTime == other.creationTime
+            && dataRegionId == other.dataRegionId;
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(pipeName, creationTime, dataRegionId);
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
index 053aac13887..e71d80a61a6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
@@ -168,6 +168,8 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
       new HashMap<>();
   private final Map<PersistentResource, Long> 
pendingResource2ReplicateIndexForIoTV2 =
       new HashMap<>();
+  private int extractedHistoricalTsFileCount = 0;
+  private int extractedHistoricalDeletionCount = 0;
 
   @Override
   public void validate(final PipeParameterValidator validator) {
@@ -488,6 +490,8 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
       return;
     }
     hasBeenStarted = true;
+    extractedHistoricalTsFileCount = 0;
+    extractedHistoricalDeletionCount = 0;
 
     final DataRegion dataRegion =
         StorageEngine.getInstance().getDataRegion(new 
DataRegionId(dataRegionId));
@@ -521,6 +525,12 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionSource
                   ? Long.compare(o1.getFileStartTime(), o2.getFileStartTime())
                   : 
o1.getProgressIndex().topologicalCompareTo(o2.getProgressIndex()));
       pendingQueue = new ArrayDeque<>(originalResourceList);
+      PipeTerminateEvent.initializeHistoricalTransferSummary(
+          pipeName,
+          creationTime,
+          dataRegionId,
+          extractedHistoricalTsFileCount,
+          extractedHistoricalDeletionCount);
 
       LOGGER.info(
           DataNodePipeMessages.PIPE_FINISH_TO_SORT_ALL_EXTRACTED_RESOURCES,
@@ -649,6 +659,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
                   return true;
                 }
               });
+      extractedHistoricalTsFileCount = 
filteredTsFileResources2TableNames.size();
 
       LOGGER.info(
           "Pipe {}@{}: finish to extract historical TsFile, extracted sequence 
file count {}/{}, "
@@ -798,6 +809,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
                 })
             .collect(Collectors.toList());
     resourceList.addAll(allDeletionResources);
+    extractedHistoricalDeletionCount = allDeletionResources.size();
     LOGGER.info(
         
DataNodePipeMessages.PIPE_FINISH_TO_EXTRACT_DELETIONS_EXTRACT_DELETIONS,
         pipeName,
@@ -841,6 +853,16 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionSource
   }
 
   private Event supplyTerminateEvent() {
+    final PipeTerminateEvent.HistoricalTransferSummary 
historicalTransferSummary =
+        PipeTerminateEvent.snapshotHistoricalTransferSummary(pipeName, 
creationTime, dataRegionId);
+    if (Objects.nonNull(historicalTransferSummary)) {
+      LOGGER.info(
+          "Pipe {}@{}: historical source has supplied all events, emitting 
terminate event. {}",
+          pipeName,
+          dataRegionId,
+          historicalTransferSummary.toReportMessage());
+    }
+
     final PipeTerminateEvent terminateEvent =
         new PipeTerminateEvent(
             pipeName,
@@ -867,6 +889,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
     }
 
     filteredTsFileResources2TableNames.remove(resource);
+    PipeTerminateEvent.markHistoricalTsFileSkipped(pipeName, creationTime, 
dataRegionId);
     LOGGER.info(
         
DataNodePipeMessages.PIPE_SKIP_HISTORICAL_TSFILE_BECAUSE_REALTIME_SOURCE,
         pipeName,
@@ -1074,6 +1097,9 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionSource
 
   @Override
   public synchronized void close() {
+    if (!isTerminateSignalSent) {
+      PipeTerminateEvent.clearHistoricalTransferSummary(pipeName, 
creationTime, dataRegionId);
+    }
     if (Objects.nonNull(pendingQueue)) {
       pendingQueue.forEach(
           resource -> {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
index 5cc9017df73..1a922448fc0 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -49,6 +49,7 @@ public class PipeReceiverStatusHandler {
   private static final String NO_PERMISSION = "No permission";
   private static final String UNCLASSIFIED_EXCEPTION = "Unclassified 
exception";
   private static final String NO_PERMISSION_STR = "No permissions for this 
operation";
+  private static final int MAX_RECORD_MESSAGE_LENGTH_IN_LOG = 2048;
 
   private final boolean isRetryAllowedWhenConflictOccurs;
   private final long retryMaxMillisWhenConflictOccurs;
@@ -147,6 +148,7 @@ public class PipeReceiverStatusHandler {
               PipeMessages.USER_CONFLICT_NOT_ALLOWED,
               shouldRecordIgnoredDataWhenConflictOccurs ? recordMessage : "not 
recorded",
               status);
+          logDiscardedUserConflictData("retry is not allowed", recordMessage, 
status);
           return;
         }
 
@@ -160,6 +162,7 @@ public class PipeReceiverStatusHandler {
                 PipeMessages.USER_CONFLICT_RETRY_TIMEOUT,
                 shouldRecordIgnoredDataWhenConflictOccurs ? recordMessage : 
"not recorded",
                 status);
+            logDiscardedUserConflictData("retry timeout", recordMessage, 
status);
             resetExceptionStatus();
             return;
           }
@@ -266,6 +269,32 @@ public class PipeReceiverStatusHandler {
     return noPermission ? NO_PERMISSION : UNCLASSIFIED_EXCEPTION;
   }
 
+  /**
+   * Emits a WARN line describing data that is being dropped because of a user conflict.
+   *
+   * <p>The record payload is only included when {@code shouldRecordIgnoredDataWhenConflictOccurs}
+   * is set, in which case it is passed through {@code summarizeRecordMessage}; otherwise the
+   * placeholder {@code "not recorded"} is logged, as in the caller's own conflict log.
+   *
+   * @param reason short explanation of why the data is discarded
+   * @param recordMessage description of the discarded data, may be {@code null}
+   * @param status status returned by the receiver; its message and the status itself are logged
+   */
+  private void logDiscardedUserConflictData(
+      final String reason, final String recordMessage, final TSStatus status) {
+    if (!LOGGER.isWarnEnabled()) {
+      return;
+    }
+
+    // Do not leak the payload into the log when recording ignored data is switched off.
+    final String loggedData =
+        shouldRecordIgnoredDataWhenConflictOccurs
+            ? summarizeRecordMessage(recordMessage)
+            : "not recorded";
+    LOGGER.warn(
+        "User conflict exception: discarded data info because {}. data: {}. "
+            + "receiver message: {}. status: {}",
+        reason,
+        loggedData,
+        status.getMessage(),
+        status);
+  }
+
+  /**
+   * Turns a record message into a single-line, length-bounded string suitable for a log entry.
+   *
+   * <p>Carriage returns and line feeds are replaced by spaces and the result is trimmed. A
+   * {@code null} message, or one that is empty after this normalization, yields {@code "<empty>"}.
+   * Text longer than {@code MAX_RECORD_MESSAGE_LENGTH_IN_LOG} characters is cut at that length
+   * and suffixed with {@code "...(truncated)"}.
+   *
+   * @param recordMessage the raw message, may be {@code null}
+   * @return the normalized and possibly truncated message, never {@code null} or empty
+   */
+  private String summarizeRecordMessage(final String recordMessage) {
+    if (Objects.isNull(recordMessage)) {
+      return "<empty>";
+    }
+
+    final String normalizedRecordMessage =
+        recordMessage.replace('\r', ' ').replace('\n', ' ').trim();
+    // Checked after normalization so that whitespace-only input is reported as empty too.
+    if (normalizedRecordMessage.isEmpty()) {
+      return "<empty>";
+    }
+    return normalizedRecordMessage.length() <= MAX_RECORD_MESSAGE_LENGTH_IN_LOG
+        ? normalizedRecordMessage
+        : normalizedRecordMessage.substring(0, MAX_RECORD_MESSAGE_LENGTH_IN_LOG)
+            + "...(truncated)";
+  }
+
   private void recordExceptionStatusIfNecessary(final String message) {
     if (!Objects.equals(exceptionRecordedMessage.get(), message)) {
       exceptionFirstEncounteredTime.set(System.currentTimeMillis());

Reply via email to