This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/TableModelIngestion by this
push:
new f796b3fb568 add logical analyzer ut
f796b3fb568 is described below
commit f796b3fb5683e92898ac03c4b65daa69e5ff0460
Author: DESKTOP-L0L5GPJ\jt <[email protected]>
AuthorDate: Mon Jun 24 16:13:34 2024 +0800
add logical analyzer ut
---
.../java/org/apache/iotdb/session/Session.java | 56 ++--
.../dataregion/DataExecutionVisitor.java | 3 +-
.../db/queryengine/common/MPPQueryContext.java | 1 -
.../execution/executor/RegionWriteExecutor.java | 4 +-
.../db/queryengine/plan/analyze/AnalyzeUtils.java | 55 ++--
.../queryengine/plan/analyze/AnalyzeVisitor.java | 10 +-
.../analyze/cache/schema/DataNodeTTLCache.java | 4 +-
.../plan/analyze/schema/SchemaValidator.java | 9 +-
.../distribution/WriteFragmentParallelPlanner.java | 6 +-
.../plan/planner/plan/node/PlanNodeType.java | 2 +-
.../plan/node/write/InsertMultiTabletsNode.java | 5 -
.../plan/planner/plan/node/write/InsertNode.java | 46 ++--
.../planner/plan/node/write/InsertRowNode.java | 5 -
.../planner/plan/node/write/InsertRowsNode.java | 5 -
.../plan/node/write/InsertRowsOfOneDeviceNode.java | 5 -
.../planner/plan/node/write/InsertTabletNode.java | 60 ++---
.../node/write/RelationalInsertTabletNode.java | 50 ++--
.../relational/analyzer/StatementAnalyzer.java | 1 +
.../plan/relational/metadata/ColumnSchema.java | 10 +-
.../plan/relational/metadata/Metadata.java | 1 -
.../plan/relational/metadata/TableSchema.java | 34 ++-
.../plan/relational/planner/LogicalPlanner.java | 3 +-
.../plan/relational/planner/RelationPlanner.java | 7 +-
.../distribute/TableDistributionPlanner.java | 6 +-
.../RemoveRedundantIdentityProjections.java | 4 +
.../plan/relational/sql/ast/InsertTablet.java | 7 +-
.../relational/sql/ast/WrappedInsertStatement.java | 25 +-
.../plan/statement/crud/InsertTabletStatement.java | 5 +-
.../db/storageengine/dataregion/DataRegion.java | 295 ++++++++-------------
.../dataregion/memtable/AbstractMemTable.java | 24 +-
.../memtable/AlignedWritableMemChunk.java | 3 +-
.../memtable/AlignedWritableMemChunkGroup.java | 3 +-
.../dataregion/memtable/IMemTable.java | 4 +-
.../dataregion/memtable/IWritableMemChunk.java | 3 +-
.../memtable/IWritableMemChunkGroup.java | 3 +-
.../dataregion/memtable/TsFileProcessor.java | 117 +++++---
.../dataregion/memtable/WritableMemChunk.java | 3 +-
.../dataregion/memtable/WritableMemChunkGroup.java | 3 +-
.../db/trigger/executor/TriggerFireVisitor.java | 4 +-
.../java/org/apache/iotdb/db/utils/MemUtils.java | 4 +-
.../db/utils/datastructure/AlignedTVList.java | 13 +-
.../iotdb/db/utils/datastructure/TVList.java | 4 +-
.../plan/parser/StatementGeneratorTest.java | 13 +-
.../plan/relational/analyzer/AnalyzerTest.java | 98 ++++++-
.../plan/statement/StatementTestUtils.java | 37 ++-
.../dataregion/memtable/TsFileProcessorTest.java | 12 +-
.../db/utils/datastructure/AlignedTVListTest.java | 6 +-
.../commons/partition/DataPartitionQueryParam.java | 9 +
.../apache/iotdb/commons/schema/table/TsTable.java | 1 -
.../schema/table/column/TsTableColumnCategory.java | 6 +-
50 files changed, 584 insertions(+), 510 deletions(-)
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
index a7303eb6d91..47b058ab673 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -70,7 +70,6 @@ import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.UnSupportedDataTypeException;
import org.apache.tsfile.write.record.Tablet;
-import org.apache.tsfile.write.record.Tablet.ColumnType;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
@@ -619,9 +618,7 @@ public class Session implements ISession {
this.zoneId = ZoneId.of(zoneId);
}
- /**
- * Only changes the member variable of the Session object without sending it
to server.
- */
+ /** Only changes the member variable of the Session object without sending
it to server. */
@Override
public void setTimeZoneOfSession(String zoneId) {
defaultSessionConnection.setTimeZoneOfSession(zoneId);
@@ -992,7 +989,7 @@ public class Session implements ISession {
*
* @param paths timeSeries eg. root.ln.d1.s1,root.ln.d1.s2
* @param lastTime get the last data, whose timestamp is greater than or
equal lastTime e.g.
- * 1621326244168
+ * 1621326244168
*/
@Override
public SessionDataSet executeLastDataQuery(List<String> paths, long
lastTime, long timeOut)
@@ -1016,8 +1013,7 @@ public class Session implements ISession {
}
/**
- * query eg. select last status from root.ln.wf01.wt01; <PrefixPath> +
<suffixPath> =
- * <TimeSeries>
+ * query eg. select last status from root.ln.wf01.wt01; <PrefixPath> +
<suffixPath> = <TimeSeries>
*
* @param paths timeSeries. eg.root.ln.d1.s1,root.ln.d1.s2
*/
@@ -1051,7 +1047,7 @@ public class Session implements ISession {
// reconnect with default connection
return defaultSessionConnection.executeLastDataQueryForOneDevice(
- db, device, sensors, isLegalPathNodes, queryTimeoutInMs)
+ db, device, sensors, isLegalPathNodes, queryTimeoutInMs)
.left;
} else {
throw e;
@@ -1262,7 +1258,7 @@ public class Session implements ISession {
TEndPoint endPoint = null;
if (endPointToSessionConnection != null) {
for (Iterator<Entry<TEndPoint, SessionConnection>> it =
- endPointToSessionConnection.entrySet().iterator();
+ endPointToSessionConnection.entrySet().iterator();
it.hasNext(); ) {
Entry<TEndPoint, SessionConnection> entry = it.next();
if (entry.getValue().equals(sessionConnection)) {
@@ -1543,9 +1539,7 @@ public class Session implements ISession {
}
}
- /**
- * When the value is null,filter this,don't use this measurement.
- */
+ /** When a value is null, filter it out and do not use its measurement. */
private void filterNullValueAndMeasurement(
List<String> deviceIds,
List<Long> times,
@@ -1571,9 +1565,7 @@ public class Session implements ISession {
}
}
- /**
- * Filter the null value of list。
- */
+ /** Filter the null values of the list. */
private void filterNullValueAndMeasurementOfOneDevice(
String deviceId,
List<Long> times,
@@ -1598,9 +1590,7 @@ public class Session implements ISession {
}
}
- /**
- * Filter the null value of list。
- */
+ /** Filter the null values of the list. */
private void filterNullValueAndMeasurementWithStringTypeOfOneDevice(
List<Long> times,
String deviceId,
@@ -2617,8 +2607,7 @@ public class Session implements ISession {
TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted, false);
request.setWriteToTable(true);
request.setColumnCategories(
- tablet.getColumnTypes().stream().map(t -> (byte) t.ordinal()).collect(
- Collectors.toList()));
+ tablet.getColumnTypes().stream().map(t -> (byte)
t.ordinal()).collect(Collectors.toList()));
insertTabletInternal(tablet, request);
}
@@ -3240,7 +3229,7 @@ public class Session implements ISession {
}
@SuppressWarnings({
- "squid:S3776"
+ "squid:S3776"
}) // ignore Cognitive Complexity of methods should not be too high
public void sortTablet(Tablet tablet) {
/*
@@ -3434,13 +3423,13 @@ public class Session implements ISession {
*
* @param name name of the template
* @param schemaNames it works as a virtual layer inside template in 0.12,
and makes no difference
- * after 0.13
+ * after 0.13
* @param measurements the first measurement in each nested list will
constitute the final flat
- * template
+ * template
* @param dataTypes the data type of each measurement, only the first one in
each nested list
- * matters as above
+ * matters as above
* @param encodings the encoding of each measurement, only the first one in
each nested list
- * matters as above
+ * matters as above
* @param compressors the compressor of each measurement
* @deprecated
*/
@@ -3473,7 +3462,7 @@ public class Session implements ISession {
/**
* @param templateName Template to add aligned measurements.
* @param measurementsPath If measurements get different prefix, or the
prefix already exists in
- * template but not aligned, throw exception.
+ * template but not aligned, throw exception.
* @param dataTypes Data type of these measurements.
* @param encodings Encoding of these measurements.
* @param compressors CompressionType of these measurements.
@@ -3500,7 +3489,7 @@ public class Session implements ISession {
/**
* @param templateName Template to add a single aligned measurement.
* @param measurementPath If prefix of the path exists in template and not
aligned, throw
- * exception.
+ * exception.
*/
@Override
public void addAlignedMeasurementInTemplate(
@@ -3741,14 +3730,15 @@ public class Session implements ISession {
/**
* @param recordsGroup connection to record map
* @param insertConsumer insert function
- * @param <T> <ul>
- * <li>{@link TSInsertRecordsReq}
- * <li>{@link TSInsertStringRecordsReq}
- * <li>{@link TSInsertTabletsReq}
- * </ul>
+ * @param <T>
+ * <ul>
+ * <li>{@link TSInsertRecordsReq}
+ * <li>{@link TSInsertStringRecordsReq}
+ * <li>{@link TSInsertTabletsReq}
+ * </ul>
*/
@SuppressWarnings({
- "squid:S3776"
+ "squid:S3776"
}) // ignore Cognitive Complexity of methods should not be too high
private <T> void insertByGroup(
Map<SessionConnection, T> recordsGroup, InsertConsumer<T> insertConsumer)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
index 4fbccc10049..73c83918f66 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
@@ -75,7 +75,8 @@ public class DataExecutionVisitor extends
PlanVisitor<TSStatus, DataRegion> {
}
@Override
- public TSStatus visitRelationalInsertTablet(RelationalInsertTabletNode node,
DataRegion dataRegion) {
+ public TSStatus visitRelationalInsertTablet(
+ RelationalInsertTabletNode node, DataRegion dataRegion) {
return visitInsertTablet(node, dataRegion);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index 07437404b5a..944ad8ccb33 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -373,7 +373,6 @@ public class MPPQueryContext {
// endregion
-
public boolean isTableQuery() {
return isTableQuery;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
index 83e41119cde..a1ee9be5cae 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
@@ -210,8 +210,8 @@ public class RegionWriteExecutor {
}
@Override
- public RegionExecutionResult
visitRelationalInsertTablet(RelationalInsertTabletNode node,
- WritePlanNodeExecutionContext context) {
+ public RegionExecutionResult visitRelationalInsertTablet(
+ RelationalInsertTabletNode node, WritePlanNodeExecutionContext
context) {
return executeDataInsert(node, context);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java
index 2fb1089d2a3..3656dd68e89 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java
@@ -26,7 +26,6 @@ import
org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
-import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WrappedInsertStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
@@ -78,38 +77,42 @@ public class AnalyzeUtils {
analyzeDataPartition(
analysis,
- partitionQueryParamComputation.compute(realStatement),
+ partitionQueryParamComputation.compute(realStatement, context),
context.getSession().getUserName(),
partitionFetcher);
return realStatement;
}
- public static List<DataPartitionQueryParam>
computeTableDataPartitionParams(InsertBaseStatement statement) {
+ public static List<DataPartitionQueryParam> computeTableDataPartitionParams(
+ InsertBaseStatement statement, MPPQueryContext context) {
if (statement instanceof InsertTabletStatement) {
InsertTabletStatement insertTabletStatement = (InsertTabletStatement)
statement;
Map<IDeviceID, Set<TTimePartitionSlot>> timePartitionSlotMap = new
HashMap<>();
for (int i = 0; i < insertTabletStatement.getRowCount(); i++) {
-
timePartitionSlotMap.computeIfAbsent(insertTabletStatement.getTableDeviceID(i),
- id -> new
HashSet<>()).add(insertTabletStatement.getTimePartitionSlot(i));
+ timePartitionSlotMap
+ .computeIfAbsent(insertTabletStatement.getTableDeviceID(i), id ->
new HashSet<>())
+ .add(insertTabletStatement.getTimePartitionSlot(i));
}
- return computeDataPartitionParams(timePartitionSlotMap);
+ return computeDataPartitionParams(timePartitionSlotMap,
context.getSession().getDatabaseName().get());
} else if (statement instanceof InsertMultiTabletsStatement) {
- InsertMultiTabletsStatement insertMultiTabletsStatement =
(InsertMultiTabletsStatement) statement;
+ InsertMultiTabletsStatement insertMultiTabletsStatement =
+ (InsertMultiTabletsStatement) statement;
Map<IDeviceID, Set<TTimePartitionSlot>> timePartitionSlotMap = new
HashMap<>();
for (InsertTabletStatement insertTabletStatement :
insertMultiTabletsStatement.getInsertTabletStatementList()) {
for (int i = 0; i < insertTabletStatement.getRowCount(); i++) {
-
timePartitionSlotMap.computeIfAbsent(insertTabletStatement.getTableDeviceID(i),
- id -> new
HashSet<>()).add(insertTabletStatement.getTimePartitionSlot(i));
+ timePartitionSlotMap
+ .computeIfAbsent(insertTabletStatement.getTableDeviceID(i), id
-> new HashSet<>())
+ .add(insertTabletStatement.getTimePartitionSlot(i));
}
}
- return computeDataPartitionParams(timePartitionSlotMap);
+ return computeDataPartitionParams(timePartitionSlotMap,
context.getSession().getDatabaseName().get());
}
throw new UnsupportedOperationException("computeDataPartitionParams for "
+ statement);
}
-
- public static List<DataPartitionQueryParam>
computeTreeDataPartitionParams(InsertBaseStatement statement) {
+ public static List<DataPartitionQueryParam> computeTreeDataPartitionParams(
+ InsertBaseStatement statement, MPPQueryContext context) {
if (statement instanceof InsertTabletStatement) {
InsertTabletStatement insertTabletStatement = (InsertTabletStatement)
statement;
DataPartitionQueryParam dataPartitionQueryParam = new
DataPartitionQueryParam();
@@ -117,9 +120,11 @@ public class AnalyzeUtils {
insertTabletStatement.getDevicePath().getIDeviceIDAsFullDevice());
dataPartitionQueryParam.setTimePartitionSlotList(
insertTabletStatement.getTimePartitionSlots());
+
dataPartitionQueryParam.setDatabaseName(context.getSession().getDatabaseName().get());
return Collections.singletonList(dataPartitionQueryParam);
} else if (statement instanceof InsertMultiTabletsStatement) {
- InsertMultiTabletsStatement insertMultiTabletsStatement =
(InsertMultiTabletsStatement) statement;
+ InsertMultiTabletsStatement insertMultiTabletsStatement =
+ (InsertMultiTabletsStatement) statement;
Map<IDeviceID, Set<TTimePartitionSlot>> dataPartitionQueryParamMap = new
HashMap<>();
for (InsertTabletStatement insertTabletStatement :
insertMultiTabletsStatement.getInsertTabletStatementList()) {
@@ -129,28 +134,32 @@ public class AnalyzeUtils {
k -> new HashSet<>());
timePartitionSlotSet.addAll(insertTabletStatement.getTimePartitionSlots());
}
- return computeDataPartitionParams(dataPartitionQueryParamMap);
+ return computeDataPartitionParams(dataPartitionQueryParamMap,
context.getSession().getDatabaseName().get());
} else if (statement instanceof InsertRowsStatement) {
final InsertRowsStatement insertRowsStatement = (InsertRowsStatement)
statement;
Map<IDeviceID, Set<TTimePartitionSlot>> dataPartitionQueryParamMap = new
HashMap<>();
- for (InsertRowStatement insertRowStatement :
insertRowsStatement.getInsertRowStatementList()) {
+ for (InsertRowStatement insertRowStatement :
+ insertRowsStatement.getInsertRowStatementList()) {
Set<TTimePartitionSlot> timePartitionSlotSet =
dataPartitionQueryParamMap.computeIfAbsent(
- insertRowStatement.getDevicePath().getIDeviceIDAsFullDevice(),
k -> new HashSet<>());
+ insertRowStatement.getDevicePath().getIDeviceIDAsFullDevice(),
+ k -> new HashSet<>());
timePartitionSlotSet.add(insertRowStatement.getTimePartitionSlot());
}
- return computeDataPartitionParams(dataPartitionQueryParamMap);
+ return computeDataPartitionParams(dataPartitionQueryParamMap,
context.getSession().getDatabaseName().get());
}
throw new UnsupportedOperationException("computeDataPartitionParams for "
+ statement);
}
- public static List<DataPartitionQueryParam>
computeDataPartitionParams(Map<IDeviceID, Set<TTimePartitionSlot>>
dataPartitionQueryParamMap) {
+ public static List<DataPartitionQueryParam> computeDataPartitionParams(
+ Map<IDeviceID, Set<TTimePartitionSlot>> dataPartitionQueryParamMap,
String databaseName) {
List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
- for (Map.Entry<IDeviceID
- , Set<TTimePartitionSlot>> entry :
dataPartitionQueryParamMap.entrySet()) {
+ for (Map.Entry<IDeviceID, Set<TTimePartitionSlot>> entry :
+ dataPartitionQueryParamMap.entrySet()) {
DataPartitionQueryParam dataPartitionQueryParam = new
DataPartitionQueryParam();
dataPartitionQueryParam.setDeviceID(entry.getKey());
dataPartitionQueryParam.setTimePartitionSlotList(new
ArrayList<>(entry.getValue()));
+ dataPartitionQueryParam.setDatabaseName(databaseName);
dataPartitionQueryParams.add(dataPartitionQueryParam);
}
return dataPartitionQueryParams;
@@ -203,9 +212,7 @@ public class AnalyzeUtils {
}
}
- /**
- * get analysis according to statement and params
- */
+ /** Get analysis according to statement and params. */
public static void analyzeDataPartition(
IAnalysis analysis,
List<DataPartitionQueryParam> dataPartitionQueryParams,
@@ -232,6 +239,6 @@ public class AnalyzeUtils {
}
public interface DataPartitionQueryParamComputation {
- List<DataPartitionQueryParam> compute(InsertBaseStatement statement);
+ List<DataPartitionQueryParam> compute(InsertBaseStatement statement,
MPPQueryContext context);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 3af6468e4b9..365020f52a7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -190,9 +190,9 @@ import static
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant
import static
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.ENDTIME;
import static
org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.PARTITION_FETCHER;
import static
org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.SCHEMA_FETCHER;
+import static
org.apache.iotdb.db.queryengine.metric.load.LoadTsFileCostMetricsSet.ANALYSIS;
import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeUtils.removeLogicalView;
import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeUtils.validateSchema;
-import static
org.apache.iotdb.db.queryengine.metric.load.LoadTsFileCostMetricsSet.ANALYSIS;
import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.bindSchemaForExpression;
import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.concatDeviceAndBindSchemaForExpression;
import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.getMeasurementExpression;
@@ -2690,7 +2690,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
} else {
AnalyzeUtils.analyzeDataPartition(
analysis,
- AnalyzeUtils.computeTreeDataPartitionParams(realInsertStatement),
+ AnalyzeUtils.computeTreeDataPartitionParams(realInsertStatement,
context),
context.getSession().getUserName(),
partitionFetcher::getOrCreateDataPartition);
}
@@ -2716,7 +2716,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
AnalyzeUtils.analyzeDataPartition(
analysis,
- AnalyzeUtils.computeTreeDataPartitionParams(realInsertRowsStatement),
+ AnalyzeUtils.computeTreeDataPartitionParams(realInsertRowsStatement,
context),
context.getSession().getUserName(),
partitionFetcher::getOrCreateDataPartition);
return analysis;
@@ -2741,7 +2741,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
AnalyzeUtils.analyzeDataPartition(
analysis,
- AnalyzeUtils.computeTreeDataPartitionParams(realStatement),
+ AnalyzeUtils.computeTreeDataPartitionParams(realStatement, context),
context.getSession().getUserName(),
partitionFetcher::getOrCreateDataPartition);
return analysis;
@@ -2779,7 +2779,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
} else {
AnalyzeUtils.analyzeDataPartition(
analysis,
- AnalyzeUtils.computeTreeDataPartitionParams(realInsertStatement),
+ AnalyzeUtils.computeTreeDataPartitionParams(realInsertStatement,
context),
context.getSession().getUserName(),
partitionFetcher::getOrCreateDataPartition);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeTTLCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeTTLCache.java
index d26348c5ffb..1e7f0ae070c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeTTLCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeTTLCache.java
@@ -22,14 +22,12 @@ import
org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.schema.ttl.TTLCache;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.TestOnly;
-
import org.apache.iotdb.db.utils.CommonUtils;
+
import org.apache.tsfile.file.metadata.IDeviceID;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import static
org.apache.tsfile.common.constant.TsFileConstant.PATH_SEPARATER_NO_REGEX;
-
public class DataNodeTTLCache {
private final TTLCache ttlCache;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
index be74ef5be14..9967767c11d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
@@ -19,14 +19,11 @@
package org.apache.iotdb.db.queryengine.plan.analyze.schema;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
-import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema;
import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WrappedInsertStatement;
@@ -66,8 +63,8 @@ public class SchemaValidator {
try {
String databaseName = context.getSession().getDatabaseName().get();
final TableSchema incomingSchema = insertStatement.getTableSchema();
- final TableSchema realSchema =
metadata.validateTableHeaderSchema(databaseName,
- incomingSchema, context);
+ final TableSchema realSchema =
+ metadata.validateTableHeaderSchema(databaseName, incomingSchema,
context);
insertStatement.validate(realSchema);
metadata.validateDeviceSchema(insertStatement, context);
insertStatement.updateAfterSchemaValidation(context);
@@ -76,8 +73,6 @@ public class SchemaValidator {
}
}
-
-
public static ISchemaTree validate(
ISchemaFetcher schemaFetcher,
List<PartialPath> devicePaths,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
index 511e3bc7466..975e0c1736e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.queryengine.plan.planner.distribution;
-import java.util.function.BiFunction;
import org.apache.iotdb.commons.partition.StorageExecutor;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
@@ -32,6 +31,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
import java.util.ArrayList;
import java.util.List;
+import java.util.function.BiFunction;
public class WriteFragmentParallelPlanner implements IFragmentParallelPlaner {
@@ -49,7 +49,9 @@ public class WriteFragmentParallelPlanner implements
IFragmentParallelPlaner {
}
public WriteFragmentParallelPlanner(
- SubPlan subPlan, IAnalysis analysis, MPPQueryContext queryContext,
+ SubPlan subPlan,
+ IAnalysis analysis,
+ MPPQueryContext queryContext,
BiFunction<WritePlanNode, IAnalysis, List<WritePlanNode>> nodeSplitter) {
this.subPlan = subPlan;
this.analysis = analysis;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index c918366f9a2..b82f7d80242 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -109,8 +109,8 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
-
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
+
import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.io.DataInputStream;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index 430ab19c7c9..802bd579fdf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -168,11 +168,6 @@ public class InsertMultiTabletsNode extends InsertNode {
return StatusUtils.getFailingStatus(results, insertTabletNodeList.size());
}
- @Override
- public List<PlanNode> getChildren() {
- return null;
- }
-
@Override
public void addChild(PlanNode child) {}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
index 032020ef303..a48340c52fe 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
@@ -19,9 +19,7 @@
package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.function.Function;
+import java.util.Collections;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.consensus.index.ComparableConsensusRequest;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
@@ -29,36 +27,35 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory;
-import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.NotImplementedException;
import org.apache.tsfile.file.metadata.IDeviceID;
-import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import java.util.Objects;
public abstract class InsertNode extends WritePlanNode implements
ComparableConsensusRequest {
- /**
- * this insert node doesn't need to participate in iot consensus
- */
+ /** this insert node doesn't need to participate in iot consensus */
public static final long NO_CONSENSUS_INDEX =
ConsensusReqReader.DEFAULT_SEARCH_INDEX;
/**
- * if use id table, this filed is id form of device path <br> if not, this
filed is device
- * path<br>
+ * if use id table, this field is id form of device path <br>
+ * if not, this field is device path<br>
*/
protected PartialPath devicePath;
@@ -74,7 +71,8 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
protected int failedMeasurementNumber = 0;
/**
- * device id reference, for reuse device id in both id table and memtable
<br> used in memtable
+ * device id reference, for reuse device id in both id table and memtable
<br>
+ * used in memtable
*/
protected IDeviceID deviceID;
@@ -84,9 +82,7 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
*/
protected long searchIndex = NO_CONSENSUS_INDEX;
- /**
- * Physical address of data region after splitting
- */
+ /** Physical address of data region after splitting */
protected TRegionReplicaSet dataRegionReplicaSet;
protected ProgressIndex progressIndex;
@@ -158,8 +154,9 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
}
public boolean isValidMeasurement(int i) {
- return measurementSchemas != null && measurementSchemas[i] != null &&
(columnCategories == null
- || columnCategories[i] == TsTableColumnCategory.MEASUREMENT);
+ return measurementSchemas != null
+ && measurementSchemas[i] != null
+ && (columnCategories == null || columnCategories[i] ==
TsTableColumnCategory.MEASUREMENT);
}
public void setMeasurements(String[] measurements) {
@@ -207,9 +204,7 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
return searchIndex;
}
- /**
- * Search index should start from 1
- */
+ /** Search index should start from 1 */
public void setSearchIndex(long searchIndex) {
this.searchIndex = searchIndex;
}
@@ -226,9 +221,7 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
// region Serialization methods for WAL
- /**
- * Serialized size of measurement schemas, ignoring failed time series
- */
+ /** Serialized size of measurement schemas, ignoring failed time series */
protected int serializeMeasurementSchemasSize() {
int byteLen = 0;
for (int i = 0; i < measurements.length; i++) {
@@ -241,9 +234,7 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
return byteLen;
}
- /**
- * Serialize measurement schemas, ignoring failed time series
- */
+ /** Serialize measurement schemas, ignoring failed time series */
protected void serializeMeasurementSchemasToWAL(IWALByteBufferView buffer) {
for (int i = 0; i < measurements.length; i++) {
// ignore failed partial insert
@@ -386,4 +377,9 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
public String getTableName() {
return null;
}
+
+ @Override
+ public List<PlanNode> getChildren() {
+ return Collections.emptyList();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
index 3f70e312223..16138cafa2e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
@@ -121,11 +121,6 @@ public class InsertRowNode extends InsertNode implements
WALEntryValue {
return Collections.singletonList(this);
}
- @Override
- public List<PlanNode> getChildren() {
- return Collections.emptyList();
- }
-
@Override
public void addChild(PlanNode child) {
// no child for InsertRowNode
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index 2fc54f9d080..958d04c195c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -120,11 +120,6 @@ public class InsertRowsNode extends InsertNode implements
WALEntryValue {
return StatusUtils.getFailingStatus(results, insertRowNodeList.size());
}
- @Override
- public List<PlanNode> getChildren() {
- return Collections.emptyList();
- }
-
@Override
public void addChild(PlanNode child) {
// Do nothing
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index 3c2797c5077..69c99373360 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -126,11 +126,6 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
storeMeasurementsAndDataType();
}
- @Override
- public List<PlanNode> getChildren() {
- return null;
- }
-
@Override
public void addChild(PlanNode child) {}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index d0e733b69e1..87d3bac372e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -19,11 +19,6 @@
package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
-import static org.apache.iotdb.db.utils.CommonUtils.isAlive;
-
-import java.util.Map.Entry;
-import java.util.function.IntFunction;
-import java.util.function.IntToLongFunction;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
@@ -45,13 +40,12 @@ import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.db.utils.QueryDataSetUtils;
-
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
+
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.NotImplementedException;
import org.apache.tsfile.file.metadata.IDeviceID;
-import org.apache.tsfile.file.metadata.IDeviceID.Factory;
import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
@@ -72,7 +66,11 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Objects;
+import java.util.function.IntToLongFunction;
+
+import static org.apache.iotdb.db.utils.CommonUtils.isAlive;
public class InsertTabletNode extends InsertNode implements WALEntryValue {
@@ -177,11 +175,6 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
this.range = range;
}
- @Override
- public List<PlanNode> getChildren() {
- return null;
- }
-
@Override
public void addChild(PlanNode child) {}
@@ -213,8 +206,8 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
}
final Map<IDeviceID, PartitionSplitInfo> deviceIDSplitInfoMap =
collectSplitRanges();
- final Map<TRegionReplicaSet, List<Integer>> splitMap = splitByReplicaSet(
- deviceIDSplitInfoMap, analysis);
+ final Map<TRegionReplicaSet, List<Integer>> splitMap =
+ splitByReplicaSet(deviceIDSplitInfoMap, analysis);
return doSplit(splitMap);
}
@@ -230,8 +223,9 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
for (int i = 1; i < times.length; i++) { // times are sorted in session
API.
IDeviceID nextDeviceId = getDeviceID(i);
if (times[i] >= upperBoundOfTimePartition ||
!currDeviceId.equals(nextDeviceId)) {
- final PartitionSplitInfo splitInfo =
deviceIDSplitInfoMap.computeIfAbsent(currDeviceId,
- deviceID1 -> new PartitionSplitInfo());
+ final PartitionSplitInfo splitInfo =
+ deviceIDSplitInfoMap.computeIfAbsent(
+ currDeviceId, deviceID1 -> new PartitionSplitInfo());
// a new range.
splitInfo.ranges.add(startLoc); // included
splitInfo.ranges.add(i); // excluded
@@ -244,8 +238,8 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
}
}
- PartitionSplitInfo splitInfo =
deviceIDSplitInfoMap.computeIfAbsent(currDeviceId,
- deviceID1 -> new PartitionSplitInfo());
+ PartitionSplitInfo splitInfo =
+ deviceIDSplitInfoMap.computeIfAbsent(currDeviceId, deviceID1 -> new
PartitionSplitInfo());
// the final range
splitInfo.ranges.add(startLoc); // included
splitInfo.ranges.add(times.length); // excluded
@@ -254,16 +248,17 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
return deviceIDSplitInfoMap;
}
- private Map<TRegionReplicaSet, List<Integer>>
splitByReplicaSet(Map<IDeviceID, PartitionSplitInfo> deviceIDSplitInfoMap,
IAnalysis analysis) {
+ private Map<TRegionReplicaSet, List<Integer>> splitByReplicaSet(
+ Map<IDeviceID, PartitionSplitInfo> deviceIDSplitInfoMap, IAnalysis
analysis) {
Map<TRegionReplicaSet, List<Integer>> splitMap = new HashMap<>();
for (Entry<IDeviceID, PartitionSplitInfo> entry :
deviceIDSplitInfoMap.entrySet()) {
final IDeviceID deviceID = entry.getKey();
final PartitionSplitInfo splitInfo = entry.getValue();
- final List<TRegionReplicaSet> replicaSets = analysis
- .getDataPartitionInfo()
- .getDataRegionReplicaSetForWriting(
- deviceID, splitInfo.timePartitionSlots);
+ final List<TRegionReplicaSet> replicaSets =
+ analysis
+ .getDataPartitionInfo()
+ .getDataRegionReplicaSetForWriting(deviceID,
splitInfo.timePartitionSlots);
splitInfo.replicaSets = replicaSets;
// collect redirectInfo
analysis.addEndPointToRedirectNodeList(
@@ -355,7 +350,6 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
return subNode;
}
-
@TestOnly
public List<TTimePartitionSlot> getTimePartitionSlots() {
List<TTimePartitionSlot> result = new ArrayList<>();
@@ -460,7 +454,6 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
ReadWriteIOUtils.write((byte) (isAligned ? 1 : 0), stream);
}
-
/** Serialize measurements or measurement schemas, ignoring failed time
series */
private void writeMeasurementsOrSchemas(ByteBuffer buffer) {
ReadWriteIOUtils.write(measurements.length - getFailedMeasurementNumber(),
buffer);
@@ -1185,6 +1178,7 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
/**
* Split the tablet of the given range according to Table deviceID.
+ *
* @param start inclusive
* @param end exclusive
* @return each the number in the pair is the end offset of a device
@@ -1193,22 +1187,18 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
return Collections.singletonList(new Pair<>(deviceID, end));
}
-
/**
- *
* @param results insertion result of each row
* @param rowTTLGetter the ttl associated with each row
* @return the position of the first alive row
* @throws OutOfTTLException if all rows have expired the TTL
*/
- public int checkTTL(TSStatus[] results,
- IntToLongFunction rowTTLGetter)
- throws OutOfTTLException {
+ public int checkTTL(TSStatus[] results, IntToLongFunction rowTTLGetter)
throws OutOfTTLException {
return checkTTLInternal(results, rowTTLGetter, true);
}
- protected int checkTTLInternal(TSStatus[] results,
- IntToLongFunction rowTTLGetter, boolean breakOnFirstAlive)
+ protected int checkTTLInternal(
+ TSStatus[] results, IntToLongFunction rowTTLGetter, boolean
breakOnFirstAlive)
throws OutOfTTLException {
/*
@@ -1228,8 +1218,7 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
String.format(
"Insertion time [%s] is less than ttl time bound [%s]",
DateTimeUtils.convertLongToDate(currTime),
- DateTimeUtils.convertLongToDate(
- CommonDateTimeUtils.currentTime() - ttl)));
+
DateTimeUtils.convertLongToDate(CommonDateTimeUtils.currentTime() - ttl)));
} else {
if (firstAliveLoc == -1) {
firstAliveLoc = loc;
@@ -1244,8 +1233,7 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
if (firstAliveLoc == -1) {
// no alive data
throw new OutOfTTLException(
- getTimes()[getTimes().length - 1],
- (CommonDateTimeUtils.currentTime() - ttl));
+ getTimes()[getTimes().length - 1],
(CommonDateTimeUtils.currentTime() - ttl));
}
return firstAliveLoc;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
index f86e3302d87..e1ec01d96e4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
@@ -19,21 +19,15 @@
package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.function.IntToLongFunction;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.db.exception.query.OutOfTTLException;
-import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
+
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.IDeviceID.Factory;
@@ -42,6 +36,13 @@ import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.schema.MeasurementSchema;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.IntToLongFunction;
+
public class RelationalInsertTabletNode extends InsertTabletNode {
// deviceId cache for Table-view insertion
@@ -49,13 +50,27 @@ public class RelationalInsertTabletNode extends
InsertTabletNode {
public RelationalInsertTabletNode(
PlanNodeId id,
- PartialPath devicePath, boolean isAligned, String[] measurements,
+ PartialPath devicePath,
+ boolean isAligned,
+ String[] measurements,
TSDataType[] dataTypes,
- MeasurementSchema[] measurementSchemas, long[] times,
- BitMap[] bitMaps, Object[] columns, int rowCount,
+ MeasurementSchema[] measurementSchemas,
+ long[] times,
+ BitMap[] bitMaps,
+ Object[] columns,
+ int rowCount,
TsTableColumnCategory[] columnCategories) {
- super(id, devicePath, isAligned, measurements, dataTypes,
measurementSchemas, times, bitMaps,
- columns, rowCount);
+ super(
+ id,
+ devicePath,
+ isAligned,
+ measurements,
+ dataTypes,
+ measurementSchemas,
+ times,
+ bitMaps,
+ columns,
+ rowCount);
setColumnCategories(columnCategories);
}
@@ -69,10 +84,10 @@ public class RelationalInsertTabletNode extends
InsertTabletNode {
}
if (deviceIDs[rowIdx] == null) {
String[] deviceIdSegments = new String[idColumnIndices.size() + 1];
- deviceIdSegments[0] = this.devicePath.getFullPath();
+ deviceIdSegments[0] = this.getTableName();
for (int i = 0; i < idColumnIndices.size(); i++) {
final Integer columnIndex = idColumnIndices.get(i);
- deviceIdSegments[i + 1] = ((Binary[])
columns[columnIndex])[rowIdx].toString();
+ deviceIdSegments[i + 1] = ((Object[])
columns[columnIndex])[rowIdx].toString();
}
deviceIDs[rowIdx] = Factory.DEFAULT_FACTORY.create(deviceIdSegments);
}
@@ -167,7 +182,8 @@ public class RelationalInsertTabletNode extends
InsertTabletNode {
}
public String getTableName() {
- return deviceID.getTableName();
+ return devicePath.getFullPath();
}
-}
+
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
index 6b561bd37de..eabd7a17467 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
@@ -378,6 +378,7 @@ public class StatementAnalyzer {
analysis,
false);
insert.setInnerTreeStatement(insertTabletStatement);
+ analysis.setScope(insert, ret);
return ret;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ColumnSchema.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ColumnSchema.java
index 91f074811d7..8a0ed2f1716 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ColumnSchema.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ColumnSchema.java
@@ -20,8 +20,8 @@
package org.apache.iotdb.db.queryengine.plan.relational.metadata;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
-
import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
+
import org.apache.tsfile.read.common.type.BinaryType;
import org.apache.tsfile.read.common.type.BooleanType;
import org.apache.tsfile.read.common.type.DoubleType;
@@ -37,7 +37,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.StringJoiner;
-import org.apache.tsfile.write.record.Tablet.ColumnType;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
@@ -147,8 +146,11 @@ public class ColumnSchema {
}
public static ColumnSchema ofTsColumnSchema(TsTableColumnSchema schema) {
- return new ColumnSchema(schema.getColumnName(),
TypeFactory.getType(schema.getDataType()),
- false, schema.getColumnCategory());
+ return new ColumnSchema(
+ schema.getColumnName(),
+ TypeFactory.getType(schema.getDataType()),
+ false,
+ schema.getColumnCategory());
}
public static Builder builder() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/Metadata.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/Metadata.java
index 0f300b780d0..f79efcc62a6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/Metadata.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/Metadata.java
@@ -24,7 +24,6 @@ import
org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
-import
org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaComputationWithAutoCreation;
import org.apache.iotdb.db.queryengine.plan.relational.function.OperatorType;
import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableSchema.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableSchema.java
index 4b88ef4807f..8fbf1416d06 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableSchema.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableSchema.java
@@ -19,15 +19,18 @@
package org.apache.iotdb.db.queryengine.plan.relational.metadata;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Objects;
import org.apache.iotdb.commons.schema.table.TsTable;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
import
org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager;
+
import org.apache.tsfile.write.record.Tablet.ColumnType;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
+import java.util.ArrayList;
+import java.util.List;
+
public class TableSchema {
private final String tableName;
@@ -62,11 +65,30 @@ public class TableSchema {
List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
List<ColumnType> columnTypes = new ArrayList<>();
for (ColumnSchema column : columns) {
- measurementSchemas.add(new MeasurementSchema(column.getName(),
- InternalTypeManager.getTSDataType(column.getType())));
+ measurementSchemas.add(
+ new MeasurementSchema(
+ column.getName(),
InternalTypeManager.getTSDataType(column.getType())));
columnTypes.add(column.getColumnCategory().toTsFileColumnType());
}
- return
- new org.apache.tsfile.file.metadata.TableSchema(tableName,
measurementSchemas, columnTypes);
+ return new org.apache.tsfile.file.metadata.TableSchema(
+ tableName, measurementSchemas, columnTypes);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TableSchema that = (TableSchema) o;
+ return Objects.equals(tableName, that.tableName) && Objects.equals(columns,
+ that.columns);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(tableName, columns);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
index b5179525904..6d582fdcc9f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
@@ -41,11 +41,10 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Explain;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Table;
+import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WrappedStatement;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WrappedInsertStatement;
-import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WrappedStatement;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.type.Type;
import org.apache.tsfile.write.UnSupportedDataTypeException;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
index 4f8a76d3be0..5359ef2f197 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
@@ -13,11 +13,9 @@
*/
package org.apache.iotdb.db.queryengine.plan.relational.planner;
-import java.util.Collections;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.QueryId;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Field;
@@ -39,14 +37,15 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Table;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.TableSubquery;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Union;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Values;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
-import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import static java.util.Objects.requireNonNull;
@@ -198,6 +197,6 @@ public class RelationPlanner extends
AstVisitor<RelationPlan, Void> {
insertTabletStatement.getRowCount(),
insertTabletStatement.getColumnCategories());
insertNode.setFailedMeasurementNumber(insertTabletStatement.getFailedMeasurementNumber());
- return new RelationPlan(insertNode, analysis.getScope(node),
Collections.emptyList());
+ return new RelationPlan(insertNode, analysis.getRootScope(),
Collections.emptyList());
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
index ab561ef09df..95bad56b2dd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
@@ -81,9 +81,9 @@ public class TableDistributionPlanner {
List<FragmentInstance> fragmentInstances =
mppQueryContext.getQueryType() == QueryType.READ
? new TableModelQueryFragmentPlanner(subPlan, analysis,
mppQueryContext).plan()
- :
- new WriteFragmentParallelPlanner(subPlan, analysis,
mppQueryContext,
- WritePlanNode::splitByPartition).parallelPlan();
+ : new WriteFragmentParallelPlanner(
+ subPlan, analysis, mppQueryContext,
WritePlanNode::splitByPartition)
+ .parallelPlan();
// Only execute this step for READ operation
if (mppQueryContext.getQueryType() == QueryType.READ) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RemoveRedundantIdentityProjections.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RemoveRedundantIdentityProjections.java
index 474a327292f..aeae8d85deb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RemoveRedundantIdentityProjections.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RemoveRedundantIdentityProjections.java
@@ -19,6 +19,7 @@ import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
@@ -45,6 +46,9 @@ public class RemoveRedundantIdentityProjections implements
RelationalPlanOptimiz
@Override
public PlanNode visitPlan(PlanNode node, RewriterContext context) {
+ if (node instanceof WritePlanNode) {
+ return node;
+ }
PlanNode newNode = node.clone();
for (PlanNode child : node.getChildren()) {
context.setParent(node);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertTablet.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertTablet.java
index 007a2b8ff01..bc230a47690 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertTablet.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertTablet.java
@@ -19,15 +19,16 @@
package org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
-import java.util.ArrayList;
-import java.util.List;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
-import org.apache.iotdb.udf.api.type.Binary;
+
import org.apache.tsfile.file.metadata.IDeviceID;
+import java.util.ArrayList;
+import java.util.List;
+
public class InsertTablet extends WrappedInsertStatement {
public InsertTablet(InsertTabletStatement insertTabletStatement,
MPPQueryContext context) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
index 355786f86af..33b6ea24082 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
@@ -19,12 +19,9 @@
package org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
-import org.apache.iotdb.db.queryengine.plan.analyze.schema.SchemaValidator;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
import
org.apache.iotdb.db.queryengine.plan.relational.metadata.ITableDeviceSchemaValidation;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema;
@@ -33,10 +30,12 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
import org.apache.tsfile.read.common.type.TypeFactory;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
-public abstract class WrappedInsertStatement extends WrappedStatement
implements
- ITableDeviceSchemaValidation {
+public abstract class WrappedInsertStatement extends WrappedStatement
+ implements ITableDeviceSchemaValidation {
protected TableSchema tableSchema;
@@ -85,16 +84,20 @@ public abstract class WrappedInsertStatement extends
WrappedStatement implements
public static void validate(ColumnSchema incoming, ColumnSchema real) {
if (real == null) {
- throw new SemanticException("Column " + incoming.getName() + " does not
exists or fails to be "
- + "created");
+ throw new SemanticException(
+ "Column " + incoming.getName() + " does not exists or fails to be "
+ "created");
}
if (!incoming.getType().equals(real.getType())) {
- throw new SemanticException(String.format("Inconsistent data type of
column %s: %s/%s",
- incoming.getName(), incoming.getType(), real.getType()));
+ throw new SemanticException(
+ String.format(
+ "Inconsistent data type of column %s: %s/%s",
+ incoming.getName(), incoming.getType(), real.getType()));
}
if (!incoming.getColumnCategory().equals(real.getColumnCategory())) {
- throw new SemanticException(String.format("Inconsistent column category
of column %s: %s/%s",
- incoming.getName(), incoming.getColumnCategory(),
real.getColumnCategory()));
+ throw new SemanticException(
+ String.format(
+ "Inconsistent column category of column %s: %s/%s",
+ incoming.getName(), incoming.getColumnCategory(),
real.getColumnCategory()));
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
index 9753f50e5a6..ea35b3624aa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
@@ -31,6 +31,7 @@ import
org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.InsertTablet;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
@@ -448,7 +449,7 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
@Override
public Statement toRelationalStatement(MPPQueryContext context) {
- return super.toRelationalStatement(context);
+ return new InsertTablet(this, context);
}
public IDeviceID getTableDeviceID(int rowIdx) {
@@ -460,7 +461,7 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
deviceIdSegments[0] = this.devicePath.getFullPath();
for (int i = 0; i < idColumnIndices.size(); i++) {
final Integer columnIndex = idColumnIndices.get(i);
- deviceIdSegments[i + 1] = ((Binary[])
columns[columnIndex])[rowIdx].toString();
+ deviceIdSegments[i + 1] = ((Object[])
columns[columnIndex])[rowIdx].toString();
}
deviceIDs[rowIdx] = Factory.DEFAULT_FACTORY.create(deviceIdSegments);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index bdd0922a759..cf751e63f44 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.storageengine.dataregion;
-import java.util.function.IntToLongFunction;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -163,6 +162,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.IntToLongFunction;
import java.util.stream.Collectors;
import static org.apache.iotdb.commons.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
@@ -214,19 +214,13 @@ public class DataRegion implements IDataRegionForQuery {
*/
private final ReadWriteLock insertLock = new ReentrantReadWriteLock();
- /**
- * Condition to safely delete data region.
- */
+ /** Condition to safely delete data region. */
private final Condition deletedCondition =
insertLock.writeLock().newCondition();
- /**
- * Data region has been deleted or not.
- */
+ /** Data region has been deleted or not. */
private volatile boolean deleted = false;
- /**
- * closeStorageGroupCondition is used to wait for all currently closing
TsFiles to be done.
- */
+ /** closeStorageGroupCondition is used to wait for all currently closing
TsFiles to be done. */
private final Object closeStorageGroupCondition = new Object();
/**
@@ -234,50 +228,32 @@ public class DataRegion implements IDataRegionForQuery {
*/
private final ReadWriteLock closeQueryLock = new ReentrantReadWriteLock();
- /**
- * time partition id in the database -> {@link TsFileProcessor} for this
time partition.
- */
+ /** time partition id in the database -> {@link TsFileProcessor} for this
time partition. */
private final TreeMap<Long, TsFileProcessor> workSequenceTsFileProcessors =
new TreeMap<>();
- /**
- * time partition id in the database -> {@link TsFileProcessor} for this
time partition.
- */
+ /** time partition id in the database -> {@link TsFileProcessor} for this
time partition. */
private final TreeMap<Long, TsFileProcessor> workUnsequenceTsFileProcessors
= new TreeMap<>();
- /**
- * sequence {@link TsFileProcessor}s which are closing.
- */
+ /** sequence {@link TsFileProcessor}s which are closing. */
private final Set<TsFileProcessor> closingSequenceTsFileProcessor =
ConcurrentHashMap.newKeySet();
- /**
- * unsequence {@link TsFileProcessor}s which are closing.
- */
+ /** unsequence {@link TsFileProcessor}s which are closing. */
private final Set<TsFileProcessor> closingUnSequenceTsFileProcessor =
ConcurrentHashMap.newKeySet();
- /**
- * data region id.
- */
+ /** data region id. */
private final String dataRegionId;
- /**
- * database name.
- */
+ /** database name. */
private final String databaseName;
- /**
- * database system directory.
- */
+ /** database system directory. */
private File storageGroupSysDir;
- /**
- * manage seqFileList and unSeqFileList.
- */
+ /** manage seqFileList and unSeqFileList. */
private final TsFileManager tsFileManager;
- /**
- * manage tsFileResource degrade.
- */
+ /** manage tsFileResource degrade. */
private final TsFileResourceManager tsFileResourceManager =
TsFileResourceManager.getInstance();
/**
@@ -288,14 +264,10 @@ public class DataRegion implements IDataRegionForQuery {
private final HashMap<Long, VersionController>
timePartitionIdVersionControllerMap =
new HashMap<>();
- /**
- * file system factory (local or hdfs).
- */
+ /** file system factory (local or hdfs). */
private final FSFactory fsFactory = FSFactoryProducer.getFSFactory();
- /**
- * File flush policy.
- */
+ /** File flush policy. */
private TsFileFlushPolicy fileFlushPolicy;
/**
@@ -306,24 +278,16 @@ public class DataRegion implements IDataRegionForQuery {
*/
private Map<Long, Long> partitionMaxFileVersions = new ConcurrentHashMap<>();
- /**
- * database info for mem control.
- */
+ /** database info for mem control. */
private final DataRegionInfo dataRegionInfo = new DataRegionInfo(this);
- /**
- * whether it's ready from recovery.
- */
+ /** whether it's ready from recovery. */
private boolean isReady = false;
- /**
- * close file listeners.
- */
+ /** close file listeners. */
private List<CloseFileListener> customCloseFileListeners =
Collections.emptyList();
- /**
- * flush listeners.
- */
+ /** flush listeners. */
private List<FlushListener> customFlushListeners = Collections.emptyList();
private ILastFlushTimeMap lastFlushTimeMap;
@@ -347,10 +311,10 @@ public class DataRegion implements IDataRegionForQuery {
/**
* Construct a database processor.
*
- * @param systemDir system dir path
- * @param dataRegionId data region id e.g. 1
+ * @param systemDir system dir path
+ * @param dataRegionId data region id e.g. 1
* @param fileFlushPolicy file flush policy
- * @param databaseName database name e.g. root.sg1
+ * @param databaseName database name e.g. root.sg1
*/
public DataRegion(
String systemDir, String dataRegionId, TsFileFlushPolicy
fileFlushPolicy, String databaseName)
@@ -433,29 +397,19 @@ public class DataRegion implements IDataRegionForQuery {
return ret;
}
- /**
- * this class is used to store recovering context.
- */
+ /** this class is used to store recovering context. */
private class DataRegionRecoveryContext {
- /**
- * number of files to be recovered.
- */
+ /** number of files to be recovered. */
private final long numOfFilesToRecover;
- /**
- * number of already recovered files.
- */
+ /** number of already recovered files. */
private long recoveredFilesNum;
- /**
- * last recovery log time.
- */
+ /** last recovery log time. */
private long lastLogTime;
- /**
- * recover performers of unsealed TsFiles.
- */
+ /** recover performers of unsealed TsFiles. */
private final List<UnsealedTsFileRecoverPerformer> recoverPerformers = new
ArrayList<>();
public DataRegionRecoveryContext(long numOfFilesToRecover) {
@@ -487,9 +441,7 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- /**
- * recover from file
- */
+ /** recover from file */
@SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive
Complexity warning
private void recover() throws DataRegionException {
try {
@@ -759,9 +711,7 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- /**
- * check if the tsfile's time is smaller than system current time.
- */
+ /** check if the tsfile's time is smaller than system current time. */
private void checkTsFileTime(File tsFile, long currentTime) throws
DataRegionException {
String[] items = tsFile.getName().replace(TSFILE_SUFFIX,
"").split(FILE_NAME_SEPARATOR);
long fileTime = Long.parseLong(items[0]);
@@ -774,9 +724,7 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- /**
- * submit unsealed TsFile to WALRecoverManager.
- */
+ /** submit unsealed TsFile to WALRecoverManager. */
private WALRecoverListener recoverUnsealedTsFile(
TsFileResource unsealedTsFile, DataRegionRecoveryContext context,
boolean isSeq) {
UnsealedTsFileRecoverPerformer recoverPerformer =
@@ -861,9 +809,7 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- /**
- * recover sealed TsFile.
- */
+ /** recover sealed TsFile. */
private void recoverSealedTsFiles(
TsFileResource sealedTsFile, DataRegionRecoveryContext context, boolean
isSeq) {
try (SealedTsFileRecoverPerformer recoverPerformer =
@@ -954,7 +900,7 @@ public class DataRegion implements IDataRegionForQuery {
boolean isSequence =
config.isEnableSeparateData()
&& insertRowNode.getTime()
- > lastFlushTimeMap.getFlushedTime(timePartitionId,
insertRowNode.getDeviceID());
+ > lastFlushTimeMap.getFlushedTime(timePartitionId,
insertRowNode.getDeviceID());
// insert to sequence or unSequence file
TsFileProcessor tsFileProcessor =
@@ -969,19 +915,13 @@ public class DataRegion implements IDataRegionForQuery {
}
}
-
private long getLastFlushTime(long timePartitionID, IDeviceID deviceID) {
- return config.isEnableSeparateData()
- ? lastFlushTimeMap.getFlushedTime(
- timePartitionID,
- deviceID)
+ return config.isEnableSeparateData()
+ ? lastFlushTimeMap.getFlushedTime(timePartitionID, deviceID)
: Long.MAX_VALUE;
}
-
- private boolean splitAndInsert(InsertTabletNode insertTabletNode,
- int loc,
- TSStatus[] results)
+ private boolean splitAndInsert(InsertTabletNode insertTabletNode, int loc,
TSStatus[] results)
throws BatchProcessException, WriteProcessException {
boolean noFailure = true;
@@ -989,8 +929,7 @@ public class DataRegion implements IDataRegionForQuery {
int before = loc;
long beforeTime = insertTabletNode.getTimes()[before];
// before time partition
- long beforeTimePartition =
- TimePartitionUtils.getTimePartitionId(beforeTime);
+ long beforeTimePartition =
TimePartitionUtils.getTimePartitionId(beforeTime);
// init flush time map
initFlushTimeMap(beforeTimePartition);
@@ -1006,8 +945,13 @@ public class DataRegion implements IDataRegionForQuery {
// a new partition, insert the remaining of the previous partition
noFailure =
insertTabletToTsFileProcessor(
- insertTabletNode, before, loc, isSequence, results,
- beforeTimePartition, noFailure)
+ insertTabletNode,
+ before,
+ loc,
+ isSequence,
+ results,
+ beforeTimePartition,
+ noFailure)
&& noFailure;
before = loc;
beforeTimePartition = timePartitionId;
@@ -1017,8 +961,13 @@ public class DataRegion implements IDataRegionForQuery {
// insert previous range into unsequence
noFailure =
insertTabletToTsFileProcessor(
- insertTabletNode, before, loc, isSequence, results,
- beforeTimePartition, noFailure)
+ insertTabletNode,
+ before,
+ loc,
+ isSequence,
+ results,
+ beforeTimePartition,
+ noFailure)
&& noFailure;
before = loc;
isSequence = true;
@@ -1031,7 +980,13 @@ public class DataRegion implements IDataRegionForQuery {
if (before < loc) {
noFailure =
insertTabletToTsFileProcessor(
- insertTabletNode, before, loc, isSequence, results,
beforeTimePartition, noFailure)
+ insertTabletNode,
+ before,
+ loc,
+ isSequence,
+ results,
+ beforeTimePartition,
+ noFailure)
&& noFailure;
}
@@ -1057,8 +1012,9 @@ public class DataRegion implements IDataRegionForQuery {
TSStatus[] results = new TSStatus[insertTabletNode.getRowCount()];
Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
boolean noFailure;
- int loc = insertTabletNode.checkTTL(results, i ->
DataNodeTTLCache.getInstance()
- .getTTL(insertTabletNode.getDeviceID(i)));
+ int loc =
+ insertTabletNode.checkTTL(
+ results, i ->
DataNodeTTLCache.getInstance().getTTL(insertTabletNode.getDeviceID(i)));
noFailure = loc == 0;
noFailure = noFailure && splitAndInsert(insertTabletNode, loc, results);
@@ -1089,8 +1045,11 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- private int checkTTL(InsertTabletNode insertTabletNode, TSStatus[] results,
- IntToLongFunction rowTTLGetter, boolean breakOnFirstAlive)
+ private int checkTTL(
+ InsertTabletNode insertTabletNode,
+ TSStatus[] results,
+ IntToLongFunction rowTTLGetter,
+ boolean breakOnFirstAlive)
throws OutOfTTLException {
/*
@@ -1110,8 +1069,7 @@ public class DataRegion implements IDataRegionForQuery {
String.format(
"Insertion time [%s] is less than ttl time bound [%s]",
DateTimeUtils.convertLongToDate(currTime),
- DateTimeUtils.convertLongToDate(
- CommonDateTimeUtils.currentTime() - ttl)));
+
DateTimeUtils.convertLongToDate(CommonDateTimeUtils.currentTime() - ttl)));
} else {
if (firstAliveLoc == -1) {
firstAliveLoc = loc;
@@ -1132,18 +1090,17 @@ public class DataRegion implements IDataRegionForQuery {
return firstAliveLoc;
}
-
/**
* insert batch to tsfile processor thread-safety that the caller need to
guarantee The rows to be
* inserted are in the range [start, end) Null value in each column values
will be replaced by the
* subsequent non-null value, e.g., {1, null, 3, null, 5} will be {1, 3, 5,
null, 5}
*
* @param insertTabletNode insert a tablet of a device
- * @param sequence whether is sequence
- * @param start start index of rows to be inserted in
insertTabletPlan
- * @param end end index of rows to be inserted in
insertTabletPlan
- * @param results result array
- * @param timePartitionId time partition id
+ * @param sequence whether is sequence
+ * @param start start index of rows to be inserted in insertTabletPlan
+ * @param end end index of rows to be inserted in insertTabletPlan
+ * @param results result array
+ * @param timePartitionId time partition id
* @return false if any failure occurs when inserting the tablet, true
otherwise
*/
private boolean insertTabletToTsFileProcessor(
@@ -1193,15 +1150,18 @@ public class DataRegion implements IDataRegionForQuery {
private void registerToTsFile(InsertNode node, TsFileProcessor
tsFileProcessor) {
final String tableName = node.getTableName();
if (tableName != null) {
- tsFileProcessor.registerToTsFile(tableName,
- t ->
TableSchema.of(DataNodeTableCache.getInstance().getTable(getDatabaseName(),
t)).toTsFileTableSchema());
+ tsFileProcessor.registerToTsFile(
+ tableName,
+ t ->
+
TableSchema.of(DataNodeTableCache.getInstance().getTable(getDatabaseName(), t))
+ .toTsFileTableSchema());
}
}
private void tryToUpdateInsertTabletLastCache(InsertTabletNode node) {
if (!CommonDescriptor.getInstance().getConfig().isLastCacheEnable()
||
(config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
- && node.isSyncFromLeaderWhenUsingIoTConsensus())) {
+ && node.isSyncFromLeaderWhenUsingIoTConsensus())) {
// disable updating last cache on follower
return;
}
@@ -1482,9 +1442,9 @@ public class DataRegion implements IDataRegionForQuery {
/**
* get processor from hashmap, flush oldest processor if necessary
*
- * @param timeRangeId time partition range
+ * @param timeRangeId time partition range
* @param tsFileProcessorTreeMap tsFileProcessorTreeMap
- * @param sequence whether is sequence or not
+ * @param sequence whether is sequence or not
*/
private TsFileProcessor getOrCreateTsFileProcessorIntern(
long timeRangeId, TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap,
boolean sequence)
@@ -1568,7 +1528,7 @@ public class DataRegion implements IDataRegionForQuery {
/**
* close one tsfile processor
*
- * @param sequence whether this tsfile processor is sequence or not
+ * @param sequence whether this tsfile processor is sequence or not
* @param tsFileProcessor tsfile processor
*/
public void syncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor
tsFileProcessor) {
@@ -1600,7 +1560,7 @@ public class DataRegion implements IDataRegionForQuery {
/**
* close one tsfile processor, thread-safety should be ensured by caller
*
- * @param sequence whether this tsfile processor is sequence or not
+ * @param sequence whether this tsfile processor is sequence or not
* @param tsFileProcessor tsfile processor
*/
public Future<?> asyncCloseOneTsFileProcessor(boolean sequence,
TsFileProcessor tsFileProcessor) {
@@ -1665,9 +1625,7 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- /**
- * close all tsfile resource
- */
+ /** close all tsfile resource */
public void closeAllResources() {
for (TsFileResource tsFileResource : tsFileManager.getTsFileList(false)) {
try {
@@ -1685,9 +1643,7 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- /**
- * delete tsfile
- */
+ /** delete tsfile */
public void syncDeleteDataFiles() throws TsFileProcessorException {
logger.info(
"{} will close all files for deleting data files", databaseName + "-"
+ dataRegionId);
@@ -1797,9 +1753,7 @@ public class DataRegion implements IDataRegionForQuery {
WritingMetrics.getInstance().recordTimedFlushMemTableCount(dataRegionId,
count);
}
- /**
- * This method will be blocked until all tsfile processors are closed.
- */
+ /** This method will be blocked until all tsfile processors are closed. */
public void syncCloseAllWorkingTsFileProcessors() {
try {
List<Future<?>> tsFileProcessorsClosingFutures =
asyncCloseAllWorkingTsFileProcessors();
@@ -1838,9 +1792,7 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- /**
- * close all working tsfile processors
- */
+ /** close all working tsfile processors */
List<Future<?>> asyncCloseAllWorkingTsFileProcessors() {
writeLock("asyncCloseAllWorkingTsFileProcessors");
List<Future<?>> futures = new ArrayList<>();
@@ -1862,9 +1814,7 @@ public class DataRegion implements IDataRegionForQuery {
return futures;
}
- /**
- * force close all working tsfile processors
- */
+ /** force close all working tsfile processors */
public void forceCloseAllWorkingTsFileProcessors() throws
TsFileProcessorException {
writeLock("forceCloseAllWorkingTsFileProcessors");
try {
@@ -1885,9 +1835,7 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- /**
- * used for queryengine
- */
+ /** used for queryengine */
@Override
public QueryDataSource query(
List<IFullPath> pathList,
@@ -2051,9 +1999,7 @@ public class DataRegion implements IDataRegionForQuery {
return fileScanHandles;
}
- /**
- * lock the read lock of the insert lock
- */
+ /** lock the read lock of the insert lock */
@Override
public void readLock() {
// apply read lock for SG insert lock to prevent inconsistent with
concurrently writing memtable
@@ -2062,26 +2008,20 @@ public class DataRegion implements IDataRegionForQuery {
tsFileManager.readLock();
}
- /**
- * unlock the read lock of insert lock
- */
+ /** unlock the read lock of insert lock */
@Override
public void readUnlock() {
tsFileManager.readUnlock();
insertLock.readLock().unlock();
}
- /**
- * lock the write lock of the insert lock
- */
+ /** lock the write lock of the insert lock */
public void writeLock(String holder) {
insertLock.writeLock().lock();
insertWriteLockHolder = holder;
}
- /**
- * unlock the write lock of the insert lock
- */
+ /** unlock the write lock of the insert lock */
public void writeUnlock() {
insertWriteLockHolder = "";
insertLock.writeLock().unlock();
@@ -2131,9 +2071,7 @@ public class DataRegion implements IDataRegionForQuery {
return tsfileResourcesForQuery;
}
- /**
- * Seperate tsfiles in TsFileManager to sealedList and unsealedList.
- */
+ /** Separate tsfiles in TsFileManager to sealedList and unsealedList. */
private void separateTsFile(
List<TsFileResource> sealedResource,
List<TsFileResource> unsealedResource,
@@ -2563,9 +2501,7 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- /**
- * Put the memtable back to the MemTablePool and make the metadata in writer
visible
- */
+ /** Put the memtable back to the MemTablePool and make the metadata in
writer visible */
// TODO please consider concurrency with read and insert method.
private void closeUnsealedTsFileProcessorCallBack(TsFileProcessor
tsFileProcessor)
throws TsFileProcessorException {
@@ -2661,9 +2597,7 @@ public class DataRegion implements IDataRegionForQuery {
return trySubmitCount;
}
- /**
- * Schedule settle compaction for ttl check.
- */
+ /** Schedule settle compaction for ttl check. */
public int executeTTLCheck() throws InterruptedException {
while (!isCompactionSelecting.compareAndSet(false, true)) {
// wait until success
@@ -2792,9 +2726,7 @@ public class DataRegion implements IDataRegionForQuery {
return getNonSystemDatabaseName(databaseName);
}
- /**
- * Merge file under this database processor
- */
+ /** Merge file under this database processor */
public int compact() {
writeLock("merge");
CompactionScheduler.exclusiveLockCompactionSelection();
@@ -2814,7 +2746,7 @@ public class DataRegion implements IDataRegionForQuery {
* <p>Then, update the latestTimeForEachDevice and
partitionLatestFlushedTimeForEachDevice.
*
* @param newTsFileResource tsfile resource @UsedBy load external tsfile
module
- * @param deleteOriginFile whether to delete origin tsfile
+ * @param deleteOriginFile whether to delete origin tsfile
* @param isGeneratedByPipe whether the load tsfile request is generated by
pipe
*/
public void loadNewTsFile(
@@ -3000,9 +2932,8 @@ public class DataRegion implements IDataRegionForQuery {
}
/**
- * Update latest time in latestTimeForEachDevice and
partitionLatestFlushedTimeForEachDevice.
- *
- * @UsedBy sync module, load external tsfile module.
+ * Update latest time in latestTimeForEachDevice and
+ * partitionLatestFlushedTimeForEachDevice. @UsedBy sync module, load
external tsfile module.
*/
protected void updateLastFlushTime(TsFileResource newTsFileResource) {
for (IDeviceID device : newTsFileResource.getDevices()) {
@@ -3020,8 +2951,8 @@ public class DataRegion implements IDataRegionForQuery {
/**
* Execute the loading process by the type.
*
- * @param tsFileResource tsfile resource to be loaded
- * @param filePartitionId the partition id of the new file
+ * @param tsFileResource tsfile resource to be loaded
+ * @param filePartitionId the partition id of the new file
* @param deleteOriginFile whether to delete the original file
* @return load the file successfully @UsedBy sync module, load external
tsfile module.
*/
@@ -3264,14 +3195,14 @@ public class DataRegion implements IDataRegionForQuery {
* "tsFileResource" have the same plan indexes as the local one.
*
* @return true if any file contains plans with indexes no less than the max
plan index of
- * "tsFileResource", otherwise false.
+ * "tsFileResource", otherwise false.
*/
public boolean isFileAlreadyExist(TsFileResource tsFileResource, long
partitionNum) {
// examine working processor first as they have the largest plan index
return isFileAlreadyExistInWorking(
- tsFileResource, partitionNum, getWorkSequenceTsFileProcessors())
+ tsFileResource, partitionNum, getWorkSequenceTsFileProcessors())
|| isFileAlreadyExistInWorking(
- tsFileResource, partitionNum, getWorkUnsequenceTsFileProcessors())
+ tsFileResource, partitionNum, getWorkUnsequenceTsFileProcessors())
|| isFileAlreadyExistInClosed(tsFileResource, partitionNum,
getSequenceFileList())
|| isFileAlreadyExistInClosed(tsFileResource, partitionNum,
getUnSequenceFileList());
}
@@ -3395,7 +3326,7 @@ public class DataRegion implements IDataRegionForQuery {
boolean isSequence =
config.isEnableSeparateData()
&& insertRowNode.getTime()
- > lastFlushTimeMap.getFlushedTime(timePartitionId,
insertRowNode.getDeviceID());
+ > lastFlushTimeMap.getFlushedTime(timePartitionId,
insertRowNode.getDeviceID());
TsFileProcessor tsFileProcessor =
getOrCreateTsFileProcessor(timePartitionId, isSequence);
if (tsFileProcessor == null) {
continue;
@@ -3504,8 +3435,8 @@ public class DataRegion implements IDataRegionForQuery {
areSequence[i] =
config.isEnableSeparateData()
&& insertRowNode.getTime()
- > lastFlushTimeMap.getFlushedTime(
- timePartitionIds[i], insertRowNode.getDeviceID());
+ > lastFlushTimeMap.getFlushedTime(
+ timePartitionIds[i], insertRowNode.getDeviceID());
}
insertToTsFileProcessors(insertRowsNode, areSequence, timePartitionIds);
if (!insertRowsNode.getResults().isEmpty()) {
@@ -3569,7 +3500,7 @@ public class DataRegion implements IDataRegionForQuery {
}
/**
- * @param folder the folder's path
+ * @param folder the folder's path
* @param diskSize the disk space occupied by this folder, unit is MB
*/
private void countFolderDiskSize(String folder, AtomicLong diskSize) {
@@ -3679,9 +3610,7 @@ public class DataRegion implements IDataRegionForQuery {
return insertWriteLockHolder;
}
- /**
- * This method could only be used in iot consensus
- */
+ /** This method could only be used in iot consensus */
public IWALNode getWALNode() {
if
(!config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS))
{
throw new UnsupportedOperationException();
@@ -3691,9 +3620,7 @@ public class DataRegion implements IDataRegionForQuery {
.applyForWALNode(databaseName + FILE_NAME_SEPARATOR + dataRegionId);
}
- /**
- * Wait for this data region successfully deleted
- */
+ /** Wait for this data region successfully deleted */
public void waitForDeleted() {
writeLock("waitForDeleted");
try {
@@ -3709,9 +3636,7 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- /**
- * Release all threads waiting for this data region successfully deleted
- */
+ /** Release all threads waiting for this data region successfully deleted */
public void markDeleted() {
writeLock("markDeleted");
try {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index f17b9fc0ec6..cd6d5aa7c20 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -351,15 +351,16 @@ public abstract class AbstractMemTable implements
IMemTable {
}
@Override
- public void insertAlignedTablet(InsertTabletNode insertTabletNode, int
start, int end,
- TSStatus[] results)
+ public void insertAlignedTablet(
+ InsertTabletNode insertTabletNode, int start, int end, TSStatus[]
results)
throws WriteProcessException {
try {
writeAlignedTablet(insertTabletNode, start, end, results);
- //TODO-Table: what is the relation between this and
TsFileProcessor.checkMemCost
+ // TODO-Table: what is the relation between this and
TsFileProcessor.checkMemCost
memSize += MemUtils.getAlignedTabletSize(insertTabletNode, start, end,
results);
int pointsInserted =
- (insertTabletNode.getMeasurementColumnCnt() -
insertTabletNode.getFailedMeasurementNumber())
+ (insertTabletNode.getMeasurementColumnCnt()
+ - insertTabletNode.getFailedMeasurementNumber())
* (end - start);
totalPointsNum += pointsInserted;
MetricService.getInstance()
@@ -434,18 +435,20 @@ public abstract class AbstractMemTable implements
IMemTable {
insertTabletNode.getBitMaps(),
schemaList,
start,
- end, null)) {
+ end,
+ null)) {
shouldFlush = true;
}
}
- public void writeAlignedTablet(InsertTabletNode insertTabletNode, int start,
int end,
- TSStatus[] results) {
+ public void writeAlignedTablet(
+ InsertTabletNode insertTabletNode, int start, int end, TSStatus[]
results) {
List<IMeasurementSchema> schemaList = new ArrayList<>();
for (int i = 0; i < insertTabletNode.getMeasurementSchemas().length; i++) {
- if (insertTabletNode.getColumns()[i] == null ||
- (insertTabletNode.getColumnCategories() != null &&
insertTabletNode.getColumnCategories()[i] !=
TsTableColumnCategory.MEASUREMENT)) {
+ if (insertTabletNode.getColumns()[i] == null
+ || (insertTabletNode.getColumnCategories() != null
+ && insertTabletNode.getColumnCategories()[i] !=
TsTableColumnCategory.MEASUREMENT)) {
schemaList.add(null);
} else {
schemaList.add(insertTabletNode.getMeasurementSchemas()[i]);
@@ -454,7 +457,8 @@ public abstract class AbstractMemTable implements IMemTable
{
if (schemaList.isEmpty()) {
return;
}
- final List<Pair<IDeviceID, Integer>> deviceEndOffsetPair =
insertTabletNode.splitByDevice(start, end);
+ final List<Pair<IDeviceID, Integer>> deviceEndOffsetPair =
+ insertTabletNode.splitByDevice(start, end);
int splitStart = start;
for (Pair<IDeviceID, Integer> pair : deviceEndOffsetPair) {
final IDeviceID deviceID = pair.left;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index f904b9de49a..4b25096245a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -188,7 +188,8 @@ public class AlignedWritableMemChunk implements
IWritableMemChunk {
BitMap[] bitMaps,
List<IMeasurementSchema> schemaList,
int start,
- int end, TSStatus[] results) {
+ int end,
+ TSStatus[] results) {
Pair<Object[], BitMap[]> pair =
checkAndReorderColumnValuesInInsertPlan(schemaList, valueList,
bitMaps);
Object[] reorderedColumnValues = pair.left;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
index 797880b186f..6eae996d4c5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
@@ -56,7 +56,8 @@ public class AlignedWritableMemChunkGroup implements
IWritableMemChunkGroup {
BitMap[] bitMaps,
List<IMeasurementSchema> schemaList,
int start,
- int end, TSStatus[] results) {
+ int end,
+ TSStatus[] results) {
return memChunk.writeAlignedValuesWithFlushCheck(
times, columns, bitMaps, schemaList, start, end, results);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
index 4fbfb451302..be5febcb0e6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
@@ -116,8 +116,8 @@ public interface IMemTable extends WALEntryValue {
void insertTablet(InsertTabletNode insertTabletNode, int start, int end)
throws WriteProcessException;
- void insertAlignedTablet(InsertTabletNode insertTabletNode, int start, int
end,
- TSStatus[] results)
+ void insertAlignedTablet(
+ InsertTabletNode insertTabletNode, int start, int end, TSStatus[]
results)
throws WriteProcessException;
ReadOnlyMemChunk query(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
index 376444072da..e1070d4f06f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
@@ -79,7 +79,8 @@ public interface IWritableMemChunk extends WALEntryValue {
BitMap[] bitMaps,
List<IMeasurementSchema> schemaList,
int start,
- int end, TSStatus[] results);
+ int end,
+ TSStatus[] results);
long count();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java
index 29ab0f4afbb..883779c2344 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java
@@ -37,7 +37,8 @@ public interface IWritableMemChunkGroup extends WALEntryValue
{
BitMap[] bitMaps,
List<IMeasurementSchema> schemaList,
int start,
- int end, TSStatus[] results);
+ int end,
+ TSStatus[] results);
void release();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index bece21045f4..9fb4d679893 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -19,8 +19,6 @@
package org.apache.iotdb.db.storageengine.dataregion.memtable;
-import java.util.function.Function;
-import java.util.function.IntFunction;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.IllegalPathException;
@@ -49,7 +47,6 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNo
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
import org.apache.iotdb.db.schemaengine.schemaregion.utils.ResourceByPathUtils;
import org.apache.iotdb.db.service.metrics.WritingMetrics;
import org.apache.iotdb.db.storageengine.StorageEngine;
@@ -112,6 +109,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
import static
org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.GET_QUERY_RESOURCE_FROM_MEM;
import static
org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet.FLUSHING_MEMTABLE;
@@ -122,6 +120,7 @@ public class TsFileProcessor {
/** Logger for this class. */
private static final Logger logger =
LoggerFactory.getLogger(TsFileProcessor.class);
+
private static final int NUM_MEM_TO_ESTIMATE = 3;
/** Storage group name of this tsfile. */
@@ -420,7 +419,8 @@ public class TsFileProcessor {
walNode.onMemTableCreated(workMemTable, tsFileResource.getTsFilePath());
}
- private long[] checkMemCost(InsertTabletNode insertTabletNode, int start,
int end, TSStatus[] results, boolean noFailure)
+ private long[] checkMemCost(
+ InsertTabletNode insertTabletNode, int start, int end, TSStatus[]
results, boolean noFailure)
throws WriteProcessException {
long[] memIncrements;
try {
@@ -437,17 +437,12 @@ public class TsFileProcessor {
return memIncrements;
}
- private long[] checkMemCost(InsertTabletNode insertTabletNode, int start,
int end,
- boolean noFailure, TSStatus[] results)
+ private long[] checkMemCost(
+ InsertTabletNode insertTabletNode, int start, int end, boolean
noFailure, TSStatus[] results)
throws WriteProcessException {
long[] memIncrements;
if (insertTabletNode.isAligned()) {
- memIncrements =
- checkAlignedMemCost(
- insertTabletNode,
- start,
- end,
- noFailure, results);
+ memIncrements = checkAlignedMemCost(insertTabletNode, start, end,
noFailure, results);
} else {
memIncrements =
checkMemCostAndAddToTspInfoForTablet(
@@ -461,8 +456,8 @@ public class TsFileProcessor {
return memIncrements;
}
- private long[] checkAlignedMemCost(InsertTabletNode insertTabletNode, int
start, int end,
- boolean noFailure, TSStatus[] results)
+ private long[] checkAlignedMemCost(
+ InsertTabletNode insertTabletNode, int start, int end, boolean
noFailure, TSStatus[] results)
throws WriteProcessException {
List<Pair<IDeviceID, Integer>> deviceEndPosList =
insertTabletNode.splitByDevice(start, end);
long[] memIncrements = new long[NUM_MEM_TO_ESTIMATE];
@@ -470,13 +465,17 @@ public class TsFileProcessor {
for (Pair<IDeviceID, Integer> iDeviceIDIntegerPair : deviceEndPosList) {
int splitEnd = iDeviceIDIntegerPair.getRight();
IDeviceID deviceID = iDeviceIDIntegerPair.getLeft();
- long[] splitMemIncrements = checkAlignedMemCostAndAddToTspForTablet(
- deviceID,
- insertTabletNode.getMeasurements(),
- insertTabletNode.getDataTypes(),
- insertTabletNode.getColumns(),
- insertTabletNode.getColumnCategories(), splitStart,
- splitEnd, noFailure, results);
+ long[] splitMemIncrements =
+ checkAlignedMemCostAndAddToTspForTablet(
+ deviceID,
+ insertTabletNode.getMeasurements(),
+ insertTabletNode.getDataTypes(),
+ insertTabletNode.getColumns(),
+ insertTabletNode.getColumnCategories(),
+ splitStart,
+ splitEnd,
+ noFailure,
+ results);
for (int i = 0; i < NUM_MEM_TO_ESTIMATE; i++) {
memIncrements[i] += splitMemIncrements[i];
}
@@ -496,8 +495,7 @@ public class TsFileProcessor {
* @param results result array
*/
public void insertTablet(
- InsertTabletNode insertTabletNode, int start, int end, TSStatus[]
results,
- boolean noFailure)
+ InsertTabletNode insertTabletNode, int start, int end, TSStatus[]
results, boolean noFailure)
throws WriteProcessException {
if (workMemTable == null) {
@@ -832,15 +830,28 @@ public class TsFileProcessor {
String[] measurements,
TSDataType[] dataTypes,
Object[] columns,
- TsTableColumnCategory[] columnCategories, int start,
- int end, boolean noFailure, TSStatus[] results)
+ TsTableColumnCategory[] columnCategories,
+ int start,
+ int end,
+ boolean noFailure,
+ TSStatus[] results)
throws WriteProcessException {
if (start >= end) {
return new long[] {0, 0, 0};
}
long[] memIncrements = new long[3]; // memTable, text, chunk metadata
- updateAlignedMemCost(dataTypes, deviceId, measurements, start, end,
memIncrements, columns, columnCategories, noFailure, results);
+ updateAlignedMemCost(
+ dataTypes,
+ deviceId,
+ measurements,
+ start,
+ end,
+ memIncrements,
+ columns,
+ columnCategories,
+ noFailure,
+ results);
long memTableIncrement = memIncrements[0];
long textDataIncrement = memIncrements[1];
long chunkMetadataIncrement = memIncrements[2];
@@ -893,7 +904,9 @@ public class TsFileProcessor {
int start,
int end,
long[] memIncrements,
- Object[] columns, TsTableColumnCategory[] columnCategories, boolean
noFailure,
+ Object[] columns,
+ TsTableColumnCategory[] columnCategories,
+ boolean noFailure,
TSStatus[] results) {
int incomingPointNum;
if (noFailure) {
@@ -902,7 +915,7 @@ public class TsFileProcessor {
incomingPointNum = end - start;
for (TSStatus result : results) {
if (result != null) {
- incomingPointNum --;
+ incomingPointNum--;
}
}
}
@@ -927,8 +940,12 @@ public class TsFileProcessor {
* ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER,
TSDataType.VECTOR);
// TVList memory
- int numArraysToAdd = incomingPointNum / PrimitiveArrayManager.ARRAY_SIZE
+
- incomingPointNum % PrimitiveArrayManager.ARRAY_SIZE > 0 ? 1 : 0;
+ // Ceiling division: a trailing partial array still costs one whole array.
+ // The parentheses are required; '+' binds tighter than '>' and '?:', so
+ // without them the whole sum is compared with 0 and the result is 0 or 1.
+ int numArraysToAdd =
+ incomingPointNum / PrimitiveArrayManager.ARRAY_SIZE
+ + (incomingPointNum % PrimitiveArrayManager.ARRAY_SIZE > 0 ? 1 : 0);
memIncrements[0] +=
numArraysToAdd * AlignedTVList.alignedTvListArrayMemCost(dataTypes,
columnCategories);
} else {
@@ -942,8 +959,11 @@ public class TsFileProcessor {
TSDataType dataType = dataTypes[i];
String measurement = measurementIds[i];
Object column = columns[i];
- if (dataType == null || column == null || measurement == null ||
- (columnCategories != null && columnCategories[i] !=
TsTableColumnCategory.MEASUREMENT)) {
+ if (dataType == null
+ || column == null
+ || measurement == null
+ || (columnCategories != null
+ && columnCategories[i] != TsTableColumnCategory.MEASUREMENT)) {
continue;
}
@@ -957,10 +977,18 @@ public class TsFileProcessor {
}
// calculate how many new arrays will be added after this insertion
- int currentArrayCnt = currentPointNum /
PrimitiveArrayManager.ARRAY_SIZE +
- currentPointNum % PrimitiveArrayManager.ARRAY_SIZE > 0 ? 1 : 0;
- int newArrayCnt = newPointNum / PrimitiveArrayManager.ARRAY_SIZE +
- newPointNum % PrimitiveArrayManager.ARRAY_SIZE > 0 ? 1 : 0;
+ // Ceiling division for both counts. The ternary must be parenthesised:
+ // otherwise (quotient + remainder) > 0 is evaluated first and each count
+ // collapses to 0 or 1, making acquireArray meaningless.
+ int currentArrayCnt =
+ currentPointNum / PrimitiveArrayManager.ARRAY_SIZE
+ + (currentPointNum % PrimitiveArrayManager.ARRAY_SIZE > 0 ? 1 : 0);
+ int newArrayCnt =
+ newPointNum / PrimitiveArrayManager.ARRAY_SIZE
+ + (newPointNum % PrimitiveArrayManager.ARRAY_SIZE > 0 ? 1 : 0);
long acquireArray = newArrayCnt - currentArrayCnt;
if (acquireArray != 0) {
@@ -976,8 +1004,11 @@ public class TsFileProcessor {
TSDataType dataType = dataTypes[i];
String measurement = measurementIds[i];
Object column = columns[i];
- if (dataType == null || column == null || measurement == null ||
- (columnCategories != null && columnCategories[i] !=
TsTableColumnCategory.MEASUREMENT)) {
+ if (dataType == null
+ || column == null
+ || measurement == null
+ || (columnCategories != null
+ && columnCategories[i] != TsTableColumnCategory.MEASUREMENT)) {
continue;
}
@@ -2204,9 +2235,11 @@ public class TsFileProcessor {
return flushingMemTables;
}
- public void registerToTsFile(String tableName,
- Function<String, TableSchema> tableSchemaFunction) {
- getWriter().getKnownSchema().getTableSchemaMap().computeIfAbsent(tableName,
- tableSchemaFunction);
+ public void registerToTsFile(
+ String tableName, Function<String, TableSchema> tableSchemaFunction) {
+ getWriter()
+ .getKnownSchema()
+ .getTableSchemaMap()
+ .computeIfAbsent(tableName, tableSchemaFunction);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
index 36282e91840..7a753658c96 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
@@ -138,7 +138,8 @@ public class WritableMemChunk implements IWritableMemChunk {
BitMap[] bitMaps,
List<IMeasurementSchema> schemaList,
int start,
- int end, TSStatus[] results) {
+ int end,
+ TSStatus[] results) {
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE +
list.getDataType());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
index 867b2aa11dc..b8a868cecb0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
@@ -52,7 +52,8 @@ public class WritableMemChunkGroup implements
IWritableMemChunkGroup {
BitMap[] bitMaps,
List<IMeasurementSchema> schemaList,
int start,
- int end, TSStatus[] results) {
+ int end,
+ TSStatus[] results) {
boolean flushFlag = false;
for (int i = 0; i < columns.length; i++) {
if (columns[i] == null) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
index 69e8987e7f9..80f41592387 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
@@ -135,8 +135,8 @@ public class TriggerFireVisitor extends
PlanVisitor<TriggerFireResult, TriggerEv
}
@Override
- public TriggerFireResult
visitRelationalInsertTablet(RelationalInsertTabletNode node,
- TriggerEvent context) {
+ public TriggerFireResult visitRelationalInsertTablet(
+ RelationalInsertTabletNode node, TriggerEvent context) {
// TODO-Table: add support
return visitInsertTablet(node, context);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
index 7d179e7defc..60f00cd946b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
@@ -127,8 +127,8 @@ public class MemUtils {
return memSize;
}
- public static long getAlignedTabletSize(InsertTabletNode insertTabletNode,
int start, int end,
- TSStatus[] results) {
+ public static long getAlignedTabletSize(
+ InsertTabletNode insertTabletNode, int start, int end, TSStatus[]
results) {
if (start >= end) {
return 0L;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index 9990294b7bd..aa23d8fce00 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -724,8 +724,8 @@ public abstract class AlignedTVList extends TVList {
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
@Override
- public void putAlignedValues(long[] time, Object[] value, BitMap[] bitMaps,
int start, int end,
- TSStatus[] results) {
+ public void putAlignedValues(
+ long[] time, Object[] value, BitMap[] bitMaps, int start, int end,
TSStatus[] results) {
checkExpansion();
int idx = start;
@@ -856,22 +856,23 @@ public abstract class AlignedTVList extends TVList {
return TSDataType.VECTOR;
}
-
-
/**
* Get the single alignedTVList array mem cost by give types.
*
* @param types the types in the vector
* @return AlignedTvListArrayMemSize
*/
- public static long alignedTvListArrayMemCost(TSDataType[] types,
TsTableColumnCategory[] columnCategories) {
+ public static long alignedTvListArrayMemCost(
+ TSDataType[] types, TsTableColumnCategory[] columnCategories) {
int measurementColumnNum = 0;
long size = 0;
// value array mem size
for (int i = 0; i < types.length; i++) {
TSDataType type = types[i];
- if (type != null || columnCategories != null || columnCategories[i] ==
TsTableColumnCategory.MEASUREMENT) {
+ // Count a column only when it has a type AND is a measurement column.
+ // A null category array applies no category filtering, matching the skip
+ // condition used by TsFileProcessor. The previous '||' chain dereferenced
+ // a null type or a null category array.
+ if (type != null
+ && (columnCategories == null
+ || columnCategories[i] == TsTableColumnCategory.MEASUREMENT)) {
size += (long) ARRAY_SIZE * (long) type.getDataTypeSize();
measurementColumnNum++;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index b2cd965458e..337cdd1e364 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -190,8 +190,8 @@ public abstract class TVList implements WALEntryValue {
throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
}
- public void putAlignedValues(long[] time, Object[] value, BitMap[] bitMaps,
int start, int end,
- TSStatus[] results) {
+ public void putAlignedValues(
+ long[] time, Object[] value, BitMap[] bitMaps, int start, int end,
TSStatus[] results) {
throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGeneratorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGeneratorTest.java
index da522be5e5c..ec152ba06bd 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGeneratorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGeneratorTest.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.queryengine.plan.parser;
-import java.util.stream.Collectors;
import org.apache.iotdb.common.rpc.thrift.TAggregationType;
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
import org.apache.iotdb.commons.exception.IllegalPathException;
@@ -83,7 +82,6 @@ import org.apache.iotdb.session.template.MeasurementNode;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.tsfile.write.record.Tablet.ColumnType;
import org.junit.Assert;
import org.junit.Test;
@@ -98,6 +96,7 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;
+import java.util.stream.Collectors;
import static
org.apache.iotdb.db.schemaengine.template.TemplateQueryType.SHOW_MEASUREMENTS;
import static org.apache.tsfile.file.metadata.enums.CompressionType.SNAPPY;
@@ -205,8 +204,11 @@ public class StatementGeneratorTest {
public void testInsertRelationalTablet() throws IllegalPathException {
List<String> measurements = Arrays.asList("id1", "attr1", "m1");
List<TSDataType> dataTypes = Arrays.asList(TSDataType.TEXT,
TSDataType.TEXT, TSDataType.DOUBLE);
- List<TsTableColumnCategory> columnCategories =
Arrays.asList(TsTableColumnCategory.ID,
- TsTableColumnCategory.ATTRIBUTE, TsTableColumnCategory.MEASUREMENT);
+ List<TsTableColumnCategory> columnCategories =
+ Arrays.asList(
+ TsTableColumnCategory.ID,
+ TsTableColumnCategory.ATTRIBUTE,
+ TsTableColumnCategory.MEASUREMENT);
TSInsertTabletReq req =
new TSInsertTabletReq(
101L,
@@ -216,7 +218,8 @@ public class StatementGeneratorTest {
ByteBuffer.wrap(new byte[128]),
dataTypes.stream().map(d -> (int)
d.serialize()).collect(Collectors.toList()),
1);
- req.setColumnCategories(columnCategories.stream().map(c -> (byte)
c.ordinal()).collect(Collectors.toList()));
+ req.setColumnCategories(
+ columnCategories.stream().map(c -> (byte)
c.ordinal()).collect(Collectors.toList()));
req.setWriteToTable(true);
final InsertTabletStatement statement =
StatementGenerator.createStatement(req);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
index 45a720c8511..2a6f64e6bb6 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
@@ -19,6 +19,14 @@
package org.apache.iotdb.db.queryengine.plan.relational.analyzer;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.partition.DataPartition;
@@ -35,6 +43,8 @@ import
org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
import org.apache.iotdb.db.queryengine.plan.relational.function.OperatorType;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnHandle;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
@@ -64,6 +74,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.IDeviceID.Factory;
import org.apache.tsfile.read.common.type.Type;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@@ -83,6 +94,7 @@ import static
org.apache.tsfile.read.common.type.BooleanType.BOOLEAN;
import static org.apache.tsfile.read.common.type.DoubleType.DOUBLE;
import static org.apache.tsfile.read.common.type.IntType.INT32;
import static org.apache.tsfile.read.common.type.LongType.INT64;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -339,7 +351,7 @@ public class AnalyzerTest {
context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
logicalQueryPlan =
new LogicalPlanner(
- context, metadata, sessionInfo, getFakePartitionFetcher(),
WarningCollector.NOOP)
+ context, metadata, sessionInfo, getFakePartitionFetcher(),
WarningCollector.NOOP)
.plan(actualAnalysis);
rootNode = logicalQueryPlan.getRootNode();
assertTrue(rootNode.getChildren().get(0).getChildren().get(0) instanceof
FilterNode);
@@ -355,7 +367,7 @@ public class AnalyzerTest {
context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
logicalQueryPlan =
new LogicalPlanner(
- context, metadata, sessionInfo, getFakePartitionFetcher(),
WarningCollector.NOOP)
+ context, metadata, sessionInfo, getFakePartitionFetcher(),
WarningCollector.NOOP)
.plan(actualAnalysis);
rootNode = logicalQueryPlan.getRootNode();
tableScanNode = (TableScanNode)
rootNode.getChildren().get(0).getChildren().get(0);
@@ -372,7 +384,7 @@ public class AnalyzerTest {
actualAnalysis = analyzeSQL(sql, metadata);
logicalQueryPlan =
new LogicalPlanner(
- context, metadata, sessionInfo, getFakePartitionFetcher(),
WarningCollector.NOOP)
+ context, metadata, sessionInfo, getFakePartitionFetcher(),
WarningCollector.NOOP)
.plan(actualAnalysis);
rootNode = logicalQueryPlan.getRootNode();
@@ -382,7 +394,7 @@ public class AnalyzerTest {
actualAnalysis = analyzeSQL(sql, metadata);
logicalQueryPlan =
new LogicalPlanner(
- context, metadata, sessionInfo, getFakePartitionFetcher(),
WarningCollector.NOOP)
+ context, metadata, sessionInfo, getFakePartitionFetcher(),
WarningCollector.NOOP)
.plan(actualAnalysis);
rootNode = logicalQueryPlan.getRootNode();
@@ -393,7 +405,7 @@ public class AnalyzerTest {
actualAnalysis = analyzeSQL(sql, metadata);
logicalQueryPlan =
new LogicalPlanner(
- context, metadata, sessionInfo, getFakePartitionFetcher(),
WarningCollector.NOOP)
+ context, metadata, sessionInfo, getFakePartitionFetcher(),
WarningCollector.NOOP)
.plan(actualAnalysis);
rootNode = logicalQueryPlan.getRootNode();
@@ -403,7 +415,7 @@ public class AnalyzerTest {
actualAnalysis = analyzeSQL(sql, metadata);
logicalQueryPlan =
new LogicalPlanner(
- context, metadata, sessionInfo, getFakePartitionFetcher(),
WarningCollector.NOOP)
+ context, metadata, sessionInfo, getFakePartitionFetcher(),
WarningCollector.NOOP)
.plan(actualAnalysis);
rootNode = logicalQueryPlan.getRootNode();
}
@@ -472,7 +484,7 @@ public class AnalyzerTest {
actualAnalysis = analyzeSQL(sql, metadata);
logicalQueryPlan =
new LogicalPlanner(
- context, metadata, sessionInfo, getFakePartitionFetcher(),
WarningCollector.NOOP)
+ context, metadata, sessionInfo, getFakePartitionFetcher(),
WarningCollector.NOOP)
.plan(actualAnalysis);
rootNode = logicalQueryPlan.getRootNode();
assertTrue(rootNode instanceof OutputNode);
@@ -495,18 +507,59 @@ public class AnalyzerTest {
@Test
public void analyzeTablet() {
- TableSchema tableSchema =
+ TableSchema tableSchema = StatementTestUtils.genTableSchema();
Metadata mockMetadata = new TestMatadata() {
@Override
- public TableSchema validateTableHeaderSchema(String database,
TableSchema tableSchema,
+ public TableSchema validateTableHeaderSchema(String database,
TableSchema schema,
MPPQueryContext context) {
- return null;
+ assertEquals(tableSchema, schema);
+ return tableSchema;
}
@Override
public void validateDeviceSchema(ITableDeviceSchemaValidation
schemaValidation,
MPPQueryContext context) {
+ assertEquals(sessionInfo.getDatabaseName().get(),
schemaValidation.getDatabase());
+ assertEquals(StatementTestUtils.tableName(),
schemaValidation.getTableName());
+ Object[] columns = StatementTestUtils.genColumns();
+ for (int i = 0; i < schemaValidation.getDeviceIdList().size(); i++) {
+ Object[] objects = schemaValidation.getDeviceIdList().get(i);
+ assertEquals(objects[0].toString(), StatementTestUtils.tableName());
+ assertEquals(objects[1].toString(), ((String[]) columns[0])[i]);
+ }
+ List<String> attributeColumnNameList =
schemaValidation.getAttributeColumnNameList();
+ assertEquals(Collections.singletonList("attr1"),
attributeColumnNameList);
+ assertEquals(1, schemaValidation.getAttributeValueList().size());
+ assertArrayEquals((Object[]) columns[1],
schemaValidation.getAttributeValueList().get(0));
+ }
+ @Override
+ public DataPartition getOrCreateDataPartition(
+ List<DataPartitionQueryParam> dataPartitionQueryParams, String
userName) {
+ int seriesSlotNum = 1000;
+ Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>>>
+ dataPartitionMap = new HashMap<>();
+ assertEquals(3, dataPartitionQueryParams.size());
+
+ for (DataPartitionQueryParam dataPartitionQueryParam :
dataPartitionQueryParams) {
+ String databaseName = dataPartitionQueryParam.getDatabaseName();
+ assertEquals(sessionInfo.getDatabaseName().get(), databaseName);
+
+ String tableName =
dataPartitionQueryParam.getDeviceID().getTableName();
+ assertEquals(StatementTestUtils.tableName(), tableName);
+
+ int partitionSlot = Math.abs(tableName.hashCode()) % seriesSlotNum;
+ TSeriesPartitionSlot seriesPartitionSlot = new
TSeriesPartitionSlot(partitionSlot);
+ for (TTimePartitionSlot tTimePartitionSlot :
dataPartitionQueryParam.getTimePartitionSlotList()) {
+ dataPartitionMap.computeIfAbsent(databaseName, d -> new
HashMap<>())
+ .computeIfAbsent(seriesPartitionSlot, slot -> new HashMap<>())
+ .computeIfAbsent(tTimePartitionSlot, slot -> new ArrayList<>())
+ .add(new TRegionReplicaSet(new TConsensusGroupId(
+ TConsensusGroupType.DataRegion, partitionSlot),
Collections.singletonList(
+ new TDataNodeLocation(partitionSlot, null, null, null,
null, null))));
+ }
+ }
+ return new DataPartition(dataPartitionMap, "dummy", seriesSlotNum);
}
};
@@ -518,6 +571,21 @@ public class AnalyzerTest {
new LogicalPlanner(
context, mockMetadata, sessionInfo, getFakePartitionFetcher(),
WarningCollector.NOOP)
.plan(actualAnalysis);
+
+ OutputNode node = (OutputNode) logicalQueryPlan.getRootNode();
+ assertEquals(1, node.getChildren().size());
+ RelationalInsertTabletNode insertTabletNode = (RelationalInsertTabletNode)
node.getChildren()
+ .get(0);
+
+ assertEquals(insertTabletNode.getTableName(),
StatementTestUtils.tableName());
+ assertEquals(3, insertTabletNode.getRowCount());
+ Object[] columns = StatementTestUtils.genColumns();
+ for (int i = 0; i < insertTabletNode.getRowCount(); i++) {
+ assertEquals(Factory.DEFAULT_FACTORY.create(new
String[]{StatementTestUtils.tableName(),
+ ((String[]) columns[0])[i]}), insertTabletNode.getDeviceID(i));
+ }
+ assertEquals(columns, insertTabletNode.getColumns());
+ assertArrayEquals(StatementTestUtils.genTimestamps(),
insertTabletNode.getTimes());
}
public static Analysis analyzeSQL(String sql, Metadata metadata) {
@@ -526,7 +594,8 @@ public class AnalyzerTest {
return analyzeStatement(statement, metadata, sqlParser);
}
- public static Analysis analyzeStatement(Statement statement, Metadata
metadata, SqlParser sqlParser) {
+ public static Analysis analyzeStatement(Statement statement, Metadata
metadata,
+ SqlParser sqlParser) {
try {
SessionInfo session =
new SessionInfo(
@@ -605,7 +674,8 @@ public class AnalyzerTest {
}
@Override
- public void invalidAllCache() {}
+ public void invalidAllCache() {
+ }
@Override
public SchemaPartition getOrCreateSchemaPartition(
@@ -625,5 +695,7 @@ public class AnalyzerTest {
};
}
- private static class NopAccessControl implements AccessControl {}
+ private static class NopAccessControl implements AccessControl {
+
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/StatementTestUtils.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/StatementTestUtils.java
index ee4997bb8da..12abb28878e 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/StatementTestUtils.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/StatementTestUtils.java
@@ -19,18 +19,21 @@
package org.apache.iotdb.db.queryengine.plan.statement;
-import java.util.ArrayList;
-import java.util.List;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema;
-import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WrappedInsertStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.type.TypeFactory;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.tsfile.utils.Binary;
+
public class StatementTestUtils {
+
private StatementTestUtils() {
// util class
}
@@ -48,8 +51,9 @@ public class StatementTestUtils {
}
public static TsTableColumnCategory[] genColumnCategories() {
- return new TsTableColumnCategory[]{TsTableColumnCategory.ID,
- TsTableColumnCategory.ATTRIBUTE, TsTableColumnCategory.MEASUREMENT};
+ return new TsTableColumnCategory[]{
+ TsTableColumnCategory.ID, TsTableColumnCategory.ATTRIBUTE,
TsTableColumnCategory.MEASUREMENT
+ };
}
public static List<ColumnSchema> genColumnSchema() {
@@ -59,8 +63,9 @@ public class StatementTestUtils {
List<ColumnSchema> result = new ArrayList<>();
for (int i = 0; i < columnNames.length; i++) {
- result.add(new ColumnSchema(columnNames[i],
TypeFactory.getType(dataTypes[i]), false,
- columnCategories[i]));
+ result.add(
+ new ColumnSchema(
+ columnNames[i], TypeFactory.getType(dataTypes[i]), false,
columnCategories[i]));
}
return result;
}
@@ -69,14 +74,25 @@ public class StatementTestUtils {
return new TableSchema(tableName(), genColumnSchema());
}
+ public static Object[] genColumns() {
+ return new Object[]{
+ new String[]{"a", "b", "c"},
+ new String[]{"x", "y", "z"},
+ new Double[]{1.0, 2.0, 3.0}
+ };
+ }
+
+ public static long[] genTimestamps() {
+ return new long[]{1L, 2L, 3L};
+ }
+
public static InsertTabletStatement genInsertTabletStatement(boolean
writeToTable) {
String[] measurements = genColumnNames();
TSDataType[] dataTypes = genDataTypes();
TsTableColumnCategory[] columnCategories = genColumnCategories();
- Object[] columns = new Object[]{new String[]{"a", "b", "c"}, new
String[]{"x", "y", "z"},
- new Double[]{1.0, 2.0, 3.0}};
- long[] timestamps = new long[]{1L, 2L, 3L};
+ Object[] columns = genColumns();
+ long[] timestamps = genTimestamps();
InsertTabletStatement insertTabletStatement = new InsertTabletStatement();
insertTabletStatement.setDevicePath(new PartialPath(new
String[]{tableName()}));
@@ -86,6 +102,7 @@ public class StatementTestUtils {
insertTabletStatement.setColumns(columns);
insertTabletStatement.setTimes(timestamps);
insertTabletStatement.setWriteToTable(writeToTable);
+ insertTabletStatement.setRowCount(timestamps.length);
return insertTabletStatement;
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
index 770d0ce05f3..ea4632452b1 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
@@ -342,19 +342,23 @@ public class TsFileProcessorTest {
processor.insertTablet(genInsertTableNode(0, true), 0, 10, new
TSStatus[10], true);
IMemTable memTable = processor.getWorkMemTable();
Assert.assertEquals(1596808, memTable.getTVListsRamCost());
- processor.insertTablet(genInsertTableNodeFors3000ToS6000(0, true), 0, 10,
new TSStatus[10], true);
+ processor.insertTablet(
+ genInsertTableNodeFors3000ToS6000(0, true), 0, 10, new TSStatus[10],
true);
Assert.assertEquals(3192808, memTable.getTVListsRamCost());
processor.insertTablet(genInsertTableNode(100, true), 0, 10, new
TSStatus[10], true);
Assert.assertEquals(3192808, memTable.getTVListsRamCost());
- processor.insertTablet(genInsertTableNodeFors3000ToS6000(100, true), 0,
10, new TSStatus[10], true);
+ processor.insertTablet(
+ genInsertTableNodeFors3000ToS6000(100, true), 0, 10, new TSStatus[10],
true);
Assert.assertEquals(3192808, memTable.getTVListsRamCost());
processor.insertTablet(genInsertTableNode(200, true), 0, 10, new
TSStatus[10], true);
Assert.assertEquals(3192808, memTable.getTVListsRamCost());
- processor.insertTablet(genInsertTableNodeFors3000ToS6000(200, true), 0,
10, new TSStatus[10], true);
+ processor.insertTablet(
+ genInsertTableNodeFors3000ToS6000(200, true), 0, 10, new TSStatus[10],
true);
Assert.assertEquals(3192808, memTable.getTVListsRamCost());
processor.insertTablet(genInsertTableNode(300, true), 0, 10, new
TSStatus[10], true);
Assert.assertEquals(6385616, memTable.getTVListsRamCost());
- processor.insertTablet(genInsertTableNodeFors3000ToS6000(300, true), 0,
10, new TSStatus[10], true);
+ processor.insertTablet(
+ genInsertTableNodeFors3000ToS6000(300, true), 0, 10, new TSStatus[10],
true);
Assert.assertEquals(6385616, memTable.getTVListsRamCost());
Assert.assertEquals(240000, memTable.getTotalPointsNum());
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/datastructure/AlignedTVListTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/datastructure/AlignedTVListTest.java
index 1b04f25d1a7..25e45af4ea7 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/datastructure/AlignedTVListTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/datastructure/AlignedTVListTest.java
@@ -132,8 +132,7 @@ public class AlignedTVListTest {
}
tvList.putAlignedValues(
- ArrayUtils.toPrimitive(timeList.toArray(new Long[0])), vectorArray,
bitMaps, 0, 1000,
- null);
+ ArrayUtils.toPrimitive(timeList.toArray(new Long[0])), vectorArray,
bitMaps, 0, 1000, null);
for (long i = 0; i < tvList.rowCount; i++) {
Assert.assertEquals(tvList.rowCount - i, tvList.getTime((int) i));
if (i % 100 == 0) {
@@ -165,8 +164,7 @@ public class AlignedTVListTest {
}
tvList.putAlignedValues(
- ArrayUtils.toPrimitive(timeList.toArray(new Long[0])), vectorArray,
bitMaps, 0, 1000,
- null);
+ ArrayUtils.toPrimitive(timeList.toArray(new Long[0])), vectorArray,
bitMaps, 0, 1000, null);
AlignedTVList clonedTvList = tvList.clone();
for (long i = 0; i < tvList.rowCount; i++) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java
index 4b78cd763da..dab8d05b42c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java
@@ -27,6 +27,7 @@ import java.util.List;
public class DataPartitionQueryParam {
+ private String databaseName;
private IDeviceID deviceID;
private List<TTimePartitionSlot> timePartitionSlotList = new ArrayList<>();
@@ -88,4 +89,12 @@ public class DataPartitionQueryParam {
public void setNeedRightAll(boolean needRightAll) {
this.needRightAll = needRightAll;
}
+
+ public String getDatabaseName() {
+ return databaseName;
+ }
+
+ public void setDatabaseName(String databaseName) {
+ this.databaseName = databaseName;
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java
index b0794d27f8f..dd0d4662cc7 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java
@@ -156,5 +156,4 @@ public class TsTable {
public int hashCode() {
return Objects.hash(tableName);
}
-
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnCategory.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnCategory.java
index ab7ae9bf6d5..3cd29519949 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnCategory.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnCategory.java
@@ -20,12 +20,12 @@
package org.apache.iotdb.commons.schema.table.column;
import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.apache.tsfile.write.record.Tablet.ColumnType;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
-import org.apache.tsfile.write.record.Tablet.ColumnType;
public enum TsTableColumnCategory {
ID((byte) 0),
@@ -80,8 +80,8 @@ public enum TsTableColumnCategory {
switch (this) {
case ID:
return ColumnType.ID;
- case ATTRIBUTE:
- return ColumnType.ATTRIBUTE;
+ case ATTRIBUTE:
+ return ColumnType.ATTRIBUTE;
case MEASUREMENT:
return ColumnType.MEASUREMENT;
default: