This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0c29fe5a0c0 Revert "Load: Parallelly load files into different target data partitions (#13893)" (#13905)
0c29fe5a0c0 is described below
commit 0c29fe5a0c063578f6940db3bbf7dc9f056a3eb6
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Oct 25 10:26:59 2024 +0800
Revert "Load: Parallelly load files into different target data partitions (#13893)" (#13905)
This reverts commit dbb99bc88dea50c6effffae4081abbba1ea5f76a.
---
.../db/storageengine/load/LoadTsFileManager.java | 57 +++++++---------------
1 file changed, 17 insertions(+), 40 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index f785d0992eb..775dad1df46 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -61,11 +61,8 @@ import java.io.IOException;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -455,47 +452,27 @@ public class LoadTsFileManager {
if (isClosed) {
throw new IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir));
}
-
for (Map.Entry<DataPartitionInfo, ModificationFile> entry :
dataPartition2ModificationFile.entrySet()) {
entry.getValue().close();
}
-
- final List<Map.Entry<DataPartitionInfo, TsFileIOWriter>> dataPartition2WriterList =
- new ArrayList<>(dataPartition2Writer.entrySet());
- Collections.shuffle(dataPartition2WriterList);
-
- final AtomicReference<Exception> exception = new AtomicReference<>();
- dataPartition2WriterList.parallelStream()
- .forEach(
- entry -> {
- try {
- final TsFileIOWriter writer = entry.getValue();
- if (writer.isWritingChunkGroup()) {
- writer.endChunkGroup();
- }
- writer.endFile();
-
- final DataRegion dataRegion = entry.getKey().getDataRegion();
- dataRegion.loadNewTsFile(
- generateResource(writer, progressIndex), true, isGeneratedByPipe);
-
- // Metrics
- dataRegion
- .getNonSystemDatabaseName()
- .ifPresent(
- databaseName ->
- updateWritePointCountMetrics(
- dataRegion,
- databaseName,
- getTsFileWritePointCount(writer),
- false));
- } catch (final Exception e) {
- exception.set(e);
- }
- });
- if (exception.get() != null) {
- throw new LoadFileException(exception.get());
+ for (Map.Entry<DataPartitionInfo, TsFileIOWriter> entry : dataPartition2Writer.entrySet()) {
+ TsFileIOWriter writer = entry.getValue();
+ if (writer.isWritingChunkGroup()) {
+ writer.endChunkGroup();
+ }
+ writer.endFile();
+
+ DataRegion dataRegion = entry.getKey().getDataRegion();
+ dataRegion.loadNewTsFile(generateResource(writer, progressIndex), true, isGeneratedByPipe);
+
+ // Metrics
+ dataRegion
+ .getNonSystemDatabaseName()
+ .ifPresent(
+ databaseName ->
+ updateWritePointCountMetrics(
+ dataRegion, databaseName, getTsFileWritePointCount(writer), false));
}
}