This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new fcb2ab352fe Pipe: switch to IdentityHashMap backed set to avoid
infinite loop in table tsfile builder (#15371)
fcb2ab352fe is described below
commit fcb2ab352fe8366507d629bd0cf5b733868966a0
Author: VGalaxies <[email protected]>
AuthorDate: Mon Apr 21 15:14:19 2025 +0800
Pipe: switch to IdentityHashMap backed set to avoid infinite loop in table
tsfile builder (#15371)
---
.../util/builder/PipeTableModeTsFileBuilder.java | 30 ++++++++++++----------
1 file changed, 16 insertions(+), 14 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
index fcb735c55fc..b90f7b09580 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
@@ -36,11 +36,12 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.IdentityHashMap;
import java.util.Iterator;
-import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -76,10 +77,7 @@ public class PipeTableModeTsFileBuilder extends
PipeTsFileBuilder {
}
final List<Pair<String, File>> pairList = new ArrayList<>();
for (Map.Entry<String, List<Tablet>> entry :
dataBase2TabletList.entrySet()) {
- final LinkedHashSet<LinkedList<Pair<Tablet, List<Pair<IDeviceID,
Integer>>>>> linkedHashSet =
- new LinkedHashSet<>();
- pairList.addAll(
- writeTableModelTabletsToTsFiles(entry.getValue(), entry.getKey(),
linkedHashSet));
+ pairList.addAll(writeTableModelTabletsToTsFiles(entry.getValue(),
entry.getKey()));
}
return pairList;
}
@@ -103,10 +101,7 @@ public class PipeTableModeTsFileBuilder extends
PipeTsFileBuilder {
private <T extends Pair<Tablet, List<Pair<IDeviceID, Integer>>>>
List<Pair<String, File>> writeTableModelTabletsToTsFiles(
- final List<Tablet> tabletList,
- final String dataBase,
- LinkedHashSet<LinkedList<T>> linkedHashSet)
- throws IOException {
+ final List<Tablet> tabletList, final String dataBase) throws
IOException {
final Map<String, List<T>> tableName2Tablets = new HashMap<>();
@@ -130,10 +125,15 @@ public class PipeTableModeTsFileBuilder extends
PipeTsFileBuilder {
});
}
+ // Create a Set backed by an IdentityHashMap, so elements are compared by
reference (==) rather
+ // than equals()/hashCode()
+ final Set<LinkedList<T>> device2TabletsLinkedList =
+ Collections.newSetFromMap(new IdentityHashMap<>());
+
// Sort the tables by table name
tableName2Tablets.entrySet().stream()
.sorted(Map.Entry.comparingByKey(Comparator.naturalOrder()))
- .forEach(entry -> linkedHashSet.add(new
LinkedList<>(entry.getValue())));
+ .forEach(entry -> device2TabletsLinkedList.add(new
LinkedList<>(entry.getValue())));
// Help GC
tableName2Tablets.clear();
@@ -141,13 +141,13 @@ public class PipeTableModeTsFileBuilder extends
PipeTsFileBuilder {
final List<Pair<String, File>> sealedFiles = new ArrayList<>();
// Try making the tsfile size as large as possible
- while (!linkedHashSet.isEmpty()) {
+ while (!device2TabletsLinkedList.isEmpty()) {
if (Objects.isNull(fileWriter)) {
fileWriter = new TsFileWriter(createFile());
}
try {
- tryBestToWriteTabletsIntoOneFile(linkedHashSet);
+ tryBestToWriteTabletsIntoOneFile(device2TabletsLinkedList);
} catch (final Exception e) {
LOGGER.warn(
"Batch id = {}: Failed to write tablets into tsfile, because {}",
@@ -202,8 +202,8 @@ public class PipeTableModeTsFileBuilder extends
PipeTsFileBuilder {
}
private <T extends Pair<Tablet, List<Pair<IDeviceID, Integer>>>>
- void tryBestToWriteTabletsIntoOneFile(
- final LinkedHashSet<LinkedList<T>> device2TabletsLinkedList) throws
IOException {
+ void tryBestToWriteTabletsIntoOneFile(final Set<LinkedList<T>>
device2TabletsLinkedList)
+ throws IOException {
final Iterator<LinkedList<T>> iterator =
device2TabletsLinkedList.iterator();
while (iterator.hasNext()) {
@@ -237,6 +237,8 @@ public class PipeTableModeTsFileBuilder extends
PipeTsFileBuilder {
}
tabletsToWrite.add(pair);
+ // NOTE: pollFirst() mutates a LinkedList that is an element of the Set, changing its hashCode;
+ // this is safe only because the Set is identity-based (equals()/hashCode() are never consulted)
tablets.pollFirst();
continue;
}