This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch batch_wal in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 163d282487a996a49444fc53dd3fed1cd2858582 Author: HTHou <[email protected]> AuthorDate: Fri Mar 29 16:53:49 2024 +0800 init --- .../db/storageengine/dataregion/DataRegion.java | 23 ++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 8500b735ebc..2a5602e315e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -1170,7 +1170,7 @@ public class DataRegion implements IDataRegionForQuery { InsertRowsNode insertRowsNode, boolean[] areSequence, long[] timePartitionIds) { List<InsertRowNode> executedInsertRowNodeList = new ArrayList<>(); long[] costsForMetrics = new long[4]; - Map<TsFileProcessor, Boolean> tsFileProcessorMapForFlushing = new HashMap<>(); + Map<TsFileProcessor, InsertRowsNode> tsFileProcessorMap = new HashMap<>(); for (int i = 0; i < areSequence.length; i++) { InsertRowNode insertRowNode = insertRowsNode.getInsertRowNodeList().get(i); if (insertRowNode.allMeasurementFailed()) { @@ -1181,7 +1181,14 @@ public class DataRegion implements IDataRegionForQuery { if (tsFileProcessor == null) { continue; } - tsFileProcessorMapForFlushing.put(tsFileProcessor, areSequence[i]); + tsFileProcessorMap.compute(tsFileProcessor, (k, v) -> { + if (v == null) { + v = new InsertRowsNode(insertRowsNode.getPlanNodeId()); + v.setSearchIndex(insertRowNode.getSearchIndex()); + } + v.addOneInsertRowNode(insertRowNode, v.getInsertRowNodeList().size()); + return v; + }); try { tsFileProcessor.insert(insertRowNode, costsForMetrics); } catch (WriteProcessException e) { @@ -1191,9 +1198,17 @@ public class DataRegion implements IDataRegionForQuery { } // check memtable size and may asyncTryToFlush the work memtable - for (Map.Entry<TsFileProcessor, Boolean> entry : tsFileProcessorMapForFlushing.entrySet()) { + for (Map.Entry<TsFileProcessor, InsertRowsNode> entry : tsFileProcessorMap.entrySet()) { + TsFileProcessor tsFileProcessor = entry.getKey(); + InsertRowsNode subInsertRowsNode = entry.getValue(); + try { + tsFileProcessor.insert(subInsertRowsNode, costsForMetrics); + } catch (WriteProcessException e) { + insertRowsNode.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage())); + } + if (entry.getKey().shouldFlush()) { - fileFlushPolicy.apply(this, entry.getKey(), entry.getValue()); + fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence()); } }
