This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new fcb2ab352fe Pipe: switch to IdentityHashMap backed set to avoid infinite loop in table tsfile builder (#15371)
fcb2ab352fe is described below

commit fcb2ab352fe8366507d629bd0cf5b733868966a0
Author: VGalaxies <[email protected]>
AuthorDate: Mon Apr 21 15:14:19 2025 +0800

    Pipe: switch to IdentityHashMap backed set to avoid infinite loop in table tsfile builder (#15371)
---
 .../util/builder/PipeTableModeTsFileBuilder.java   | 30 ++++++++++++----------
 1 file changed, 16 insertions(+), 14 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
index fcb735c55fc..b90f7b09580 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
@@ -36,11 +36,12 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.IdentityHashMap;
 import java.util.Iterator;
-import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -76,10 +77,7 @@ public class PipeTableModeTsFileBuilder extends 
PipeTsFileBuilder {
     }
     final List<Pair<String, File>> pairList = new ArrayList<>();
     for (Map.Entry<String, List<Tablet>> entry : 
dataBase2TabletList.entrySet()) {
-      final LinkedHashSet<LinkedList<Pair<Tablet, List<Pair<IDeviceID, 
Integer>>>>> linkedHashSet =
-          new LinkedHashSet<>();
-      pairList.addAll(
-          writeTableModelTabletsToTsFiles(entry.getValue(), entry.getKey(), 
linkedHashSet));
+      pairList.addAll(writeTableModelTabletsToTsFiles(entry.getValue(), 
entry.getKey()));
     }
     return pairList;
   }
@@ -103,10 +101,7 @@ public class PipeTableModeTsFileBuilder extends 
PipeTsFileBuilder {
 
   private <T extends Pair<Tablet, List<Pair<IDeviceID, Integer>>>>
       List<Pair<String, File>> writeTableModelTabletsToTsFiles(
-          final List<Tablet> tabletList,
-          final String dataBase,
-          LinkedHashSet<LinkedList<T>> linkedHashSet)
-          throws IOException {
+          final List<Tablet> tabletList, final String dataBase) throws 
IOException {
 
     final Map<String, List<T>> tableName2Tablets = new HashMap<>();
 
@@ -130,10 +125,15 @@ public class PipeTableModeTsFileBuilder extends 
PipeTsFileBuilder {
           });
     }
 
+    // Create a Set backed by an IdentityHashMap, so elements are compared by reference (==) rather
+    // than equals()/hashCode()
+    final Set<LinkedList<T>> device2TabletsLinkedList =
+        Collections.newSetFromMap(new IdentityHashMap<>());
+
     // Sort the tables by table name
     tableName2Tablets.entrySet().stream()
         .sorted(Map.Entry.comparingByKey(Comparator.naturalOrder()))
-        .forEach(entry -> linkedHashSet.add(new 
LinkedList<>(entry.getValue())));
+        .forEach(entry -> device2TabletsLinkedList.add(new 
LinkedList<>(entry.getValue())));
 
     // Help GC
     tableName2Tablets.clear();
@@ -141,13 +141,13 @@ public class PipeTableModeTsFileBuilder extends 
PipeTsFileBuilder {
     final List<Pair<String, File>> sealedFiles = new ArrayList<>();
 
     // Try making the tsfile size as large as possible
-    while (!linkedHashSet.isEmpty()) {
+    while (!device2TabletsLinkedList.isEmpty()) {
       if (Objects.isNull(fileWriter)) {
         fileWriter = new TsFileWriter(createFile());
       }
 
       try {
-        tryBestToWriteTabletsIntoOneFile(linkedHashSet);
+        tryBestToWriteTabletsIntoOneFile(device2TabletsLinkedList);
       } catch (final Exception e) {
         LOGGER.warn(
             "Batch id = {}: Failed to write tablets into tsfile, because {}",
@@ -202,8 +202,8 @@ public class PipeTableModeTsFileBuilder extends 
PipeTsFileBuilder {
   }
 
   private <T extends Pair<Tablet, List<Pair<IDeviceID, Integer>>>>
-      void tryBestToWriteTabletsIntoOneFile(
-          final LinkedHashSet<LinkedList<T>> device2TabletsLinkedList) throws 
IOException {
+      void tryBestToWriteTabletsIntoOneFile(final Set<LinkedList<T>> 
device2TabletsLinkedList)
+          throws IOException {
     final Iterator<LinkedList<T>> iterator = 
device2TabletsLinkedList.iterator();
 
     while (iterator.hasNext()) {
@@ -237,6 +237,8 @@ public class PipeTableModeTsFileBuilder extends 
PipeTsFileBuilder {
           }
 
           tabletsToWrite.add(pair);
+          // NOTE: mutating a LinkedList that lives inside a Set violates the contract that the
+          // element’s hashCode must remain stable while it’s in the set
           tablets.pollFirst();
           continue;
         }

Reply via email to