This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push: new bc078fccc89 Pipe: Fixed Async Connector Repeatedly Putting Events into RetryQueue (#15179) bc078fccc89 is described below commit bc078fccc89c7518db2d44d421373f5c47d51c64 Author: Zhenyu Luo <luoluoyuyu2...@gmail.com> AuthorDate: Tue Mar 25 09:58:29 2025 +0800 Pipe: Fixed Async Connector Repeatedly Putting Events into RetryQueue (#15179) Co-authored-by: Steve Yurong Su <r...@apache.org> --- .../async/IoTDBDataRegionAsyncConnector.java | 31 +++++++++++++--------- .../util/builder/PipeTableModeTsFileBuilder.java | 5 +--- .../sorter/PipeTableModelTabletEventSorter.java | 12 ++++----- 3 files changed, 26 insertions(+), 22 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java index b67e9da4699..3ca9635fb85 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java @@ -214,18 +214,25 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector { final AtomicInteger eventsReferenceCount = new AtomicInteger(dbTsFilePairs.size()); final AtomicBoolean eventsHadBeenAddedToRetryQueue = new AtomicBoolean(false); - for (final Pair<String, File> sealedFile : dbTsFilePairs) { - transfer( - new PipeTransferTsFileHandler( - this, - pipe2WeightMap, - events, - eventsReferenceCount, - eventsHadBeenAddedToRetryQueue, - sealedFile.right, - null, - false, - sealedFile.left)); + try { + for (final Pair<String, File> sealedFile : dbTsFilePairs) { + transfer( + new PipeTransferTsFileHandler( + this, + pipe2WeightMap, + events, + eventsReferenceCount, + eventsHadBeenAddedToRetryQueue, + sealedFile.right, + null, + false, + sealedFile.left)); + } + } catch (final Throwable t) { + LOGGER.warn("Failed to transfer tsfile batch ({}).", dbTsFilePairs, t); + if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) { + addFailureEventsToRetryQueue(events); + } } } else { LOGGER.warn( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java index 13534a22b4d..2554e68c4b2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java @@ -70,7 +70,7 @@ public class PipeTableModeTsFileBuilder extends PipeTsFileBuilder { if (dataBase2TabletList.isEmpty()) { return new ArrayList<>(0); } - List<Pair<String, File>> pairList = new ArrayList<>(); + final List<Pair<String, File>> pairList = new ArrayList<>(); for (Map.Entry<String, List<Tablet>> entry : dataBase2TabletList.entrySet()) { final LinkedHashSet<LinkedList<Pair<Tablet, List<Pair<IDeviceID, Integer>>>>> linkedHashSet = new LinkedHashSet<>(); @@ -113,9 +113,6 @@ public class PipeTableModeTsFileBuilder extends PipeTsFileBuilder { .add((T) new Pair<>(tablet, WriteUtils.splitTabletByDevice(tablet))); } - // Replace ArrayList with LinkedList to improve performance - final LinkedHashSet<LinkedList<T>> table2Tablets = new LinkedHashSet<>(); - // Sort the tablets by start time in first device for (final List<T> tablets : tableName2Tablets.values()) { tablets.sort( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/sorter/PipeTableModelTabletEventSorter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/sorter/PipeTableModelTabletEventSorter.java index 4c618121f5f..4eb8672fe81 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/sorter/PipeTableModelTabletEventSorter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/sorter/PipeTableModelTabletEventSorter.java @@ -60,7 +60,7 @@ public class PipeTableModelTabletEventSorter { return; } - HashMap<IDeviceID, List<Pair<Integer, Integer>>> deviceIDToIndexMap = new HashMap<>(); + final HashMap<IDeviceID, List<Pair<Integer, Integer>>> deviceIDToIndexMap = new HashMap<>(); final long[] timestamps = tablet.getTimestamps(); IDeviceID lastDevice = tablet.getDeviceID(0); @@ -85,7 +85,7 @@ public class PipeTableModelTabletEventSorter { isUnSorted = true; } - List<Pair<Integer, Integer>> list = + final List<Pair<Integer, Integer>> list = deviceIDToIndexMap.computeIfAbsent(lastDevice, k -> new ArrayList<>()); if (!list.isEmpty()) { @@ -97,7 +97,7 @@ public class PipeTableModelTabletEventSorter { previousTimestamp = currentTimestamp; } - List<Pair<Integer, Integer>> list = + final List<Pair<Integer, Integer>> list = deviceIDToIndexMap.computeIfAbsent(lastDevice, k -> new ArrayList<>()); if (!list.isEmpty()) { isUnSorted = true; @@ -167,7 +167,7 @@ public class PipeTableModelTabletEventSorter { } private void deduplicateTimestamps(final int startIndex, final int endIndex) { - long[] timestamps = tablet.getTimestamps(); + final long[] timestamps = tablet.getTimestamps(); long lastTime = timestamps[index[startIndex]]; index[deduplicatedSize++] = index[startIndex]; for (int i = startIndex + 1; i < endIndex; i++) { @@ -183,7 +183,7 @@ public class PipeTableModelTabletEventSorter { return; } - long[] timestamps = tablet.getTimestamps(); + final long[] timestamps = tablet.getTimestamps(); for (int i = 1, size = tablet.getRowSize(); i < size; ++i) { final long currentTimestamp = timestamps[i]; final long previousTimestamp = timestamps[i - 1]; @@ -229,7 +229,7 @@ public class PipeTableModelTabletEventSorter { private void deduplicateTimestamps() { deduplicatedSize = 1; - long[] timestamps = tablet.getTimestamps(); + final long[] timestamps = tablet.getTimestamps(); long lastTime = timestamps[0]; IDeviceID deviceID = tablet.getDeviceID(index[0]); final Set<IDeviceID> deviceIDSet = new HashSet<>();