This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/TableModelIngestion by this
push:
new e6ce64880b9 add distributed plan test
e6ce64880b9 is described below
commit e6ce64880b90fb030a373cfb1d800c1fc264b550
Author: DESKTOP-L0L5GPJ\jt <[email protected]>
AuthorDate: Fri Jun 28 11:58:43 2024 +0800
add distributed plan test
---
.../db/queryengine/plan/analyze/Analysis.java | 12 +++
.../queryengine/plan/analyze/AnalyzeVisitor.java | 3 +
.../db/queryengine/plan/analyze/Analyzer.java | 1 +
.../db/queryengine/plan/analyze/IAnalysis.java | 4 +
.../config/executor/ClusterConfigTaskExecutor.java | 1 +
.../distribution/WriteFragmentParallelPlanner.java | 2 +-
.../planner/plan/node/write/InsertTabletNode.java | 85 ++++++++++++++++------
.../plan/relational/analyzer/Analysis.java | 20 ++++-
.../plan/relational/analyzer/Analyzer.java | 1 +
.../relational/analyzer/ExpressionAnalyzer.java | 1 +
.../plan/relational/planner/LogicalPlanner.java | 4 +
.../plan/relational/analyzer/AnalyzerTest.java | 38 ++++++----
.../plan/statement/StatementTestUtils.java | 4 +-
.../iotdb/commons/partition/DataPartition.java | 5 +-
14 files changed, 135 insertions(+), 46 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
index e7fdd26fde8..294caa6b99f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
@@ -82,6 +82,8 @@ public class Analysis implements IAnalysis {
// Common Analysis
/////////////////////////////////////////////////////////////////////////////////////////////////
+ private String databaseName;
+
// Statement
private Statement statement;
@@ -988,4 +990,14 @@ public class Analysis implements IAnalysis {
public boolean fromWhere(FilterNode filterNode) {
return fromWhereFilterNodes.contains(NodeRef.of(filterNode));
}
+
+ @Override
+ public void setDatabaseName(String database) {
+ this.databaseName = database;
+ }
+
+ @Override
+ public String getDatabaseName() {
+ return databaseName;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 365020f52a7..e0e68b99368 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -247,6 +247,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
Analysis analysis = visitQuery(explainStatement.getQueryStatement(),
context);
analysis.setRealStatement(explainStatement);
analysis.setFinishQueryAfterAnalyze(true);
+ analysis.setDatabaseName(context.getSession().getDatabaseName().get());
return analysis;
}
@@ -261,6 +262,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
Collections.singletonList(
new ColumnHeader(ColumnHeaderConstant.EXPLAIN_ANALYZE,
TSDataType.TEXT, null)),
true));
+ analysis.setDatabaseName(context.getSession().getDatabaseName().get());
return analysis;
}
@@ -2790,6 +2792,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
public Analysis visitPipeEnrichedStatement(
PipeEnrichedStatement pipeEnrichedStatement, MPPQueryContext context) {
Analysis analysis = pipeEnrichedStatement.getInnerStatement().accept(this,
context);
+ analysis.setDatabaseName(context.getSession().getDatabaseName().get());
// statement may be changed because of logical view
pipeEnrichedStatement.setInnerStatement(analysis.getTreeStatement());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analyzer.java
index f7ca04f7d05..ed3b686b977 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analyzer.java
@@ -45,6 +45,7 @@ public class Analyzer {
long startTime = System.nanoTime();
Analysis analysis =
new AnalyzeVisitor(partitionFetcher, schemaFetcher).process(statement,
context);
+ analysis.setDatabaseName(context.getSession().getDatabaseName().get());
if (statement.isQuery()) {
QueryPlanCostMetricSet.getInstance().recordPlanCost(ANALYZER,
System.nanoTime() - startTime);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java
index ce9f043c9bb..05c3638a95d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java
@@ -71,4 +71,8 @@ public interface IAnalysis {
void addEndPointToRedirectNodeList(TEndPoint endPoint);
TimePredicate getCovertedTimePredicate();
+
+ void setDatabaseName(String databaseName);
+
+ String getDatabaseName();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index ef900fddafe..aaa3eb01bd8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -2272,6 +2272,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
createLogicalViewStatement.setQueryStatement(alterLogicalViewStatement.getQueryStatement());
final Analysis analysis = Analyzer.analyze(createLogicalViewStatement,
context);
+ analysis.setDatabaseName(context.getSession().getDatabaseName().get());
if (analysis.isFailed()) {
future.setException(
new IoTDBException(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
index 975e0c1736e..3f44175f404 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
@@ -64,7 +64,7 @@ public class WriteFragmentParallelPlanner implements
IFragmentParallelPlaner {
PlanFragment fragment = subPlan.getPlanFragment();
PlanNode node = fragment.getPlanNodeTree();
if (!(node instanceof WritePlanNode)) {
- throw new IllegalArgumentException("PlanNode should be IWritePlanNode in
WRITE operation");
+ throw new IllegalArgumentException("PlanNode should be IWritePlanNode in
WRITE operation:" + node.getClass());
}
List<WritePlanNode> splits = nodeSplitter.apply(((WritePlanNode) node),
analysis);
List<FragmentInstance> ret = new ArrayList<>();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index 87d3bac372e..46289ce6d0d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
@@ -176,7 +177,8 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
}
@Override
- public void addChild(PlanNode child) {}
+ public void addChild(PlanNode child) {
+ }
@Override
public PlanNodeType getType() {
@@ -258,7 +260,8 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
final List<TRegionReplicaSet> replicaSets =
analysis
.getDataPartitionInfo()
- .getDataRegionReplicaSetForWriting(deviceID,
splitInfo.timePartitionSlots);
+ .getDataRegionReplicaSetForWriting(deviceID,
splitInfo.timePartitionSlots,
+ analysis.getDatabaseName());
splitInfo.replicaSets = replicaSets;
// collect redirectInfo
analysis.addEndPointToRedirectNodeList(
@@ -374,9 +377,11 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
switch (dataTypes[i]) {
case TEXT:
case BLOB:
- case STRING:
values[i] = new Binary[rowSize];
break;
+ case STRING:
+ values[i] = new String[rowSize];
+ break;
case FLOAT:
values[i] = new float[rowSize];
break;
@@ -454,7 +459,9 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
ReadWriteIOUtils.write((byte) (isAligned ? 1 : 0), stream);
}
- /** Serialize measurements or measurement schemas, ignoring failed time
series */
+ /**
+ * Serialize measurements or measurement schemas, ignoring failed time series
+ */
private void writeMeasurementsOrSchemas(ByteBuffer buffer) {
ReadWriteIOUtils.write(measurements.length - getFailedMeasurementNumber(),
buffer);
ReadWriteIOUtils.write((byte) (measurementSchemas != null ? 1 : 0),
buffer);
@@ -473,7 +480,9 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
}
}
- /** Serialize measurements or measurement schemas, ignoring failed time
series */
+ /**
+ * Serialize measurements or measurement schemas, ignoring failed time series
+ */
private void writeMeasurementsOrSchemas(DataOutputStream stream) throws
IOException {
ReadWriteIOUtils.write(measurements.length - getFailedMeasurementNumber(),
stream);
ReadWriteIOUtils.write((byte) (measurementSchemas != null ? 1 : 0),
stream);
@@ -492,7 +501,9 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
}
}
- /** Serialize data types, ignoring failed time series */
+ /**
+ * Serialize data types, ignoring failed time series
+ */
private void writeDataTypes(ByteBuffer buffer) {
for (int i = 0; i < dataTypes.length; i++) {
// ignore failed partial insert
@@ -503,7 +514,9 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
}
}
- /** Serialize data types, ignoring failed time series */
+ /**
+ * Serialize data types, ignoring failed time series
+ */
private void writeDataTypes(DataOutputStream stream) throws IOException {
for (int i = 0; i < dataTypes.length; i++) {
// ignore failed partial insert
@@ -528,7 +541,9 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
}
}
- /** Serialize bitmaps, ignoring failed time series */
+ /**
+ * Serialize bitmaps, ignoring failed time series
+ */
private void writeBitMaps(ByteBuffer buffer) {
ReadWriteIOUtils.write(BytesUtils.boolToByte(bitMaps != null), buffer);
if (bitMaps != null) {
@@ -548,7 +563,9 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
}
}
- /** Serialize bitmaps, ignoring failed time series */
+ /**
+ * Serialize bitmaps, ignoring failed time series
+ */
private void writeBitMaps(DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(BytesUtils.boolToByte(bitMaps != null), stream);
if (bitMaps != null) {
@@ -568,7 +585,9 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
}
}
- /** Serialize values, ignoring failed time series */
+ /**
+ * Serialize values, ignoring failed time series
+ */
private void writeValues(ByteBuffer buffer) {
for (int i = 0; i < columns.length; i++) {
// ignore failed partial insert
@@ -579,7 +598,9 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
}
}
- /** Serialize values, ignoring failed time series */
+ /**
+ * Serialize values, ignoring failed time series
+ */
private void writeValues(DataOutputStream stream) throws IOException {
for (int i = 0; i < columns.length; i++) {
// ignore failed partial insert
@@ -737,13 +758,18 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
}
// region serialize & deserialize methods for WAL
- /** Serialized size for wal */
+
+ /**
+ * Serialized size for wal
+ */
@Override
public int serializedSize() {
return serializedSize(0, rowCount);
}
- /** Serialized size for wal */
+ /**
+ * Serialized size for wal
+ */
public int serializedSize(int start, int end) {
return Short.BYTES + subSerializeSize(start, end);
}
@@ -846,7 +872,9 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
buffer.put((byte) (isAligned ? 1 : 0));
}
- /** Serialize measurement schemas, ignoring failed time series */
+ /**
+ * Serialize measurement schemas, ignoring failed time series
+ */
private void writeMeasurementSchemas(IWALByteBufferView buffer) {
buffer.putInt(measurements.length - getFailedMeasurementNumber());
serializeMeasurementSchemasToWAL(buffer);
@@ -859,7 +887,9 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
}
}
- /** Serialize bitmaps, ignoring failed time series */
+ /**
+ * Serialize bitmaps, ignoring failed time series
+ */
private void writeBitMaps(IWALByteBufferView buffer, int start, int end) {
buffer.put(BytesUtils.boolToByte(bitMaps != null));
if (bitMaps != null) {
@@ -882,7 +912,9 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
}
}
- /** Serialize values, ignoring failed time series */
+ /**
+ * Serialize values, ignoring failed time series
+ */
private void writeValues(IWALByteBufferView buffer, int start, int end) {
for (int i = 0; i < columns.length; i++) {
// ignore failed partial insert
@@ -942,7 +974,9 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
}
}
- /** Deserialize from wal */
+ /**
+ * Deserialize from wal
+ */
public static InsertTabletNode deserializeFromWAL(DataInputStream stream)
throws IOException {
// we do not store plan node id in wal entry
InsertTabletNode insertNode = new InsertTabletNode(new PlanNodeId(""));
@@ -1034,9 +1068,15 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- if (!super.equals(o)) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
InsertTabletNode that = (InsertTabletNode) o;
return rowCount == that.rowCount
&& Arrays.equals(times, that.times)
@@ -1170,6 +1210,7 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
}
private static class PartitionSplitInfo {
+
// for each List in split, they are range1.start, range1.end,
range2.start, range2.end, ...
private List<Integer> ranges = new ArrayList<>();
private List<TTimePartitionSlot> timePartitionSlots = new ArrayList<>();
@@ -1180,7 +1221,7 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
* Split the tablet of the given range according to Table deviceID.
*
* @param start inclusive
- * @param end exclusive
+ * @param end exclusive
* @return each the number in the pair is the end offset of a device
*/
public List<Pair<IDeviceID, Integer>> splitByDevice(int start, int end) {
@@ -1188,7 +1229,7 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
}
/**
- * @param results insertion result of each row
+ * @param results insertion result of each row
* @param rowTTLGetter the ttl associated with each row
* @return the position of the first alive row
* @throws OutOfTTLException if all rows have expired the TTL
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
index 525bd8251b0..6bb868ec0ea 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
@@ -91,6 +91,9 @@ import static java.util.Objects.requireNonNull;
public class Analysis implements IAnalysis {
+ private String databaseName;
+ private List<TEndPoint> redirectNodeList;
+
@Nullable private Statement root;
private final Map<NodeRef<Parameter>, Expression> parameters;
@@ -687,12 +690,15 @@ public class Analysis implements IAnalysis {
@Override
public void setRedirectNodeList(List<TEndPoint> redirectNodeList) {
- throw new UnsupportedOperationException();
+ this.redirectNodeList = redirectNodeList;
}
@Override
public void addEndPointToRedirectNodeList(TEndPoint endPoint) {
- throw new UnsupportedOperationException();
+ if (redirectNodeList == null) {
+ redirectNodeList = new ArrayList<>();
+ }
+ redirectNodeList.add(endPoint);
}
@Override
@@ -950,4 +956,14 @@ public class Analysis implements IAnalysis {
return unmodifiableList(quantifiedComparisonSubqueries);
}
}
+
+ @Override
+ public void setDatabaseName(String databaseName) {
+ this.databaseName = databaseName;
+ }
+
+ @Override
+ public String getDatabaseName() {
+ return databaseName;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analyzer.java
index 91f625a7fe9..88f235902e0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analyzer.java
@@ -57,6 +57,7 @@ public class Analyzer {
public Analysis analyze(Statement statement) {
Analysis analysis = new Analysis(statement, parameterLookup);
+ analysis.setDatabaseName(session.getDatabaseName().get());
StatementAnalyzer analyzer =
statementAnalyzerFactory.createStatementAnalyzer(
analysis, session, warningCollector, CorrelationSupport.ALLOWED);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/ExpressionAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/ExpressionAnalyzer.java
index f5969d4e40d..96df7aee5e9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/ExpressionAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/ExpressionAnalyzer.java
@@ -1328,6 +1328,7 @@ public class ExpressionAnalyzer {
Map<NodeRef<Parameter>, Expression> parameters,
WarningCollector warningCollector) {
Analysis analysis = new Analysis(null, parameters);
+ analysis.setDatabaseName(session.getDatabaseName().get());
ExpressionAnalyzer analyzer =
new ExpressionAnalyzer(
metadata,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
index 6d582fdcc9f..f1dcea2f310 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Field;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.RelationType;
@@ -122,6 +123,9 @@ public class LogicalPlanner {
}
private PlanNode createOutputPlan(RelationPlan plan, Analysis analysis) {
+ if (plan.getRoot() instanceof WritePlanNode) {
+ return plan.getRoot();
+ }
ImmutableList.Builder<Symbol> outputs = ImmutableList.builder();
ImmutableList.Builder<String> names = ImmutableList.builder();
List<ColumnHeader> columnHeaders = new ArrayList<>();
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
index 2a6f64e6bb6..3c6c642ca44 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
import org.apache.iotdb.commons.partition.SchemaPartition;
+import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.db.protocol.session.IClientSession;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
@@ -537,6 +538,10 @@ public class AnalyzerTest {
public DataPartition getOrCreateDataPartition(
List<DataPartitionQueryParam> dataPartitionQueryParams, String
userName) {
int seriesSlotNum = 1000;
+ String partitionExecutorName =
"org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor";
+ SeriesPartitionExecutor seriesPartitionExecutor =
SeriesPartitionExecutor.getSeriesPartitionExecutor(
+ partitionExecutorName, seriesSlotNum);
+
Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>>>
dataPartitionMap = new HashMap<>();
assertEquals(3, dataPartitionQueryParams.size());
@@ -548,34 +553,31 @@ public class AnalyzerTest {
String tableName =
dataPartitionQueryParam.getDeviceID().getTableName();
assertEquals(StatementTestUtils.tableName(), tableName);
- int partitionSlot = Math.abs(tableName.hashCode()) % seriesSlotNum;
- TSeriesPartitionSlot seriesPartitionSlot = new
TSeriesPartitionSlot(partitionSlot);
+ TSeriesPartitionSlot partitionSlot =
seriesPartitionExecutor.getSeriesPartitionSlot(
+ dataPartitionQueryParam.getDeviceID());
for (TTimePartitionSlot tTimePartitionSlot :
dataPartitionQueryParam.getTimePartitionSlotList()) {
dataPartitionMap.computeIfAbsent(databaseName, d -> new
HashMap<>())
- .computeIfAbsent(seriesPartitionSlot, slot -> new HashMap<>())
+ .computeIfAbsent(partitionSlot, slot -> new HashMap<>())
.computeIfAbsent(tTimePartitionSlot, slot -> new ArrayList<>())
.add(new TRegionReplicaSet(new TConsensusGroupId(
- TConsensusGroupType.DataRegion, partitionSlot),
Collections.singletonList(
- new TDataNodeLocation(partitionSlot, null, null, null,
null, null))));
+ TConsensusGroupType.DataRegion, partitionSlot.slotId),
Collections.singletonList(
+ new TDataNodeLocation(partitionSlot.slotId, null, null,
null, null, null))));
}
}
- return new DataPartition(dataPartitionMap, "dummy", seriesSlotNum);
+ return new DataPartition(dataPartitionMap, partitionExecutorName,
seriesSlotNum);
}
};
InsertTabletStatement insertTabletStatement =
StatementTestUtils.genInsertTabletStatement(true);
context = new MPPQueryContext("", queryId, sessionInfo, null, null);
actualAnalysis =
analyzeStatement(insertTabletStatement.toRelationalStatement(context),
- mockMetadata, new SqlParser());
+ mockMetadata, new SqlParser(), sessionInfo);
logicalQueryPlan =
new LogicalPlanner(
context, mockMetadata, sessionInfo, getFakePartitionFetcher(),
WarningCollector.NOOP)
.plan(actualAnalysis);
- OutputNode node = (OutputNode) logicalQueryPlan.getRootNode();
- assertEquals(1, node.getChildren().size());
- RelationalInsertTabletNode insertTabletNode = (RelationalInsertTabletNode)
node.getChildren()
- .get(0);
+ RelationalInsertTabletNode insertTabletNode = (RelationalInsertTabletNode)
logicalQueryPlan.getRootNode();
assertEquals(insertTabletNode.getTableName(),
StatementTestUtils.tableName());
assertEquals(3, insertTabletNode.getRowCount());
@@ -586,20 +588,24 @@ public class AnalyzerTest {
}
assertEquals(columns, insertTabletNode.getColumns());
assertArrayEquals(StatementTestUtils.genTimestamps(),
insertTabletNode.getTimes());
+
+ distributionPlanner = new TableDistributionPlanner(actualAnalysis,
logicalQueryPlan, context);
+ distributedQueryPlan = distributionPlanner.plan();
+ assertEquals(3, distributedQueryPlan.getInstances().size());
}
public static Analysis analyzeSQL(String sql, Metadata metadata) {
SqlParser sqlParser = new SqlParser();
Statement statement = sqlParser.createStatement(sql,
ZoneId.systemDefault());
- return analyzeStatement(statement, metadata, sqlParser);
+ SessionInfo session =
+ new SessionInfo(
+ 0, "test", ZoneId.systemDefault(), "testdb",
IClientSession.SqlDialect.TABLE);
+ return analyzeStatement(statement, metadata, sqlParser, session);
}
public static Analysis analyzeStatement(Statement statement, Metadata
metadata,
- SqlParser sqlParser) {
+ SqlParser sqlParser, SessionInfo session) {
try {
- SessionInfo session =
- new SessionInfo(
- 0, "test", ZoneId.systemDefault(), "testdb",
IClientSession.SqlDialect.TABLE);
StatementAnalyzerFactory statementAnalyzerFactory =
new StatementAnalyzerFactory(metadata, sqlParser, nopAccessControl);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/StatementTestUtils.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/StatementTestUtils.java
index 12abb28878e..008144db4d9 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/StatementTestUtils.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/StatementTestUtils.java
@@ -47,7 +47,7 @@ public class StatementTestUtils {
}
public static TSDataType[] genDataTypes() {
- return new TSDataType[]{TSDataType.TEXT, TSDataType.TEXT,
TSDataType.DOUBLE};
+ return new TSDataType[]{TSDataType.STRING, TSDataType.STRING,
TSDataType.DOUBLE};
}
public static TsTableColumnCategory[] genColumnCategories() {
@@ -78,7 +78,7 @@ public class StatementTestUtils {
return new Object[]{
new String[]{"a", "b", "c"},
new String[]{"x", "y", "z"},
- new Double[]{1.0, 2.0, 3.0}
+ new double[]{1.0, 2.0, 3.0}
};
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index 926dca39697..a84692aa8e5 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -155,17 +155,16 @@ public class DataPartition extends Partition {
}
public List<TRegionReplicaSet> getDataRegionReplicaSetForWriting(
- IDeviceID deviceID, List<TTimePartitionSlot> timePartitionSlotList) {
+ IDeviceID deviceID, List<TTimePartitionSlot> timePartitionSlotList,
String databaseName) {
// A list of data region replica sets will store data in a same time
partition.
// We will insert data to the last set in the list.
// TODO return the latest dataRegionReplicaSet for each time partition
- String storageGroup = getStorageGroupByDevice(deviceID);
TSeriesPartitionSlot seriesPartitionSlot =
calculateDeviceGroupId(deviceID);
// IMPORTANT TODO: (xingtanzjr) need to handle the situation for write
operation that there are
// more than 1 Regions for one timeSlot
List<TRegionReplicaSet> dataRegionReplicaSets = new ArrayList<>();
Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>
- dataBasePartitionMap = dataPartitionMap.get(storageGroup);
+ dataBasePartitionMap = dataPartitionMap.get(databaseName);
Map<TTimePartitionSlot, List<TRegionReplicaSet>> slotReplicaSetMap =
dataBasePartitionMap.get(seriesPartitionSlot);
for (TTimePartitionSlot timePartitionSlot : timePartitionSlotList) {