This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch beyyes/table-model-debug in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 56be673ec65148e0939003b4cf68ecbe78dddff5 Author: Beyyes <[email protected]> AuthorDate: Fri Apr 19 11:58:42 2024 +0800 beyyes table model debug tmp --- .../fragment/FragmentInstanceContext.java | 1 + .../source/relational/TableScanOperator.java | 7 +++- .../queryengine/plan/execution/QueryExecution.java | 1 + .../plan/planner/TableOperatorGenerator.java | 46 ++++++++++++++++++++++ .../plan/relational/analyzer/Analysis.java | 8 +++- .../relational/metadata/TableMetadataImpl.java | 3 +- .../plan/relational/planner/LogicalPlanner.java | 18 ++++++++- .../plan/relational/planner/RelationPlan.java | 10 ++--- .../plan/relational/planner/RelationPlanner.java | 22 +++++++++-- .../relational/planner/RelationalModelPlanner.java | 2 +- .../distribute/RelationalDistributionPlanner.java | 40 +++++++++++++++++++ .../plan/relational/planner/node/OutputNode.java | 4 ++ .../relational/planner/node/TableScanNode.java | 18 ++++++++- .../planner/optimizations/IndexScan.java | 5 ++- 14 files changed, 168 insertions(+), 17 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index e3025bd1497..1b6861d2f59 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -113,6 +113,7 @@ public class FragmentInstanceContext extends QueryContext { return instanceContext; } + // This method is only used in groupby public static FragmentInstanceContext createFragmentInstanceContext( FragmentInstanceId id, FragmentInstanceStateMachine stateMachine, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java index a8f3a6da567..05ef2ef2e3f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java @@ -121,7 +121,7 @@ public class TableScanOperator extends AbstractDataSourceOperator { this.seriesScanUtil = constructAlignedSeriesScanUtil(deviceEntries.get(currentDeviceIndex)); this.measurementDataBuilder = new TsBlockBuilder(this.measurementColumnTSDataTypes); - this.resultTsBlockBuilder.setMaxTsBlockLineNumber(this.maxTsBlockLineNum); + } @Override @@ -170,7 +170,10 @@ public class TableScanOperator extends AbstractDataSourceOperator { // append id column and attribute column if (!isEmpty(measurementDataBlock)) { constructResultTsBlock(); + } else { + return null; } + measurementDataBlock = null; return checkTsBlockSizeAndGetResult(); } @@ -329,6 +332,8 @@ public class TableScanOperator extends AbstractDataSourceOperator { this.seriesScanUtil.initQueryDataSource(dataSource); this.resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes()); this.resultTsBlockBuilder.setMaxTsBlockLineNumber(this.maxTsBlockLineNum); + //this.resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes()); + //this.resultTsBlockBuilder.setMaxTsBlockLineNumber(this.maxTsBlockLineNum); } private void prepareForNextDevice() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java index 8446c0bd4fe..93a3c1bfab8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java @@ -552,6 +552,7 @@ public class QueryExecution implements IQueryExecution { private void initResultHandle() { TEndPoint upstreamEndPoint = context.getResultNodeContext().getUpStreamEndpoint(); + // TEndPoint upstreamEndPoint = LOCAL_HOST_DATA_BLOCK_ENDPOINT; this.resultHandle = isSameNode(upstreamEndPoint) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index c8825447e88..b705727799e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -20,12 +20,19 @@ package org.apache.iotdb.db.queryengine.plan.planner; import org.apache.iotdb.commons.path.AlignedPath; +import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; import org.apache.iotdb.db.queryengine.execution.driver.DataDriverContext; +import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager; +import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeService; +import org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelIndex; +import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISinkHandle; +import org.apache.iotdb.db.queryengine.execution.exchange.sink.ShuffleSinkHandle; import org.apache.iotdb.db.queryengine.execution.operator.Operator; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; import org.apache.iotdb.db.queryengine.execution.operator.process.FilterAndProjectOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.LimitOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.OffsetOperator; +import org.apache.iotdb.db.queryengine.execution.operator.sink.IdentitySinkOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator; import org.apache.iotdb.db.queryengine.execution.relational.ColumnTransformerBuilder; @@ -34,6 +41,7 @@ import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; @@ -60,6 +68,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -69,6 +78,7 @@ import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; +import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath; import static org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils.convertPredicateToFilter; @@ -83,6 +93,42 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution throw new UnsupportedOperationException("should call the concrete visitXX() method"); } + private static final MPPDataExchangeManager MPP_DATA_EXCHANGE_MANAGER = + MPPDataExchangeService.getInstance().getMPPDataExchangeManager(); + + @Override + public Operator visitIdentitySink(IdentitySinkNode node, LocalExecutionPlanContext context) { + context.addExchangeSumNum(1); + OperatorContext operatorContext = + context + .getDriverContext() + .addOperatorContext( + context.getNextOperatorId(), + node.getPlanNodeId(), + IdentitySinkOperator.class.getSimpleName()); + + checkArgument( + MPP_DATA_EXCHANGE_MANAGER != null, "MPP_DATA_EXCHANGE_MANAGER should not be null"); + FragmentInstanceId localInstanceId = context.getInstanceContext().getId(); + DownStreamChannelIndex downStreamChannelIndex = new DownStreamChannelIndex(0); + ISinkHandle sinkHandle = + MPP_DATA_EXCHANGE_MANAGER.createShuffleSinkHandle( + node.getDownStreamChannelLocationList(), + downStreamChannelIndex, + ShuffleSinkHandle.ShuffleStrategyEnum.PLAIN, + localInstanceId.toThrift(), + node.getPlanNodeId().getId(), + context.getInstanceContext()); + sinkHandle.setMaxBytesCanReserve(context.getMaxBytesOneHandleCanReserve()); + context.getDriverContext().setSink(sinkHandle); + + // List<Operator> children = dealWithConsumeChildrenOneByOneNode(node, context); + Operator child = node.getChildren().get(0).accept(this, context); + List<Operator> children = new ArrayList<>(1); + children.add(child); + return new IdentitySinkOperator(operatorContext, children, downStreamChannelIndex, sinkHandle); + } + @Override public Operator visitTableScan(TableScanNode node, LocalExecutionPlanContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java index d2a69511628..7633413aab9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java @@ -152,6 +152,8 @@ public class Analysis implements IAnalysis { private DataPartition dataPartition; + private DatasetHeader respDatasetHeader; + public Expression getGlobalTableModelTimePredicate() { return this.globalTableModelTimePredicate; } @@ -593,7 +595,11 @@ public class Analysis implements IAnalysis { @Override public DatasetHeader getRespDatasetHeader() { - return null; + return respDatasetHeader; + } + + public void setRespDatasetHeader(DatasetHeader respDatasetHeader) { + this.respDatasetHeader = respDatasetHeader; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java index fa7302837a7..a0c9e0083d0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java @@ -25,7 +25,6 @@ import org.apache.iotdb.commons.udf.builtin.BuiltinScalarFunction; import org.apache.iotdb.db.exception.metadata.table.TableNotExistsException; import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.queryengine.common.SessionInfo; -import org.apache.iotdb.db.queryengine.plan.relational.analyzer.schema.TableModelSchemaFetcher; import org.apache.iotdb.db.queryengine.plan.relational.function.OperatorType; import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl; import org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager; @@ -261,7 +260,7 @@ public class TableMetadataImpl implements Metadata { List<Expression> expressionList, List<String> attributeColumns) { List<DeviceEntry> result = new ArrayList<>(); - IDeviceID deviceID = new StringArrayDeviceID("beijing", "a_1"); + IDeviceID deviceID = new StringArrayDeviceID("db", "table1", "beijing", "a_1"); result.add(new DeviceEntry(deviceID, Arrays.asList("new", "low"))); return result; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java index 843517be036..bd6b1ddf3dd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java @@ -16,6 +16,8 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.SessionInfo; +import org.apache.iotdb.db.queryengine.common.header.ColumnHeader; +import org.apache.iotdb.db.queryengine.common.header.DatasetHeader; import org.apache.iotdb.db.queryengine.execution.warnings.WarningCollector; import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; @@ -36,9 +38,12 @@ import org.apache.iotdb.db.relational.sql.tree.Table; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.airlift.log.Logger; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; import static java.util.Objects.requireNonNull; @@ -109,8 +114,19 @@ public class LogicalPlanner { columnNumber++; } - return new OutputNode( + OutputNode outputNode = new OutputNode( context.getQueryId().genPlanNodeId(), plan.getRoot(), names.build(), outputs.build()); + + +// List<ColumnHeader> columnHeaders = +// outputNode.getOutputColumnNames().stream().map(column -> new ColumnHeader(column, TSDataType.DOUBLE)).collect(Collectors.toList()); + List<ColumnHeader> columnHeaders = new ArrayList<>(); + for (String columnName : outputNode.getColumnNames()) { + columnHeaders.add(new ColumnHeader(columnName, TSDataType.DOUBLE)); + } + DatasetHeader respDatasetHeader = new DatasetHeader(columnHeaders, false); + analysis.setRespDatasetHeader(respDatasetHeader); + return outputNode; } private RelationPlan createRelationPlan(Analysis analysis, Query query) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlan.java index cdd1bab217f..e8a77e31e73 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlan.java @@ -44,11 +44,11 @@ public class RelationPlan { requireNonNull(scope, "scope is null"); int allFieldCount = scope.getLocalScopeFieldCount(); - checkArgument( - allFieldCount == fieldMappings.size(), - "Number of outputs (%s) doesn't match number of fields in local scope (%s)", - fieldMappings.size(), - allFieldCount); +// checkArgument( +// allFieldCount == fieldMappings.size(), +// "Number of outputs (%s) doesn't match number of fields in local scope (%s)", +// fieldMappings.size(), +// allFieldCount); this.root = root; this.scope = scope; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java index e49177387a1..37b0d3aeeb9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java @@ -41,10 +41,13 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import static java.util.Objects.requireNonNull; +import static org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.ATTRIBUTE; +import static org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.ID; public class RelationPlanner extends AstVisitor<RelationPlan, Void> { private final Analysis analysis; @@ -91,29 +94,42 @@ public class RelationPlanner extends AstVisitor<RelationPlan, Void> { expansion.getRoot(), expansion.getScope(), expansion.getFieldMappings()); } + Map<Symbol, Integer> idAndAttributeIndexMap = new HashMap<>(); + Scope scope = analysis.getScope(table); ImmutableList.Builder<Symbol> outputSymbolsBuilder = ImmutableList.builder(); ImmutableMap.Builder<Symbol, ColumnSchema> symbolToColumnSchema = ImmutableMap.builder(); Collection<Field> fields = scope.getRelationType().getAllFields(); - TableSchema tableSchema = analysis.getTables().iterator().next(); + int IDIdx = 0, attributeIdx = 0; for (Field field : fields) { + if ("time".equalsIgnoreCase(field.getName().get())) { + // TODO consider time ColumnCategory + continue; + } Symbol symbol = symbolAllocator.newSymbol(field); outputSymbolsBuilder.add(symbol); symbolToColumnSchema.put( symbol, new ColumnSchema( field.getName().get(), field.getType(), field.isHidden(), field.getColumnCategory())); + + if (ID.equals(field.getColumnCategory())) { + idAndAttributeIndexMap.put(symbol, IDIdx++); + } else if (ATTRIBUTE.equals(field.getColumnCategory())) { + idAndAttributeIndexMap.put(symbol, attributeIdx++); + } } List<Symbol> outputSymbols = outputSymbolsBuilder.build(); - PlanNode root = + TableScanNode tableScanNode = new TableScanNode( idAllocator.genPlanNodeId(), table.getName().toString(), outputSymbols, symbolToColumnSchema.build()); + tableScanNode.setIdAndAttributeIndexMap(idAndAttributeIndexMap); - return new RelationPlan(root, scope, outputSymbols); + return new RelationPlan(tableScanNode, scope, outputSymbols); // Collection<Field> fields = analysis.getMaterializedViewStorageTableFields(node); // Query namedQuery = analysis.getNamedQuery(node); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationalModelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationalModelPlanner.java index b9a6604e708..f807fd19dfe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationalModelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationalModelPlanner.java @@ -108,7 +108,7 @@ public class RelationalModelPlanner implements IPlanner { @Override public LogicalQueryPlan doLogicalPlan(IAnalysis analysis, MPPQueryContext context) { try { - return new LogicalPlanner(context, metadata, null, warningCollector) + return new LogicalPlanner(context, metadata, context.getSession(), warningCollector) .plan((Analysis) analysis); } catch (IoTDBException e) { throw new RuntimeException(e); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/RelationalDistributionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/RelationalDistributionPlanner.java index 64d2aa70599..45ece8cae71 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/RelationalDistributionPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/RelationalDistributionPlanner.java @@ -14,13 +14,17 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelLocation; +import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis; +import java.util.Collections; import java.util.List; public class RelationalDistributionPlanner { @@ -52,7 +56,43 @@ public class RelationalDistributionPlanner { List<FragmentInstance> fragmentInstances = new FragmentInstanceGenerator(subPlan, analysis, context).plan(); + // Only execute this step for READ operation + if (context.getQueryType() == QueryType.READ) { + setSinkForRootInstance(subPlan, fragmentInstances); + } + return new DistributedQueryPlan( logicalQueryPlan.getContext(), subPlan, subPlan.getPlanFragmentList(), fragmentInstances); } + + public void setSinkForRootInstance(SubPlan subPlan, List<FragmentInstance> instances) { + FragmentInstance rootInstance = null; + for (FragmentInstance instance : instances) { + if (instance.getFragment().getId().equals(subPlan.getPlanFragment().getId())) { + rootInstance = instance; + break; + } + } + // root should not be null during normal process + if (rootInstance == null) { + return; + } + + IdentitySinkNode sinkNode = + new IdentitySinkNode( + context.getQueryId().genPlanNodeId(), + Collections.singletonList(rootInstance.getFragment().getPlanNodeTree()), + Collections.singletonList( + new DownStreamChannelLocation( + context.getLocalDataBlockEndpoint(), + context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(), + context.getResultNodeContext().getVirtualResultNodeId().getId()))); + context + .getResultNodeContext() + .setUpStream( + rootInstance.getHostDataNode().mPPDataExchangeEndPoint, + rootInstance.getId(), + sinkNode.getPlanNodeId()); + rootInstance.getFragment().setPlanNodeTree(sinkNode); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java index 9b06d0b8d40..e8e0a08af6f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java @@ -49,6 +49,10 @@ public class OutputNode extends SingleChildProcessNode { @Override protected void serializeAttributes(DataOutputStream stream) throws IOException {} + public List<String> getColumnNames() { + return this.columnNames; + } + public List<Symbol> getOutputSymbols() { return outputs; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java index 6dbb8c5d70a..7520a235ac8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java @@ -4,6 +4,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode; import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; @@ -22,7 +23,7 @@ import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; -public class TableScanNode extends PlanNode { +public class TableScanNode extends SourceNode { // db.tablename private final String qualifiedTableName; @@ -133,6 +134,10 @@ public class TableScanNode extends PlanNode { return this.idAndAttributeIndexMap; } + public void setIdAndAttributeIndexMap(Map<Symbol, Integer> idAndAttributeIndexMap) { + this.idAndAttributeIndexMap = idAndAttributeIndexMap; + } + public Map<Symbol, ColumnSchema> getAssignments() { return this.assignments; } @@ -161,7 +166,18 @@ public class TableScanNode extends PlanNode { return this.regionReplicaSet; } + @Override + public void open() throws Exception { + + } + + @Override public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) { this.regionReplicaSet = regionReplicaSet; } + + @Override + public void close() throws Exception { + + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java index 55c9218d943..aa738291ced 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java @@ -106,15 +106,16 @@ public class IndexScan implements RelationalPlanOptimizer { node.setDeviceEntries(deviceEntries); // TODO getDataPartition, Change globalTimeFilter to Filter + String database = "root." + context.getSessionInfo().getDatabaseName().get(); IPartitionFetcher partitionFetcher = ClusterPartitionFetcher.getInstance(); Filter globalTimeFilter = null; Set<String> deviceSet = new HashSet<>(); for (DeviceEntry deviceEntry : deviceEntries) { StringArrayDeviceID arrayDeviceID = (StringArrayDeviceID) deviceEntry.getDeviceID(); String device = arrayDeviceID.toString(); - deviceSet.add(device); + deviceSet.add("root."+device); } - String database = "root." + context.getSessionInfo().getDatabaseName().get(); + DataPartition dataPartition = fetchDataPartitionByDevices(deviceSet, database, globalTimeFilter, partitionFetcher); context.getAnalysis().setDataPartition(dataPartition);
