This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ty/TableModelGrammar by this
push:
new 60644b2b6c6 Rename class name, add more UT case
60644b2b6c6 is described below
commit 60644b2b6c6ccbda769994d55c5b5d81f1e4725b
Author: Beyyes <[email protected]>
AuthorDate: Thu May 30 10:58:30 2024 +0800
Rename class name, add more UT case
---
.../org/apache/iotdb/session/it/SessionIT.java | 2 +-
.../plan/planner/TemplatedLogicalPlan.java | 14 +-
.../plan/relational/planner/LogicalPlanner.java | 8 +-
.../relational/planner/RelationalModelPlanner.java | 13 +-
...nPlanner.java => TableDistributionPlanner.java} | 4 +-
.../planner/optimizations/IndexScan.java | 53 +++----
.../optimizations/PruneTableScanColumns.java | 2 +
.../optimizations/RelationalPlanOptimizer.java | 2 +
.../RemoveRedundantIdentityProjections.java | 2 +
.../planner/optimizations/SimplifyExpressions.java | 2 +
.../plan/relational/analyzer/AnalyzerTest.java | 156 +++++++++++++++---
.../relational/analyzer/MockTablePartition.java | 174 +++++++++++++++++++++
.../plan/relational/analyzer/TestMatadata.java | 4 +-
13 files changed, 363 insertions(+), 73 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/session/it/SessionIT.java
b/integration-test/src/test/java/org/apache/iotdb/session/it/SessionIT.java
index 7ec02a955be..05ee72f6076 100644
--- a/integration-test/src/test/java/org/apache/iotdb/session/it/SessionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/session/it/SessionIT.java
@@ -269,7 +269,7 @@ public class SessionIT {
@Test
public void testInsertTablet() {
try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
- List<MeasurementSchema> schemaList = new ArrayList<>();
+ List<IMeasurementSchema> schemaList = new ArrayList<>();
String deviceId = "root.db.d1";
schemaList.add(new MeasurementSchema("s1", TSDataType.DATE));
schemaList.add(new MeasurementSchema("s2", TSDataType.TIMESTAMP));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java
index b72c3a92142..a0a5678c1aa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java
@@ -132,15 +132,19 @@ public class TemplatedLogicalPlan {
.getExpressionTypes()
.forEach(
(key, value) ->
-
context.getTypeProvider().setType(key.getNode().getOutputSymbol(), value));
+ context
+ .getTypeProvider()
+ .setTreeModelType(key.getNode().getOutputSymbol(),
value));
}
if (queryStatement.isOutputEndTime()) {
- context.getTypeProvider().setType(END_TIME_EXPRESSION.getOutputSymbol(),
TSDataType.INT64);
+ context
+ .getTypeProvider()
+ .setTreeModelType(END_TIME_EXPRESSION.getOutputSymbol(),
TSDataType.INT64);
}
if (queryStatement.isCountTimeAggregation()) {
- context.getTypeProvider().setType("count_time(*)", TSDataType.INT64);
- context.getTypeProvider().setType("count_time(Time)", TSDataType.INT64);
+ context.getTypeProvider().setTreeModelType("count_time(*)",
TSDataType.INT64);
+ context.getTypeProvider().setTreeModelType("count_time(Time)",
TSDataType.INT64);
}
List<Integer> deviceToMeasurementIndexes =
@@ -454,7 +458,7 @@ public class TemplatedLogicalPlan {
&& !expression.getExpressionString().equals(ENDTIME)) {
context
.getTypeProvider()
- .setType(expression.getExpressionString(),
getPreAnalyzedType(expression));
+ .setTreeModelType(expression.getExpressionString(),
getPreAnalyzedType(expression));
}
});
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
index 6a6f4d25e52..fb593d4c852 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
@@ -20,6 +20,7 @@ import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeader;
import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
import org.apache.iotdb.db.queryengine.execution.warnings.WarningCollector;
+import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
@@ -58,16 +59,19 @@ public class LogicalPlanner {
private final SymbolAllocator symbolAllocator = new SymbolAllocator();
private final List<RelationalPlanOptimizer> relationalPlanOptimizers;
private final Metadata metadata;
+ private final IPartitionFetcher partitionFetcher;
private final WarningCollector warningCollector;
public LogicalPlanner(
MPPQueryContext context,
Metadata metadata,
SessionInfo sessionInfo,
+ IPartitionFetcher partitionFetcher,
WarningCollector warningCollector) {
this.context = context;
this.metadata = metadata;
this.sessionInfo = requireNonNull(sessionInfo, "session is null");
+ this.partitionFetcher = requireNonNull(partitionFetcher, "partitionFetcher
is null");
this.warningCollector = requireNonNull(warningCollector, "warningCollector
is null");
this.relationalPlanOptimizers =
@@ -82,7 +86,9 @@ public class LogicalPlanner {
PlanNode planNode = planStatement(analysis, analysis.getStatement());
relationalPlanOptimizers.forEach(
- optimizer -> optimizer.optimize(planNode, analysis, metadata,
sessionInfo, context));
+ optimizer ->
+ optimizer.optimize(
+ planNode, analysis, metadata, partitionFetcher, sessionInfo,
context));
return new LogicalQueryPlan(context, planNode);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationalModelPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationalModelPlanner.java
index 6ae3e5c56ac..bfe66bdc921 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationalModelPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationalModelPlanner.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.execution.QueryStateMachine;
import org.apache.iotdb.db.queryengine.execution.warnings.WarningCollector;
+import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
import org.apache.iotdb.db.queryengine.plan.planner.IPlanner;
import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
@@ -36,7 +37,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analyzer;
import
org.apache.iotdb.db.queryengine.plan.relational.analyzer.StatementAnalyzerFactory;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
-import
org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.RelationalDistributionPlanner;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableDistributionPlanner;
import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl;
import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
import org.apache.iotdb.db.queryengine.plan.relational.sql.tree.Statement;
@@ -108,7 +109,12 @@ public class RelationalModelPlanner implements IPlanner {
@Override
public LogicalQueryPlan doLogicalPlan(IAnalysis analysis, MPPQueryContext
context) {
try {
- return new LogicalPlanner(context, metadata, context.getSession(),
warningCollector)
+ return new LogicalPlanner(
+ context,
+ metadata,
+ context.getSession(),
+ ClusterPartitionFetcher.getInstance(),
+ warningCollector)
.plan((Analysis) analysis);
} catch (IoTDBException e) {
throw new RuntimeException(e);
@@ -117,8 +123,7 @@ public class RelationalModelPlanner implements IPlanner {
@Override
public DistributedQueryPlan doDistributionPlan(IAnalysis analysis,
LogicalQueryPlan logicalPlan) {
- return new RelationalDistributionPlanner(
- (Analysis) analysis, logicalPlan, logicalPlan.getContext())
+ return new TableDistributionPlanner((Analysis) analysis, logicalPlan,
logicalPlan.getContext())
.plan();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/RelationalDistributionPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
similarity index 98%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/RelationalDistributionPlanner.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
index 438a0ffb569..5511209a3f2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/RelationalDistributionPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
@@ -36,12 +36,12 @@ import java.util.stream.Collectors;
import static
org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand.TIMESTAMP_EXPRESSION_STRING;
-public class RelationalDistributionPlanner {
+public class TableDistributionPlanner {
private final Analysis analysis;
private final LogicalQueryPlan logicalQueryPlan;
private final MPPQueryContext mppQueryContext;
- public RelationalDistributionPlanner(
+ public TableDistributionPlanner(
Analysis analysis, LogicalQueryPlan logicalQueryPlan, MPPQueryContext
mppQueryContext) {
this.analysis = analysis;
this.logicalQueryPlan = logicalQueryPlan;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java
index 711ad504a1c..56eb8ea082d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java
@@ -14,8 +14,6 @@
package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
@@ -25,7 +23,6 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
-import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
@@ -56,7 +53,7 @@ import java.util.stream.Collectors;
import static
org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.ATTRIBUTE;
import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.getTimePartitionSlotList;
-/** Extract IDeviceID and */
+/** Extract IDeviceID */
public class IndexScan implements RelationalPlanOptimizer {
static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
@@ -68,11 +65,13 @@ public class IndexScan implements RelationalPlanOptimizer {
PlanNode planNode,
Analysis analysis,
Metadata metadata,
+ IPartitionFetcher partitionFetcher,
SessionInfo sessionInfo,
MPPQueryContext context) {
return planNode.accept(
- new Rewriter(), new RewriterContext(null, metadata, sessionInfo,
analysis));
+ new Rewriter(),
+ new RewriterContext(null, metadata, sessionInfo, analysis,
partitionFetcher));
}
private static class Rewriter extends PlanVisitor<PlanNode, RewriterContext>
{
@@ -100,21 +99,18 @@ public class IndexScan implements RelationalPlanOptimizer {
.collect(Collectors.toList());
List<Expression> conjExpressions =
getConjunctionExpressions(context.getPredicate(), node);
-
+ String dbName = context.getSessionInfo().getDatabaseName().get();
List<DeviceEntry> deviceEntries =
context
.getMetadata()
.indexScan(
- new QualifiedObjectName(
- context.getSessionInfo().getDatabaseName().get(),
- node.getQualifiedTableName()),
+ new QualifiedObjectName(dbName,
node.getQualifiedTableName()),
conjExpressions,
attributeColumns);
node.setDeviceEntries(deviceEntries);
// TODO getDataPartition, Change globalTimeFilter to Filter
- String database = "root." +
context.getSessionInfo().getDatabaseName().get();
- IPartitionFetcher partitionFetcher =
ClusterPartitionFetcher.getInstance();
+ String treeDatabase = "root." + dbName;
Filter globalTimeFilter = null;
Set<String> deviceSet = new HashSet<>();
for (DeviceEntry deviceEntry : deviceEntries) {
@@ -124,18 +120,18 @@ public class IndexScan implements RelationalPlanOptimizer
{
}
DataPartition dataPartition =
- fetchDataPartitionByDevices(deviceSet, database, globalTimeFilter,
partitionFetcher);
+ fetchDataPartitionByDevices(
+ deviceSet, treeDatabase, globalTimeFilter,
context.partitionFetcher);
context.getAnalysis().setDataPartition(dataPartition);
if (dataPartition.getDataPartitionMap().size() > 1) {
throw new IllegalStateException(
- "Table model can only process data only in one data region yet!");
+ "Table model can only process data only in one database yet!");
}
if (dataPartition.getDataPartitionMap().isEmpty()) {
context.getAnalysis().setFinishQueryAfterAnalyze();
} else {
- // TODO add the real impl
Set<TRegionReplicaSet> regionReplicaSet = new HashSet<>();
for (Map.Entry<
String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>>>
@@ -193,6 +189,7 @@ public class IndexScan implements RelationalPlanOptimizer {
IPartitionFetcher partitionFetcher) {
Pair<List<TTimePartitionSlot>, Pair<Boolean, Boolean>> res =
getTimePartitionSlotList(globalTimeFilter);
+
// there is no satisfied time range
if (res.left.isEmpty() && Boolean.FALSE.equals(res.right.left)) {
return new DataPartition(
@@ -200,6 +197,7 @@ public class IndexScan implements RelationalPlanOptimizer {
CONFIG.getSeriesPartitionExecutorClass(),
CONFIG.getSeriesPartitionSlotNum());
}
+
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new
HashMap<>();
for (String devicePath : deviceSet) {
DataPartitionQueryParam queryParam =
@@ -207,15 +205,6 @@ public class IndexScan implements RelationalPlanOptimizer {
sgNameToQueryParamsMap.computeIfAbsent(database, key -> new
ArrayList<>()).add(queryParam);
}
- // Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>>>
- // dataPartitionMap = new HashMap<>();
- // dataPartitionMap.put("root.db", Collections.singletonMap(new
TSeriesPartitionSlot(1),
- // Collections.singletonMap(new TTimePartitionSlot(1),
- // Collections.singletonList(new TRegionReplicaSet(
- // new
TConsensusGroupId(TConsensusGroupType.DataRegion, 1),
- // Arrays.asList(genDataNodeLocation(1,
"127.0.0.1")))))));
- // return new DataPartition(dataPartitionMap, "hkb", 1);
-
if (res.right.left || res.right.right) {
return
partitionFetcher.getDataPartitionWithUnclosedTimeRange(sgNameToQueryParamsMap);
} else {
@@ -223,26 +212,24 @@ public class IndexScan implements RelationalPlanOptimizer
{
}
}
- private static TDataNodeLocation genDataNodeLocation(int dataNodeId, String
ip) {
- return new TDataNodeLocation()
- .setDataNodeId(dataNodeId)
- .setClientRpcEndPoint(new TEndPoint(ip, 9000))
- .setMPPDataExchangeEndPoint(new TEndPoint(ip, 9001))
- .setInternalEndPoint(new TEndPoint(ip, 9002));
- }
-
private static class RewriterContext {
private Expression predicate;
private Metadata metadata;
private final SessionInfo sessionInfo;
- private Analysis analysis;
+ private final Analysis analysis;
+ private final IPartitionFetcher partitionFetcher;
RewriterContext(
- Expression predicate, Metadata metadata, SessionInfo sessionInfo,
Analysis analysis) {
+ Expression predicate,
+ Metadata metadata,
+ SessionInfo sessionInfo,
+ Analysis analysis,
+ IPartitionFetcher partitionFetcher) {
this.predicate = predicate;
this.metadata = metadata;
this.sessionInfo = sessionInfo;
this.analysis = analysis;
+ this.partitionFetcher = partitionFetcher;
}
public Expression getPredicate() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PruneTableScanColumns.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PruneTableScanColumns.java
index 32ff068e5c9..cbc18a51465 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PruneTableScanColumns.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PruneTableScanColumns.java
@@ -16,6 +16,7 @@ package
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
+import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
@@ -39,6 +40,7 @@ public class PruneTableScanColumns implements
RelationalPlanOptimizer {
PlanNode planNode,
Analysis analysis,
Metadata metadata,
+ IPartitionFetcher partitionFetcher,
SessionInfo sessionInfo,
MPPQueryContext context) {
return planNode.accept(new Rewriter(), new RewriterContext());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RelationalPlanOptimizer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RelationalPlanOptimizer.java
index 37611e6a35c..0e96f27be60 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RelationalPlanOptimizer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RelationalPlanOptimizer.java
@@ -16,6 +16,7 @@ package
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
+import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
@@ -25,6 +26,7 @@ public interface RelationalPlanOptimizer {
PlanNode planNode,
Analysis analysis,
Metadata metadata,
+ IPartitionFetcher partitionFetcher,
SessionInfo sessionInfo,
MPPQueryContext context);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RemoveRedundantIdentityProjections.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RemoveRedundantIdentityProjections.java
index 67027079a17..28b5ab8d1d7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RemoveRedundantIdentityProjections.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RemoveRedundantIdentityProjections.java
@@ -16,6 +16,7 @@ package
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
+import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode;
@@ -34,6 +35,7 @@ public class RemoveRedundantIdentityProjections implements
RelationalPlanOptimiz
PlanNode planNode,
Analysis analysis,
Metadata metadata,
+ IPartitionFetcher partitionFetcher,
SessionInfo sessionInfo,
MPPQueryContext context) {
return planNode.accept(new Rewriter(), new RewriterContext());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SimplifyExpressions.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SimplifyExpressions.java
index 5e0bf7e4dce..8abd6273411 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SimplifyExpressions.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SimplifyExpressions.java
@@ -16,6 +16,7 @@ package
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
+import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
@@ -34,6 +35,7 @@ public class SimplifyExpressions implements
RelationalPlanOptimizer {
PlanNode planNode,
Analysis analysis,
Metadata metadata,
+ IPartitionFetcher partitionFetcher,
SessionInfo sessionInfo,
MPPQueryContext context) {
// TODO add query statement pruning
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
index d03abfb56e2..104cdc9ed48 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
@@ -21,13 +21,20 @@ package
org.apache.iotdb.db.queryengine.plan.relational.analyzer;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.partition.DataPartition;
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
+import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
+import org.apache.iotdb.commons.partition.SchemaPartition;
+import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.db.protocol.session.IClientSession;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.QueryId;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.execution.warnings.WarningCollector;
+import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.relational.function.OperatorType;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnHandle;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
@@ -37,12 +44,14 @@ import
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectN
import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableHandle;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema;
import org.apache.iotdb.db.queryengine.plan.relational.planner.LogicalPlanner;
-import
org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.RelationalDistributionPlanner;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableDistributionPlanner;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl;
import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
import org.apache.iotdb.db.queryengine.plan.relational.sql.tree.Statement;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
-import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
@@ -55,11 +64,14 @@ import java.util.Map;
import java.util.Optional;
import static
org.apache.iotdb.db.queryengine.execution.warnings.WarningCollector.NOOP;
+import static
org.apache.iotdb.db.queryengine.plan.statement.component.Ordering.ASC;
import static org.apache.tsfile.read.common.type.BooleanType.BOOLEAN;
import static org.apache.tsfile.read.common.type.DoubleType.DOUBLE;
import static org.apache.tsfile.read.common.type.IntType.INT32;
import static org.apache.tsfile.read.common.type.LongType.INT64;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.eq;
@@ -133,38 +145,71 @@ public class AnalyzerTest {
System.out.println(actualAnalysis.getTypes());
}
- @Ignore
- @Test
- public void testSingleTableQuery() throws IoTDBException {
- // no sort
- String sql = "SELECT tag1, s1 FROM table1";
- // + "WHERE time>1 AND tag1='A' OR s2>3";
- Metadata metadata = new TestMatadata();
+ QueryId queryId = new QueryId("tmp_query");
+ SessionInfo sessionInfo =
+ new SessionInfo(
+ 1L,
+ "iotdb-user",
+ ZoneId.systemDefault(),
+ IoTDBConstant.ClientVersion.V_1_0,
+ "db",
+ IClientSession.SqlDialect.TABLE);
+ Metadata metadata = new TestMatadata();
+ @Test
+ public void singleTableNoFilterTest() throws IoTDBException {
+ // 1. wildcard
+ String sql = "SELECT * FROM table1";
Analysis actualAnalysis = analyzeSQL(sql, metadata);
assertNotNull(actualAnalysis);
- System.out.println(actualAnalysis.getTypes());
+ assertEquals(1, actualAnalysis.getTables().size());
- QueryId queryId = new QueryId("tmp_query");
- MPPQueryContext context = new MPPQueryContext(queryId);
- SessionInfo sessionInfo =
- new SessionInfo(
- 1L,
- "iotdb-user",
- ZoneId.systemDefault(),
- IoTDBConstant.ClientVersion.V_1_0,
- "db",
- IClientSession.SqlDialect.TABLE);
- WarningCollector warningCollector = WarningCollector.NOOP;
+ MPPQueryContext context = new MPPQueryContext(sql, queryId, sessionInfo,
null, null);
LogicalPlanner logicalPlanner =
- new LogicalPlanner(context, metadata, sessionInfo, warningCollector);
+ new LogicalPlanner(
+ context, metadata, sessionInfo, getFakePartitionFetcher(),
WarningCollector.NOOP);
LogicalQueryPlan logicalQueryPlan = logicalPlanner.plan(actualAnalysis);
- System.out.println(logicalQueryPlan);
+ PlanNode rootNode = logicalQueryPlan.getRootNode();
+ assertTrue(rootNode instanceof OutputNode);
+ assertTrue(((OutputNode) rootNode).getChild() instanceof TableScanNode);
+ TableScanNode tableScanNode = (TableScanNode) ((OutputNode)
rootNode).getChild();
+ assertEquals("table1", tableScanNode.getQualifiedTableName());
+ assertEquals(8, tableScanNode.getOutputSymbols().size());
+ assertEquals(8, tableScanNode.getAssignments().size());
+ assertEquals(1, tableScanNode.getDeviceEntries().size());
+ assertEquals(5, tableScanNode.getIdAndAttributeIndexMap().size());
+ assertEquals(ASC, tableScanNode.getScanOrder());
- RelationalDistributionPlanner distributionPlanner =
- new RelationalDistributionPlanner(actualAnalysis, logicalQueryPlan,
context);
+ TableDistributionPlanner distributionPlanner =
+ new TableDistributionPlanner(actualAnalysis, logicalQueryPlan,
context);
DistributedQueryPlan distributedQueryPlan = distributionPlanner.plan();
- System.out.println(distributedQueryPlan);
+ assertEquals(4, distributedQueryPlan.getInstances().size());
+
+ // 2. global time filter
+ sql = "SELECT * FROM table1 where time > 1";
+ actualAnalysis = analyzeSQL(sql, metadata);
+ assertNotNull(actualAnalysis);
+ assertEquals(1, actualAnalysis.getTables().size());
+
+ context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
+ logicalPlanner =
+ new LogicalPlanner(
+ context, metadata, sessionInfo, getFakePartitionFetcher(),
WarningCollector.NOOP);
+ logicalQueryPlan = logicalPlanner.plan(actualAnalysis);
+ rootNode = logicalQueryPlan.getRootNode();
+ assertTrue(rootNode instanceof OutputNode);
+ assertTrue(((OutputNode) rootNode).getChild() instanceof TableScanNode);
+ tableScanNode = (TableScanNode) ((OutputNode) rootNode).getChild();
+ assertEquals("table1", tableScanNode.getQualifiedTableName());
+ assertEquals(8, tableScanNode.getOutputSymbols().size());
+ assertEquals(8, tableScanNode.getAssignments().size());
+ assertEquals(1, tableScanNode.getDeviceEntries().size());
+ assertEquals(5, tableScanNode.getIdAndAttributeIndexMap().size());
+ assertEquals(ASC, tableScanNode.getScanOrder());
+
+ distributionPlanner = new TableDistributionPlanner(actualAnalysis,
logicalQueryPlan, context);
+ distributedQueryPlan = distributionPlanner.plan();
+ assertEquals(4, distributedQueryPlan.getInstances().size());
}
public static Analysis analyzeSQL(String sql, Metadata metadata) {
@@ -193,5 +238,64 @@ public class AnalyzerTest {
return null;
}
+ private static final DataPartition DATA_PARTITION =
MockTablePartition.constructDataPartition();
+ private static final SchemaPartition SCHEMA_PARTITION =
+ MockTablePartition.constructSchemaPartition();
+
+ private static IPartitionFetcher getFakePartitionFetcher() {
+
+ return new IPartitionFetcher() {
+
+ @Override
+ public SchemaPartition getSchemaPartition(PathPatternTree patternTree) {
+ return SCHEMA_PARTITION;
+ }
+
+ @Override
+ public SchemaPartition getOrCreateSchemaPartition(
+ PathPatternTree patternTree, String userName) {
+ return SCHEMA_PARTITION;
+ }
+
+ @Override
+ public DataPartition getDataPartition(
+ Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
+ return DATA_PARTITION;
+ }
+
+ @Override
+ public DataPartition getDataPartitionWithUnclosedTimeRange(
+ Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
+ return DATA_PARTITION;
+ }
+
+ @Override
+ public DataPartition getOrCreateDataPartition(
+ Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
+ return DATA_PARTITION;
+ }
+
+ @Override
+ public DataPartition getOrCreateDataPartition(
+ List<DataPartitionQueryParam> dataPartitionQueryParams, String
userName) {
+ return DATA_PARTITION;
+ }
+
+ @Override
+ public SchemaNodeManagementPartition
getSchemaNodeManagementPartitionWithLevel(
+ PathPatternTree patternTree, PathPatternTree scope, Integer level) {
+ return null;
+ }
+
+ @Override
+ public boolean updateRegionCache(TRegionRouteReq req) {
+ return false;
+ }
+
+ @Override
+ public void invalidAllCache() {}
+ };
+ }
+
private static class NopAccessControl implements AccessControl {}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/MockTablePartition.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/MockTablePartition.java
new file mode 100644
index 00000000000..ae24f4a219e
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/MockTablePartition.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.analyzer;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.partition.DataPartition;
+import org.apache.iotdb.commons.partition.SchemaPartition;
+import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MockTablePartition {
+ /*
+  * Test fixture that hand-builds a DataPartition and a SchemaPartition for the database
+  * "root.testdb". Entries are keyed by the series partition slot that EXECUTOR computes for
+  * each of the device paths declared below.
+  */
+ private static final SeriesPartitionExecutor EXECUTOR =
+ SeriesPartitionExecutor.getSeriesPartitionExecutor(
+
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+ // Database name used as the top-level key of both partition maps.
+ private static final String DB_NAME = "root.testdb";
+ // Device paths whose series partition slots key the mock partition entries.
+ private static final String device1 = "root.testdb.d1";
+ private static final String device2 = "root.testdb.d22";
+ private static final String device3 = "root.testdb.d333";
+ private static final String device4 = "root.testdb.d4444";
+ private static final String device5 = "root.testdb.d55555";
+ private static final String device6 = "root.testdb.d666666";
+ /**
+  * Builds a DataPartition for DB_NAME from four data regions (ids 1-4), each with two
+  * replica locations. For every device, its series partition slot maps one
+  * default-constructed TTimePartitionSlot to a list of regions:
+  * device1 -> [1, 2], device2 -> [3], device3 -> [1, 4], device4 -> [1, 4],
+  * device5 -> [4], device6 -> [1, 2].
+  *
+  * NOTE(review): entries are stored in a HashMap keyed by series slot, so two devices that
+  * hash to the same slot would overwrite each other - presumably the device names are
+  * chosen to land in distinct slots; confirm against the configured executor.
+  *
+  * @return the populated data partition
+  */
+ public static DataPartition constructDataPartition() {
+ TRegionReplicaSet dataRegion1 =
+ new TRegionReplicaSet(
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, 1),
+ Arrays.asList(
+ genDataNodeLocation(11, "192.0.1.1"), genDataNodeLocation(12,
"192.0.1.2")));
+
+ TRegionReplicaSet dataRegion2 =
+ new TRegionReplicaSet(
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, 2),
+ Arrays.asList(
+ genDataNodeLocation(21, "192.0.2.1"), genDataNodeLocation(22,
"192.0.2.2")));
+
+ TRegionReplicaSet dataRegion3 =
+ new TRegionReplicaSet(
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, 3),
+ Arrays.asList(
+ genDataNodeLocation(31, "192.0.3.1"), genDataNodeLocation(32,
"192.0.3.2")));
+
+ TRegionReplicaSet dataRegion4 =
+ new TRegionReplicaSet(
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, 4),
+ Arrays.asList(
+ genDataNodeLocation(41, "192.0.4.1"), genDataNodeLocation(42,
"192.0.4.2")));
+
+ DataPartition dataPartition =
+ new DataPartition(
+
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+ // Nesting: database -> series partition slot -> time partition slot -> replica sets.
+ Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>>>
+ dataPartitionMap = new HashMap<>();
+ Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>> sgPartitionMap =
+ new HashMap<>();
+
+ List<TRegionReplicaSet> d1DataRegions = new ArrayList<>();
+ d1DataRegions.add(dataRegion1);
+ d1DataRegions.add(dataRegion2);
+ Map<TTimePartitionSlot, List<TRegionReplicaSet>> d1DataRegionMap = new
HashMap<>();
+ d1DataRegionMap.put(new TTimePartitionSlot(), d1DataRegions);
+
+ List<TRegionReplicaSet> d2DataRegions = new ArrayList<>();
+ d2DataRegions.add(dataRegion3);
+ Map<TTimePartitionSlot, List<TRegionReplicaSet>> d2DataRegionMap = new
HashMap<>();
+ d2DataRegionMap.put(new TTimePartitionSlot(), d2DataRegions);
+
+ List<TRegionReplicaSet> d3DataRegions = new ArrayList<>();
+ d3DataRegions.add(dataRegion1);
+ d3DataRegions.add(dataRegion4);
+ Map<TTimePartitionSlot, List<TRegionReplicaSet>> d3DataRegionMap = new
HashMap<>();
+ d3DataRegionMap.put(new TTimePartitionSlot(), d3DataRegions);
+
+ List<TRegionReplicaSet> d4DataRegions = new ArrayList<>();
+ d4DataRegions.add(dataRegion1);
+ d4DataRegions.add(dataRegion4);
+ Map<TTimePartitionSlot, List<TRegionReplicaSet>> d4DataRegionMap = new
HashMap<>();
+ d4DataRegionMap.put(new TTimePartitionSlot(), d4DataRegions);
+
+ List<TRegionReplicaSet> d5DataRegions = new ArrayList<>();
+ d5DataRegions.add(dataRegion4);
+ Map<TTimePartitionSlot, List<TRegionReplicaSet>> d5DataRegionMap = new
HashMap<>();
+ d5DataRegionMap.put(new TTimePartitionSlot(), d5DataRegions);
+
+ List<TRegionReplicaSet> d6DataRegions = new ArrayList<>();
+ d6DataRegions.add(dataRegion1);
+ d6DataRegions.add(dataRegion2);
+ Map<TTimePartitionSlot, List<TRegionReplicaSet>> d6DataRegionMap = new
HashMap<>();
+ d6DataRegionMap.put(new TTimePartitionSlot(), d6DataRegions);
+ // Key each device's time-slot map by the series partition slot computed for that device.
+ sgPartitionMap.put(EXECUTOR.getSeriesPartitionSlot(device1),
d1DataRegionMap);
+ sgPartitionMap.put(EXECUTOR.getSeriesPartitionSlot(device2),
d2DataRegionMap);
+ sgPartitionMap.put(EXECUTOR.getSeriesPartitionSlot(device3),
d3DataRegionMap);
+ sgPartitionMap.put(EXECUTOR.getSeriesPartitionSlot(device4),
d4DataRegionMap);
+ sgPartitionMap.put(EXECUTOR.getSeriesPartitionSlot(device5),
d5DataRegionMap);
+ sgPartitionMap.put(EXECUTOR.getSeriesPartitionSlot(device6),
d6DataRegionMap);
+
+ dataPartitionMap.put(DB_NAME, sgPartitionMap);
+ dataPartition.setDataPartitionMap(dataPartitionMap);
+
+ return dataPartition;
+ }
+ /**
+  * Builds a SchemaPartition for DB_NAME from two schema regions (ids 11 and 21), each with
+  * two replica locations. device1's series partition slot maps to region 11; device2's and
+  * device3's slots map to region 21. device4 to device6 get no schema entry here.
+  *
+  * @return the populated schema partition
+  */
+ public static SchemaPartition constructSchemaPartition() {
+ SchemaPartition schemaPartition =
+ new SchemaPartition(
+
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+ Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>>
schemaPartitionMap = new HashMap<>();
+
+ TRegionReplicaSet schemaRegion1 =
+ new TRegionReplicaSet(
+ new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 11),
+ Arrays.asList(
+ genDataNodeLocation(11, "192.0.1.1"), genDataNodeLocation(12,
"192.0.1.2")));
+
+ TRegionReplicaSet schemaRegion2 =
+ new TRegionReplicaSet(
+ new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 21),
+ Arrays.asList(
+ genDataNodeLocation(21, "192.0.2.1"), genDataNodeLocation(22,
"192.0.2.2")));
+
+ Map<TSeriesPartitionSlot, TRegionReplicaSet> schemaRegionMap = new
HashMap<>();
+ schemaRegionMap.put(EXECUTOR.getSeriesPartitionSlot(device1),
schemaRegion1);
+ schemaRegionMap.put(EXECUTOR.getSeriesPartitionSlot(device2),
schemaRegion2);
+ schemaRegionMap.put(EXECUTOR.getSeriesPartitionSlot(device3),
schemaRegion2);
+ schemaPartitionMap.put(DB_NAME, schemaRegionMap);
+ schemaPartition.setSchemaPartitionMap(schemaPartitionMap);
+
+ return schemaPartition;
+ }
+ /**
+  * Creates a data node location whose client RPC, MPP data exchange and internal endpoints
+  * all use the given ip, on ports 9000, 9001 and 9002 respectively.
+  *
+  * @param dataNodeId id assigned to the location
+  * @param ip host used for all three endpoints
+  * @return the configured TDataNodeLocation
+  */
+ private static TDataNodeLocation genDataNodeLocation(int dataNodeId, String
ip) {
+ return new TDataNodeLocation()
+ .setDataNodeId(dataNodeId)
+ .setClientRpcEndPoint(new TEndPoint(ip, 9000))
+ .setMPPDataExchangeEndPoint(new TEndPoint(ip, 9001))
+ .setInternalEndPoint(new TEndPoint(ip, 9002));
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java
index ff8c8e7788e..bbb6ef02d3c 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java
@@ -283,7 +283,9 @@ public class TestMatadata implements Metadata {
List<Expression> expressionList,
List<String> attributeColumns) {
return Arrays.asList(
- new DeviceEntry(new StringArrayDeviceID("t1", "t2", "t3"),
Arrays.asList("a1", "a2")));
+ new DeviceEntry(
+ new StringArrayDeviceID("root.testdb", "table1", "t1", "t2", "t3"),
+ Arrays.asList("a1", "a2")));
}
public static boolean isTwoNumericType(List<? extends Type> argumentTypes) {