This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ty/TableModelGrammar by this
push:
new 265da838f88 fix TsTableColumnCategory, add partition TRegionReplicaSet
impl
265da838f88 is described below
commit 265da838f880cd9bb3c085950eb6071508f6bb53
Author: Beyyes <[email protected]>
AuthorDate: Thu Apr 18 15:15:30 2024 +0800
fix TsTableColumnCategory, add partition TRegionReplicaSet impl
---
.../plan/relational/analyzer/Analysis.java | 21 +++++
.../plan/relational/planner/QueryPlanner.java | 4 +-
.../plan/relational/planner/RelationPlanner.java | 2 +
.../distribute/FragmentInstanceGenerator.java | 10 +--
.../relational/planner/node/TableScanNode.java | 4 +
.../planner/optimizations/IndexScan.java | 95 +++++++++++++++++++++-
.../plan/relational/analyzer/AnalyzerTest.java | 12 ++-
.../plan/relational/analyzer/TestMatadata.java | 25 ++++--
8 files changed, 154 insertions(+), 19 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
index e2d603fe479..d2a69511628 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.queryengine.plan.relational.analyzer;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
@@ -147,6 +148,26 @@ public class Analysis implements IAnalysis {
private final Set<NodeRef<Relation>> aliasedRelations = new
LinkedHashSet<>();
+ private Expression globalTableModelTimePredicate;
+
+ private DataPartition dataPartition;
+
+ public Expression getGlobalTableModelTimePredicate() {
+ return this.globalTableModelTimePredicate;
+ }
+
+ public void setGlobalTableModelTimePredicate(Expression
globalTableModelTimePredicate) {
+ this.globalTableModelTimePredicate = globalTableModelTimePredicate;
+ }
+
+ public DataPartition getDataPartition() {
+ return dataPartition;
+ }
+
+ public void setDataPartition(DataPartition dataPartition) {
+ this.dataPartition = dataPartition;
+ }
+
public Analysis(@Nullable Statement root, Map<NodeRef<Parameter>,
Expression> parameters) {
this.root = root;
this.parameters = ImmutableMap.copyOf(requireNonNull(parameters,
"parameters is null"));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
index f301a3cebec..6ca42581058 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
@@ -243,7 +243,9 @@ public class QueryPlanner {
return subPlan;
}
- Pair<Expression, Boolean> ret = extractGlobalTimePredicate(predicate,
true, true);
+ Pair<Expression, Boolean> resultPair =
extractGlobalTimePredicate(predicate, true, true);
+ Expression globalTimePredicate = resultPair.left;
+ analysis.setGlobalTableModelTimePredicate(globalTimePredicate);
return subPlan.withNewRoot(
new FilterNode(idAllocator.genPlanNodeId(), subPlan.getRoot(),
predicate));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
index d8526986a92..e49177387a1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
@@ -21,6 +21,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.analyzer.Field;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.NodeRef;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Scope;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
import org.apache.iotdb.db.relational.sql.tree.AliasedRelation;
import org.apache.iotdb.db.relational.sql.tree.AstVisitor;
@@ -94,6 +95,7 @@ public class RelationPlanner extends AstVisitor<RelationPlan,
Void> {
ImmutableList.Builder<Symbol> outputSymbolsBuilder =
ImmutableList.builder();
ImmutableMap.Builder<Symbol, ColumnSchema> symbolToColumnSchema =
ImmutableMap.builder();
Collection<Field> fields = scope.getRelationType().getAllFields();
+ TableSchema tableSchema = analysis.getTables().iterator().next();
for (Field field : fields) {
Symbol symbol = symbolAllocator.newSymbol(field);
outputSymbolsBuilder.add(symbol);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/FragmentInstanceGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/FragmentInstanceGenerator.java
index 5247d9ff06e..b0ee63c09db 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/FragmentInstanceGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/FragmentInstanceGenerator.java
@@ -24,16 +24,16 @@ import
org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
import
org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelLocation;
import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
-import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.TreeModelTimePredicate;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.TableModelTimePredicate;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.MultiChildrenSinkNode;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
+import org.apache.iotdb.db.relational.sql.tree.Expression;
import org.apache.iotdb.db.relational.sql.tree.Query;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -95,14 +95,12 @@ public class FragmentInstanceGenerator {
}
private void produceFragmentInstance(PlanFragment fragment) {
- // TODO fix globalTimePredicate
- // Expression globalTimePredicate = analysis.getGlobalTimePredicate();
- Expression globalTimePredicate = null;
+ Expression globalTimePredicate =
analysis.getGlobalTableModelTimePredicate();
FragmentInstance fragmentInstance =
new FragmentInstance(
fragment,
fragment.getId().genFragmentInstanceId(),
- globalTimePredicate == null ? null : new
TreeModelTimePredicate(globalTimePredicate),
+ globalTimePredicate == null ? null : new
TableModelTimePredicate(globalTimePredicate),
QueryType.READ,
queryContext.getTimeOut(),
queryContext.getSession(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
index ee599da87e3..6dbb8c5d70a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
@@ -160,4 +160,8 @@ public class TableScanNode extends PlanNode {
public TRegionReplicaSet getRegionReplicaSet() {
return this.regionReplicaSet;
}
+
+ public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {
+ this.regionReplicaSet = regionReplicaSet;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java
index c52131813c9..b1e4ca0fb2c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java
@@ -14,8 +14,16 @@
package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.partition.DataPartition;
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
+import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
+import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
@@ -25,16 +33,27 @@ import
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectN
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
import org.apache.iotdb.db.relational.sql.tree.Expression;
+import org.apache.iotdb.tsfile.file.metadata.StringArrayDeviceID;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.utils.Pair;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import static
org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.ATTRIBUTE;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.getTimePartitionSlotList;
/** Extract IDeviceID and */
public class IndexScan implements RelationalPlanOptimizer {
+ static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+
@Override
public PlanNode optimize(
PlanNode planNode,
@@ -42,7 +61,8 @@ public class IndexScan implements RelationalPlanOptimizer {
Metadata metadata,
SessionInfo sessionInfo,
MPPQueryContext context) {
- return planNode.accept(new Rewriter(), new RewriterContext(null, metadata,
sessionInfo));
+ return planNode.accept(
+ new Rewriter(), new RewriterContext(null, metadata, sessionInfo,
analysis));
}
private static class Rewriter extends PlanVisitor<PlanNode, RewriterContext>
{
@@ -67,6 +87,7 @@ public class IndexScan implements RelationalPlanOptimizer {
.filter(e -> e.getValue().getColumnCategory().equals(ATTRIBUTE))
.map(e -> e.getKey().getName())
.collect(Collectors.toList());
+
// TODO extract predicate to expression list
List<DeviceEntry> deviceEntries =
context
@@ -78,19 +99,85 @@ public class IndexScan implements RelationalPlanOptimizer {
Collections.singletonList(context.getPredicate()),
attributeColumns);
node.setDeviceEntries(deviceEntries);
+
+ // TODO getDataPartition, Change globalTimeFilter to Filter
+ IPartitionFetcher partitionFetcher =
ClusterPartitionFetcher.getInstance();
+ Filter globalTimeFilter = null;
+ Set<String> deviceSet = new HashSet<>();
+ for (DeviceEntry deviceEntry : deviceEntries) {
+ StringArrayDeviceID arrayDeviceID = (StringArrayDeviceID)
deviceEntry.getDeviceID();
+ String device = arrayDeviceID.toString();
+ deviceSet.add(device);
+ }
+ String database = "root." +
context.getSessionInfo().getDatabaseName().get();
+ DataPartition dataPartition =
+ fetchDataPartitionByDevices(deviceSet, database, globalTimeFilter,
partitionFetcher);
+ context.getAnalysis().setDataPartition(dataPartition);
+ // List<TRegionReplicaSet> regionReplicaList =
+ // dataPartition.getDataRegionReplicaSetWithTimeFilter
+ //
(((StringArrayDeviceID)deviceEntries.get(0).getDeviceID().toString(),
+ // globalTimeFilter);
+
+ // TODO add the real impl
+ TRegionReplicaSet regionReplicaSet =
+ dataPartition
+ .getDataPartitionMap()
+ .values()
+ .iterator()
+ .next()
+ .values()
+ .iterator()
+ .next()
+ .values()
+ .iterator()
+ .next()
+ .get(0);
+ node.setRegionReplicaSet(regionReplicaSet);
+
return node;
}
}
+ private static DataPartition fetchDataPartitionByDevices(
+ Set<String> deviceSet,
+ String database,
+ Filter globalTimeFilter,
+ IPartitionFetcher partitionFetcher) {
+ Pair<List<TTimePartitionSlot>, Pair<Boolean, Boolean>> res =
+ getTimePartitionSlotList(globalTimeFilter);
+ // there is no satisfied time range
+ if (res.left.isEmpty() && Boolean.FALSE.equals(res.right.left)) {
+ return new DataPartition(
+ Collections.emptyMap(),
+ CONFIG.getSeriesPartitionExecutorClass(),
+ CONFIG.getSeriesPartitionSlotNum());
+ }
+ Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new
HashMap<>();
+ for (String devicePath : deviceSet) {
+ DataPartitionQueryParam queryParam =
+ new DataPartitionQueryParam(devicePath, res.left, res.right.left,
res.right.right);
+ sgNameToQueryParamsMap.computeIfAbsent(database, key -> new
ArrayList<>()).add(queryParam);
+ }
+
+ if (res.right.left || res.right.right) {
+ return
partitionFetcher.getDataPartitionWithUnclosedTimeRange(sgNameToQueryParamsMap);
+ } else {
+ return partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
+ }
+ }
+
private static class RewriterContext {
private Expression predicate;
private Metadata metadata;
private final SessionInfo sessionInfo;
+ private Analysis analysis;
- RewriterContext(Expression predicate, Metadata metadata, SessionInfo
sessionInfo) {
+ RewriterContext(
+ Expression predicate, Metadata metadata, SessionInfo sessionInfo,
Analysis analysis) {
this.predicate = predicate;
this.metadata = metadata;
this.sessionInfo = sessionInfo;
+ this.analysis = analysis;
}
public Expression getPredicate() {
@@ -112,5 +199,9 @@ public class IndexScan implements RelationalPlanOptimizer {
public SessionInfo getSessionInfo() {
return this.sessionInfo;
}
+
+ public Analysis getAnalysis() {
+ return this.analysis;
+ }
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
index 77e8b83117b..2e57ba71c19 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.queryengine.plan.relational.analyzer;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.db.protocol.session.IClientSession;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
@@ -134,7 +135,7 @@ public class AnalyzerTest {
@Test
public void testSingleTableQuery() throws IoTDBException {
// no sort
- String sql = "SELECT tag1 as tt, tag2, attr1, s1+1 FROM table1 where
time>1 and s1>1";
+ String sql = "SELECT tag1, s1 FROM table1";
// + "WHERE time>1 AND tag1='A' OR s2>3";
Metadata metadata = new TestMatadata();
@@ -144,7 +145,14 @@ public class AnalyzerTest {
QueryId queryId = new QueryId("tmp_query");
MPPQueryContext context = new MPPQueryContext(queryId);
- SessionInfo sessionInfo = new SessionInfo(1L, "iotdb",
ZoneId.systemDefault());
+ SessionInfo sessionInfo =
+ new SessionInfo(
+ 1L,
+ "iotdb-user",
+ ZoneId.systemDefault(),
+ IoTDBConstant.ClientVersion.V_1_0,
+ "db",
+ IClientSession.SqlDialect.TABLE);
WarningCollector warningCollector = WarningCollector.NOOP;
LogicalPlanner logicalPlanner =
new LogicalPlanner(context, metadata, sessionInfo, warningCollector);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java
index b40538f7869..3ef17b1a2b0 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java
@@ -1,5 +1,6 @@
package org.apache.iotdb.db.queryengine.plan.relational.analyzer;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.commons.udf.builtin.BuiltinAggregationFunction;
import org.apache.iotdb.commons.udf.builtin.BuiltinScalarFunction;
import org.apache.iotdb.db.exception.sql.SemanticException;
@@ -70,14 +71,22 @@ public class TestMatadata implements Metadata {
public Optional<TableSchema> getTableSchema(SessionInfo session,
QualifiedObjectName name) {
List<ColumnSchema> columnSchemas =
Arrays.asList(
- ColumnSchema.builder(TIME_CM).build(),
- ColumnSchema.builder(TAG1_CM).build(),
- ColumnSchema.builder(TAG2_CM).build(),
- ColumnSchema.builder(TAG3_CM).build(),
- ColumnSchema.builder(ATTR1_CM).build(),
- ColumnSchema.builder(ATTR2_CM).build(),
- ColumnSchema.builder(S1_CM).build(),
- ColumnSchema.builder(S2_CM).build());
+
ColumnSchema.builder(TIME_CM).setColumnCategory(TsTableColumnCategory.TIME).build(),
+
ColumnSchema.builder(TAG1_CM).setColumnCategory(TsTableColumnCategory.ID).build(),
+
ColumnSchema.builder(TAG2_CM).setColumnCategory(TsTableColumnCategory.ID).build(),
+
ColumnSchema.builder(TAG3_CM).setColumnCategory(TsTableColumnCategory.ID).build(),
+ ColumnSchema.builder(ATTR1_CM)
+ .setColumnCategory(TsTableColumnCategory.ATTRIBUTE)
+ .build(),
+ ColumnSchema.builder(ATTR2_CM)
+ .setColumnCategory(TsTableColumnCategory.ATTRIBUTE)
+ .build(),
+ ColumnSchema.builder(S1_CM)
+ .setColumnCategory(TsTableColumnCategory.MEASUREMENT)
+ .build(),
+ ColumnSchema.builder(S2_CM)
+ .setColumnCategory(TsTableColumnCategory.MEASUREMENT)
+ .build());
return Optional.of(new TableSchema(TABLE1, columnSchemas));
}