This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/TableModelIngestion by this push:
new fdf3f4e7848 merge code branch
fdf3f4e7848 is described below
commit fdf3f4e784821ba297353d6000186bae884bf12d
Author: DESKTOP-L0L5GPJ\jt <[email protected]>
AuthorDate: Thu Jun 20 17:58:09 2024 +0800
merge code branch
---
.../db/storageengine/dataregion/DataRegion.java | 44 +++++++++-------------
.../dataregion/memtable/TsFileProcessor.java | 20 +++-------
2 files changed, 24 insertions(+), 40 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 8771c3e7a50..05537b0c2b9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -973,31 +973,25 @@ public class DataRegion implements IDataRegionForQuery {
@SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive
Complexity warning
public void insertTreeTablet(InsertTabletNode insertTabletNode)
throws BatchProcessException, WriteProcessException {
- insertTablet(insertTabletNode, insertTabletNode::getDeviceID, i ->
- config.isEnableSeparateData()
- ? lastFlushTimeMap.getFlushedTime(
-
TimePartitionUtils.getTimePartitionId(insertTabletNode.getTimes()[i]),
- insertTabletNode.getDeviceID())
- : Long.MAX_VALUE,
- false
- );
+ insertTablet(insertTabletNode, false);
}
public void insertRelationalTablet(InsertTabletNode insertTabletNode)
throws BatchProcessException, WriteProcessException {
- insertTablet(insertTabletNode, insertTabletNode::getDeviceID, i ->
- config.isEnableSeparateData()
- ? lastFlushTimeMap.getFlushedTime(
-
TimePartitionUtils.getTimePartitionId(insertTabletNode.getTimes()[i]),
- insertTabletNode.getDeviceID(i))
- : Long.MAX_VALUE,
- true
- );
+ insertTablet(insertTabletNode, true);
+ }
+
+ private long getLastFlushTime(long timePartitionID, IDeviceID deviceID) {
+ return config.isEnableSeparateData()
+ ? lastFlushTimeMap.getFlushedTime(
+ timePartitionID,
+ deviceID)
+ : Long.MAX_VALUE;
}
private boolean splitAndInsert(InsertTabletNode insertTabletNode,
- IntFunction<IDeviceID> rowDeviceIdGetter, IntToLongFunction
rowLastFlushTimeGetter, int loc,
+ int loc,
TSStatus[] results)
throws BatchProcessException, WriteProcessException {
boolean noFailure = true;
@@ -1014,9 +1008,9 @@ public class DataRegion implements IDataRegionForQuery {
// if is sequence
boolean isSequence = false;
while (loc < insertTabletNode.getRowCount()) {
- long lastFlushTime = rowLastFlushTimeGetter.applyAsLong(loc);
long time = insertTabletNode.getTimes()[loc];
final long timePartitionId = TimePartitionUtils.getTimePartitionId(time);
+ long lastFlushTime = getLastFlushTime(timePartitionId,
insertTabletNode.getDeviceID(loc));
// always in some time partition
// judge if we should insert sequence
if (timePartitionId != beforeTimePartition) {
@@ -1024,7 +1018,7 @@ public class DataRegion implements IDataRegionForQuery {
noFailure =
insertTabletToTsFileProcessor(
insertTabletNode, before, loc, isSequence, results,
- beforeTimePartition, rowDeviceIdGetter, noFailure)
+ beforeTimePartition, noFailure)
&& noFailure;
before = loc;
beforeTimePartition = timePartitionId;
@@ -1035,7 +1029,7 @@ public class DataRegion implements IDataRegionForQuery {
noFailure =
insertTabletToTsFileProcessor(
insertTabletNode, before, loc, isSequence, results,
- beforeTimePartition, rowDeviceIdGetter, noFailure)
+ beforeTimePartition, noFailure)
&& noFailure;
before = loc;
isSequence = true;
@@ -1048,7 +1042,7 @@ public class DataRegion implements IDataRegionForQuery {
if (before < loc) {
noFailure =
insertTabletToTsFileProcessor(
- insertTabletNode, before, loc, isSequence, results,
beforeTimePartition, rowDeviceIdGetter, noFailure)
+ insertTabletNode, before, loc, isSequence, results,
beforeTimePartition, noFailure)
&& noFailure;
}
@@ -1062,7 +1056,6 @@ public class DataRegion implements IDataRegionForQuery {
*/
@SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive
Complexity warning
private void insertTablet(InsertTabletNode insertTabletNode,
- IntFunction<IDeviceID> rowDeviceIdGetter, IntToLongFunction
rowLastFlushTimeGetter,
boolean checkAllRowTtl)
throws BatchProcessException, WriteProcessException {
StorageEngine.blockInsertionIfReject(null);
@@ -1078,10 +1071,10 @@ public class DataRegion implements IDataRegionForQuery {
boolean noFailure;
int loc = checkTTL(insertTabletNode, results, i ->
DataNodeTTLCache.getInstance()
- .getTTL(rowDeviceIdGetter.apply(i)), !checkAllRowTtl);
+ .getTTL(insertTabletNode.getDeviceID(i)), !checkAllRowTtl);
noFailure = loc != 0;
- noFailure = noFailure & splitAndInsert(insertTabletNode,
rowDeviceIdGetter, rowLastFlushTimeGetter, loc, results);
+ noFailure = noFailure & splitAndInsert(insertTabletNode, loc, results);
startTime = System.nanoTime();
tryToUpdateInsertTabletLastCache(insertTabletNode);
@@ -1182,7 +1175,6 @@ public class DataRegion implements IDataRegionForQuery {
boolean sequence,
TSStatus[] results,
long timePartitionId,
- IntFunction<IDeviceID> rowDeviceIdGetter,
boolean noFailure) {
// return when start >= end or all measurement failed
if (start >= end || insertTabletNode.allMeasurementFailed()) {
@@ -1201,7 +1193,7 @@ public class DataRegion implements IDataRegionForQuery {
}
try {
- tsFileProcessor.insertTablet(insertTabletNode, start, end, results,
rowDeviceIdGetter, noFailure);
+ tsFileProcessor.insertTablet(insertTabletNode, start, end, results,
noFailure);
} catch (WriteProcessRejectException e) {
logger.warn("insert to TsFileProcessor rejected, {}", e.getMessage());
return false;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 08c0431132f..cb4cd739fd8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -47,6 +47,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNo
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
import org.apache.iotdb.db.schemaengine.schemaregion.utils.ResourceByPathUtils;
import org.apache.iotdb.db.service.metrics.WritingMetrics;
import org.apache.iotdb.db.storageengine.StorageEngine;
@@ -421,11 +422,7 @@ public class TsFileProcessor {
long[] memIncrements;
try {
long startTime = System.nanoTime();
- if (insertTabletNode.isWriteToTable()) {
- memIncrements = checkTreeMemCost(insertTabletNode, start, end,
noFailure, results);
- } else {
- memIncrements = checkTableMemCost(insertTabletNode, start, end,
noFailure, results);
- }
+ memIncrements = checkMemCost(insertTabletNode, start, end, noFailure,
results);
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(System.nanoTime() -
startTime);
} catch (WriteProcessException e) {
for (int i = start; i < end; i++) {
@@ -437,18 +434,14 @@ public class TsFileProcessor {
return memIncrements;
}
- private long[] checkTreeMemCost(InsertTabletNode insertTabletNode, int
start, int end,
+ private long[] checkMemCost(InsertTabletNode insertTabletNode, int start,
int end,
boolean noFailure, TSStatus[] results)
throws WriteProcessException {
long[] memIncrements;
if (insertTabletNode.isAligned()) {
memIncrements =
- checkAlignedMemCostAndAddToTspForTablet(
- insertTabletNode.getDeviceID(),
- insertTabletNode.getMeasurements(),
- insertTabletNode.getDataTypes(),
- insertTabletNode.getColumns(),
- insertTabletNode.getColumnCategories(),
+ checkAlignedMemCost(
+ insertTabletNode,
start,
end,
noFailure, results);
@@ -465,7 +458,7 @@ public class TsFileProcessor {
return memIncrements;
}
- private long[] checkTableMemCost(InsertTabletNode insertTabletNode, int
start, int end,
+ private long[] checkAlignedMemCost(InsertTabletNode insertTabletNode, int
start, int end,
boolean noFailure, TSStatus[] results)
throws WriteProcessException {
List<Pair<IDeviceID, Integer>> deviceEndPosList =
insertTabletNode.splitByDevice(start, end);
@@ -501,7 +494,6 @@ public class TsFileProcessor {
*/
public void insertTablet(
InsertTabletNode insertTabletNode, int start, int end, TSStatus[]
results,
- IntFunction<IDeviceID> rowDeviceIdGetter,
boolean noFailure)
throws WriteProcessException {