This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch batch_wal
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 163d282487a996a49444fc53dd3fed1cd2858582
Author: HTHou <[email protected]>
AuthorDate: Fri Mar 29 16:53:49 2024 +0800

    init
---
 .../db/storageengine/dataregion/DataRegion.java    | 23 ++++++++++++++++++----
 1 file changed, 19 insertions(+), 4 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 8500b735ebc..2a5602e315e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1170,7 +1170,7 @@ public class DataRegion implements IDataRegionForQuery {
      InsertRowsNode insertRowsNode, boolean[] areSequence, long[] timePartitionIds) {
     List<InsertRowNode> executedInsertRowNodeList = new ArrayList<>();
     long[] costsForMetrics = new long[4];
-    Map<TsFileProcessor, Boolean> tsFileProcessorMapForFlushing = new HashMap<>();
+    Map<TsFileProcessor, InsertRowsNode> tsFileProcessorMap = new HashMap<>();
     for (int i = 0; i < areSequence.length; i++) {
       InsertRowNode insertRowNode = insertRowsNode.getInsertRowNodeList().get(i);
       if (insertRowNode.allMeasurementFailed()) {
@@ -1181,7 +1181,14 @@ public class DataRegion implements IDataRegionForQuery {
       if (tsFileProcessor == null) {
         continue;
       }
-      tsFileProcessorMapForFlushing.put(tsFileProcessor, areSequence[i]);
+      tsFileProcessorMap.compute(tsFileProcessor, (k, v) -> {
+        if (v == null) {
+          v = new InsertRowsNode(insertRowsNode.getPlanNodeId());
+          v.setSearchIndex(insertRowNode.getSearchIndex());
+        }
+        v.addOneInsertRowNode(insertRowNode, v.getInsertRowNodeList().size());
+        return v;
+      });
       try {
         tsFileProcessor.insert(insertRowNode, costsForMetrics);
       } catch (WriteProcessException e) {
@@ -1191,9 +1198,17 @@ public class DataRegion implements IDataRegionForQuery {
     }
 
     // check memtable size and may asyncTryToFlush the work memtable
-    for (Map.Entry<TsFileProcessor, Boolean> entry : tsFileProcessorMapForFlushing.entrySet()) {
+    for (Map.Entry<TsFileProcessor, InsertRowsNode> entry : tsFileProcessorMap.entrySet()) {
+      TsFileProcessor tsFileProcessor = entry.getKey();
+      InsertRowsNode subInsertRowsNode = entry.getValue();
+      try {
+        tsFileProcessor.insert(subInsertRowsNode, costsForMetrics);
+      } catch (WriteProcessException e) {
+        insertRowsNode.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+      }
+
       if (entry.getKey().shouldFlush()) {
-        fileFlushPolicy.apply(this, entry.getKey(), entry.getValue());
+        fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
       }
     }
 

Reply via email to