This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new bc078fccc89 Pipe: Fixed Async Connector Repeatedly Putting Events into 
RetryQueue (#15179)
bc078fccc89 is described below

commit bc078fccc89c7518db2d44d421373f5c47d51c64
Author: Zhenyu Luo <luoluoyuyu2...@gmail.com>
AuthorDate: Tue Mar 25 09:58:29 2025 +0800

    Pipe: Fixed Async Connector Repeatedly Putting Events into RetryQueue 
(#15179)
    
    Co-authored-by: Steve Yurong Su <r...@apache.org>
---
 .../async/IoTDBDataRegionAsyncConnector.java       | 31 +++++++++++++---------
 .../util/builder/PipeTableModeTsFileBuilder.java   |  5 +---
 .../sorter/PipeTableModelTabletEventSorter.java    | 12 ++++-----
 3 files changed, 26 insertions(+), 22 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index b67e9da4699..3ca9635fb85 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -214,18 +214,25 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
       final AtomicInteger eventsReferenceCount = new 
AtomicInteger(dbTsFilePairs.size());
       final AtomicBoolean eventsHadBeenAddedToRetryQueue = new 
AtomicBoolean(false);
 
-      for (final Pair<String, File> sealedFile : dbTsFilePairs) {
-        transfer(
-            new PipeTransferTsFileHandler(
-                this,
-                pipe2WeightMap,
-                events,
-                eventsReferenceCount,
-                eventsHadBeenAddedToRetryQueue,
-                sealedFile.right,
-                null,
-                false,
-                sealedFile.left));
+      try {
+        for (final Pair<String, File> sealedFile : dbTsFilePairs) {
+          transfer(
+              new PipeTransferTsFileHandler(
+                  this,
+                  pipe2WeightMap,
+                  events,
+                  eventsReferenceCount,
+                  eventsHadBeenAddedToRetryQueue,
+                  sealedFile.right,
+                  null,
+                  false,
+                  sealedFile.left));
+        }
+      } catch (final Throwable t) {
+        LOGGER.warn("Failed to transfer tsfile batch ({}).", dbTsFilePairs, t);
+        if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
+          addFailureEventsToRetryQueue(events);
+        }
       }
     } else {
       LOGGER.warn(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
index 13534a22b4d..2554e68c4b2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
@@ -70,7 +70,7 @@ public class PipeTableModeTsFileBuilder extends 
PipeTsFileBuilder {
     if (dataBase2TabletList.isEmpty()) {
       return new ArrayList<>(0);
     }
-    List<Pair<String, File>> pairList = new ArrayList<>();
+    final List<Pair<String, File>> pairList = new ArrayList<>();
     for (Map.Entry<String, List<Tablet>> entry : 
dataBase2TabletList.entrySet()) {
       final LinkedHashSet<LinkedList<Pair<Tablet, List<Pair<IDeviceID, 
Integer>>>>> linkedHashSet =
           new LinkedHashSet<>();
@@ -113,9 +113,6 @@ public class PipeTableModeTsFileBuilder extends 
PipeTsFileBuilder {
           .add((T) new Pair<>(tablet, WriteUtils.splitTabletByDevice(tablet)));
     }
 
-    // Replace ArrayList with LinkedList to improve performance
-    final LinkedHashSet<LinkedList<T>> table2Tablets = new LinkedHashSet<>();
-
     // Sort the tablets by start time in first device
     for (final List<T> tablets : tableName2Tablets.values()) {
       tablets.sort(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/sorter/PipeTableModelTabletEventSorter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/sorter/PipeTableModelTabletEventSorter.java
index 4c618121f5f..4eb8672fe81 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/sorter/PipeTableModelTabletEventSorter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/sorter/PipeTableModelTabletEventSorter.java
@@ -60,7 +60,7 @@ public class PipeTableModelTabletEventSorter {
       return;
     }
 
-    HashMap<IDeviceID, List<Pair<Integer, Integer>>> deviceIDToIndexMap = new 
HashMap<>();
+    final HashMap<IDeviceID, List<Pair<Integer, Integer>>> deviceIDToIndexMap 
= new HashMap<>();
     final long[] timestamps = tablet.getTimestamps();
 
     IDeviceID lastDevice = tablet.getDeviceID(0);
@@ -85,7 +85,7 @@ public class PipeTableModelTabletEventSorter {
         isUnSorted = true;
       }
 
-      List<Pair<Integer, Integer>> list =
+      final List<Pair<Integer, Integer>> list =
           deviceIDToIndexMap.computeIfAbsent(lastDevice, k -> new 
ArrayList<>());
 
       if (!list.isEmpty()) {
@@ -97,7 +97,7 @@ public class PipeTableModelTabletEventSorter {
       previousTimestamp = currentTimestamp;
     }
 
-    List<Pair<Integer, Integer>> list =
+    final List<Pair<Integer, Integer>> list =
         deviceIDToIndexMap.computeIfAbsent(lastDevice, k -> new ArrayList<>());
     if (!list.isEmpty()) {
       isUnSorted = true;
@@ -167,7 +167,7 @@ public class PipeTableModelTabletEventSorter {
   }
 
   private void deduplicateTimestamps(final int startIndex, final int endIndex) 
{
-    long[] timestamps = tablet.getTimestamps();
+    final long[] timestamps = tablet.getTimestamps();
     long lastTime = timestamps[index[startIndex]];
     index[deduplicatedSize++] = index[startIndex];
     for (int i = startIndex + 1; i < endIndex; i++) {
@@ -183,7 +183,7 @@ public class PipeTableModelTabletEventSorter {
       return;
     }
 
-    long[] timestamps = tablet.getTimestamps();
+    final long[] timestamps = tablet.getTimestamps();
     for (int i = 1, size = tablet.getRowSize(); i < size; ++i) {
       final long currentTimestamp = timestamps[i];
       final long previousTimestamp = timestamps[i - 1];
@@ -229,7 +229,7 @@ public class PipeTableModelTabletEventSorter {
 
   private void deduplicateTimestamps() {
     deduplicatedSize = 1;
-    long[] timestamps = tablet.getTimestamps();
+    final long[] timestamps = tablet.getTimestamps();
     long lastTime = timestamps[0];
     IDeviceID deviceID = tablet.getDeviceID(index[0]);
     final Set<IDeviceID> deviceIDSet = new HashSet<>();

Reply via email to