This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/TableModelIngestion by this 
push:
     new fdf3f4e7848 merge code branch
fdf3f4e7848 is described below

commit fdf3f4e784821ba297353d6000186bae884bf12d
Author: DESKTOP-L0L5GPJ\jt <[email protected]>
AuthorDate: Thu Jun 20 17:58:09 2024 +0800

    merge code branch
---
 .../db/storageengine/dataregion/DataRegion.java    | 44 +++++++++-------------
 .../dataregion/memtable/TsFileProcessor.java       | 20 +++-------
 2 files changed, 24 insertions(+), 40 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 8771c3e7a50..05537b0c2b9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -973,31 +973,25 @@ public class DataRegion implements IDataRegionForQuery {
   @SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive 
Complexity warning
   public void insertTreeTablet(InsertTabletNode insertTabletNode)
       throws BatchProcessException, WriteProcessException {
-    insertTablet(insertTabletNode, insertTabletNode::getDeviceID, i ->
-        config.isEnableSeparateData()
-            ? lastFlushTimeMap.getFlushedTime(
-            
TimePartitionUtils.getTimePartitionId(insertTabletNode.getTimes()[i]),
-            insertTabletNode.getDeviceID())
-            : Long.MAX_VALUE,
-        false
-    );
+    insertTablet(insertTabletNode, false);
   }
 
   public void insertRelationalTablet(InsertTabletNode insertTabletNode)
       throws BatchProcessException, WriteProcessException {
-    insertTablet(insertTabletNode, insertTabletNode::getDeviceID, i ->
-            config.isEnableSeparateData()
-                ? lastFlushTimeMap.getFlushedTime(
-                
TimePartitionUtils.getTimePartitionId(insertTabletNode.getTimes()[i]),
-                insertTabletNode.getDeviceID(i))
-                : Long.MAX_VALUE,
-        true
-    );
+    insertTablet(insertTabletNode, true);
+  }
+
+  private long getLastFlushTime(long timePartitionID, IDeviceID deviceID) {
+   return config.isEnableSeparateData()
+        ? lastFlushTimeMap.getFlushedTime(
+        timePartitionID,
+        deviceID)
+        : Long.MAX_VALUE;
   }
 
 
   private boolean splitAndInsert(InsertTabletNode insertTabletNode,
-      IntFunction<IDeviceID> rowDeviceIdGetter, IntToLongFunction 
rowLastFlushTimeGetter, int loc,
+      int loc,
       TSStatus[] results)
       throws BatchProcessException, WriteProcessException {
     boolean noFailure = true;
@@ -1014,9 +1008,9 @@ public class DataRegion implements IDataRegionForQuery {
     // if is sequence
     boolean isSequence = false;
     while (loc < insertTabletNode.getRowCount()) {
-      long lastFlushTime = rowLastFlushTimeGetter.applyAsLong(loc);
       long time = insertTabletNode.getTimes()[loc];
       final long timePartitionId = TimePartitionUtils.getTimePartitionId(time);
+      long lastFlushTime = getLastFlushTime(timePartitionId, 
insertTabletNode.getDeviceID(loc));
       // always in some time partition
       // judge if we should insert sequence
       if (timePartitionId != beforeTimePartition) {
@@ -1024,7 +1018,7 @@ public class DataRegion implements IDataRegionForQuery {
         noFailure =
             insertTabletToTsFileProcessor(
                 insertTabletNode, before, loc, isSequence, results,
-                beforeTimePartition, rowDeviceIdGetter, noFailure)
+                beforeTimePartition, noFailure)
                 && noFailure;
         before = loc;
         beforeTimePartition = timePartitionId;
@@ -1035,7 +1029,7 @@ public class DataRegion implements IDataRegionForQuery {
         noFailure =
             insertTabletToTsFileProcessor(
                 insertTabletNode, before, loc, isSequence, results,
-                beforeTimePartition, rowDeviceIdGetter, noFailure)
+                beforeTimePartition, noFailure)
                 && noFailure;
         before = loc;
         isSequence = true;
@@ -1048,7 +1042,7 @@ public class DataRegion implements IDataRegionForQuery {
     if (before < loc) {
       noFailure =
           insertTabletToTsFileProcessor(
-              insertTabletNode, before, loc, isSequence, results, 
beforeTimePartition, rowDeviceIdGetter, noFailure)
+              insertTabletNode, before, loc, isSequence, results, 
beforeTimePartition, noFailure)
               && noFailure;
     }
 
@@ -1062,7 +1056,6 @@ public class DataRegion implements IDataRegionForQuery {
    */
   @SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive 
Complexity warning
   private void insertTablet(InsertTabletNode insertTabletNode,
-      IntFunction<IDeviceID> rowDeviceIdGetter, IntToLongFunction 
rowLastFlushTimeGetter,
       boolean checkAllRowTtl)
       throws BatchProcessException, WriteProcessException {
     StorageEngine.blockInsertionIfReject(null);
@@ -1078,10 +1071,10 @@ public class DataRegion implements IDataRegionForQuery {
       boolean noFailure;
 
       int loc = checkTTL(insertTabletNode, results, i -> 
DataNodeTTLCache.getInstance()
-          .getTTL(rowDeviceIdGetter.apply(i)), !checkAllRowTtl);
+          .getTTL(insertTabletNode.getDeviceID(i)), !checkAllRowTtl);
       noFailure = loc != 0;
 
-      noFailure = noFailure & splitAndInsert(insertTabletNode, 
rowDeviceIdGetter, rowLastFlushTimeGetter, loc, results);
+      noFailure = noFailure & splitAndInsert(insertTabletNode, loc, results);
 
       startTime = System.nanoTime();
       tryToUpdateInsertTabletLastCache(insertTabletNode);
@@ -1182,7 +1175,6 @@ public class DataRegion implements IDataRegionForQuery {
       boolean sequence,
       TSStatus[] results,
       long timePartitionId,
-      IntFunction<IDeviceID> rowDeviceIdGetter,
       boolean noFailure) {
     // return when start >= end or all measurement failed
     if (start >= end || insertTabletNode.allMeasurementFailed()) {
@@ -1201,7 +1193,7 @@ public class DataRegion implements IDataRegionForQuery {
     }
 
     try {
-      tsFileProcessor.insertTablet(insertTabletNode, start, end, results, 
rowDeviceIdGetter, noFailure);
+      tsFileProcessor.insertTablet(insertTabletNode, start, end, results, 
noFailure);
     } catch (WriteProcessRejectException e) {
       logger.warn("insert to TsFileProcessor rejected, {}", e.getMessage());
       return false;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 08c0431132f..cb4cd739fd8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -47,6 +47,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNo
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
 import org.apache.iotdb.db.schemaengine.schemaregion.utils.ResourceByPathUtils;
 import org.apache.iotdb.db.service.metrics.WritingMetrics;
 import org.apache.iotdb.db.storageengine.StorageEngine;
@@ -421,11 +422,7 @@ public class TsFileProcessor {
     long[] memIncrements;
     try {
       long startTime = System.nanoTime();
-      if (insertTabletNode.isWriteToTable()) {
-        memIncrements = checkTreeMemCost(insertTabletNode, start, end, 
noFailure, results);
-      } else {
-        memIncrements = checkTableMemCost(insertTabletNode, start, end, 
noFailure, results);
-      }
+      memIncrements = checkMemCost(insertTabletNode, start, end, noFailure, 
results);
       
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(System.nanoTime() - 
startTime);
     } catch (WriteProcessException e) {
       for (int i = start; i < end; i++) {
@@ -437,18 +434,14 @@ public class TsFileProcessor {
     return memIncrements;
   }
 
-  private long[] checkTreeMemCost(InsertTabletNode insertTabletNode, int 
start, int end,
+  private long[] checkMemCost(InsertTabletNode insertTabletNode, int start, 
int end,
       boolean noFailure, TSStatus[] results)
       throws WriteProcessException {
     long[] memIncrements;
     if (insertTabletNode.isAligned()) {
       memIncrements =
-          checkAlignedMemCostAndAddToTspForTablet(
-              insertTabletNode.getDeviceID(),
-              insertTabletNode.getMeasurements(),
-              insertTabletNode.getDataTypes(),
-              insertTabletNode.getColumns(),
-              insertTabletNode.getColumnCategories(),
+          checkAlignedMemCost(
+              insertTabletNode,
               start,
               end,
               noFailure, results);
@@ -465,7 +458,7 @@ public class TsFileProcessor {
     return memIncrements;
   }
 
-  private long[] checkTableMemCost(InsertTabletNode insertTabletNode, int 
start, int end,
+  private long[] checkAlignedMemCost(InsertTabletNode insertTabletNode, int 
start, int end,
       boolean noFailure, TSStatus[] results)
       throws WriteProcessException {
     List<Pair<IDeviceID, Integer>> deviceEndPosList = 
insertTabletNode.splitByDevice(start, end);
@@ -501,7 +494,6 @@ public class TsFileProcessor {
    */
   public void insertTablet(
       InsertTabletNode insertTabletNode, int start, int end, TSStatus[] 
results,
-      IntFunction<IDeviceID> rowDeviceIdGetter,
       boolean noFailure)
       throws WriteProcessException {
 

Reply via email to