This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ty/TableModelGrammar by this
push:
new 47909acb6a3 fix distributed plan, tablescanoperator, fragmentinstance
47909acb6a3 is described below
commit 47909acb6a3e489b8ecf3de6091db40e58f0f2d4
Author: Beyyes <[email protected]>
AuthorDate: Fri Apr 19 11:57:41 2024 +0800
fix distributed plan, tablescanoperator, fragmentinstance
---
.../fragment/FragmentInstanceContext.java | 1 +
.../source/relational/TableScanOperator.java | 2 +
.../plan/planner/TableOperatorGenerator.java | 45 ++++++++++++++++++++++
.../plan/relational/analyzer/Analysis.java | 8 +++-
.../relational/metadata/TableMetadataImpl.java | 10 ++++-
.../plan/relational/planner/LogicalPlanner.java | 20 +++++++++-
.../plan/relational/planner/RelationPlanner.java | 19 ++++++---
.../relational/planner/RelationalModelPlanner.java | 2 +-
.../distribute/RelationalDistributionPlanner.java | 40 +++++++++++++++++++
.../plan/relational/planner/node/OutputNode.java | 6 ++-
.../relational/planner/node/TableScanNode.java | 13 ++++++-
11 files changed, 154 insertions(+), 12 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index e3025bd1497..1b6861d2f59 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -113,6 +113,7 @@ public class FragmentInstanceContext extends QueryContext {
return instanceContext;
}
+ // This method is only used in groupby
public static FragmentInstanceContext createFragmentInstanceContext(
FragmentInstanceId id,
FragmentInstanceStateMachine stateMachine,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
index 22cefbeb82e..80eb310db1a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
@@ -168,6 +168,8 @@ public class TableScanOperator extends
AbstractDataSourceOperator {
// append id column and attribute column
if (!isEmpty(measurementDataBlock)) {
constructResultTsBlock();
+ } else {
+ return null;
}
measurementDataBlock = null;
return checkTsBlockSizeAndGetResult();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 94ab2158821..2c1c431f096 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -20,12 +20,19 @@
package org.apache.iotdb.db.queryengine.plan.planner;
import org.apache.iotdb.commons.path.AlignedPath;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import org.apache.iotdb.db.queryengine.execution.driver.DataDriverContext;
+import
org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager;
+import
org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeService;
+import
org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelIndex;
+import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISinkHandle;
+import
org.apache.iotdb.db.queryengine.execution.exchange.sink.ShuffleSinkHandle;
import org.apache.iotdb.db.queryengine.execution.operator.Operator;
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
import
org.apache.iotdb.db.queryengine.execution.operator.process.FilterAndProjectOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.LimitOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.OffsetOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.sink.IdentitySinkOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator;
import
org.apache.iotdb.db.queryengine.execution.relational.ColumnTransformerBuilder;
@@ -34,6 +41,7 @@ import
org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
@@ -69,6 +77,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
+import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import static
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath;
import static
org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils.convertPredicateToFilter;
@@ -83,6 +92,42 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
throw new UnsupportedOperationException("should call the concrete
visitXX() method");
}
+ private static final MPPDataExchangeManager MPP_DATA_EXCHANGE_MANAGER =
+ MPPDataExchangeService.getInstance().getMPPDataExchangeManager();
+
+ @Override
+ public Operator visitIdentitySink(IdentitySinkNode node,
LocalExecutionPlanContext context) {
+ context.addExchangeSumNum(1);
+ OperatorContext operatorContext =
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ IdentitySinkOperator.class.getSimpleName());
+
+ checkArgument(
+ MPP_DATA_EXCHANGE_MANAGER != null, "MPP_DATA_EXCHANGE_MANAGER should
not be null");
+ FragmentInstanceId localInstanceId = context.getInstanceContext().getId();
+ DownStreamChannelIndex downStreamChannelIndex = new
DownStreamChannelIndex(0);
+ ISinkHandle sinkHandle =
+ MPP_DATA_EXCHANGE_MANAGER.createShuffleSinkHandle(
+ node.getDownStreamChannelLocationList(),
+ downStreamChannelIndex,
+ ShuffleSinkHandle.ShuffleStrategyEnum.PLAIN,
+ localInstanceId.toThrift(),
+ node.getPlanNodeId().getId(),
+ context.getInstanceContext());
+ sinkHandle.setMaxBytesCanReserve(context.getMaxBytesOneHandleCanReserve());
+ context.getDriverContext().setSink(sinkHandle);
+
+ // List<Operator> children = dealWithConsumeChildrenOneByOneNode(node,
context);
+ Operator child = node.getChildren().get(0).accept(this, context);
+ List<Operator> children = new ArrayList<>(1);
+ children.add(child);
+ return new IdentitySinkOperator(operatorContext, children,
downStreamChannelIndex, sinkHandle);
+ }
+
@Override
public Operator visitTableScan(TableScanNode node, LocalExecutionPlanContext
context) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
index d2a69511628..7633413aab9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
@@ -152,6 +152,8 @@ public class Analysis implements IAnalysis {
private DataPartition dataPartition;
+ private DatasetHeader respDatasetHeader;
+
public Expression getGlobalTableModelTimePredicate() {
return this.globalTableModelTimePredicate;
}
@@ -593,7 +595,11 @@ public class Analysis implements IAnalysis {
@Override
public DatasetHeader getRespDatasetHeader() {
- return null;
+ return respDatasetHeader;
+ }
+
+ public void setRespDatasetHeader(DatasetHeader respDatasetHeader) {
+ this.respDatasetHeader = respDatasetHeader;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
index e5cfa4befb2..52ae6220dba 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
@@ -31,8 +31,12 @@ import
org.apache.iotdb.db.queryengine.plan.relational.type.TypeNotFoundExceptio
import org.apache.iotdb.db.queryengine.plan.relational.type.TypeSignature;
import org.apache.iotdb.db.relational.sql.tree.Expression;
import org.apache.iotdb.db.utils.constant.SqlConstant;
+import org.apache.iotdb.tsfile.file.metadata.IDeviceID;
+import org.apache.iotdb.tsfile.file.metadata.StringArrayDeviceID;
import org.apache.iotdb.tsfile.read.common.type.Type;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
@@ -233,7 +237,11 @@ public class TableMetadataImpl implements Metadata {
QualifiedObjectName tableName,
List<Expression> expressionList,
List<String> attributeColumns) {
- return null;
+ // FIXME: hard-coded stub device entry; replace with the real metadata implementation
+ List<DeviceEntry> result = new ArrayList<>();
+ IDeviceID deviceID = new StringArrayDeviceID("db", "table1", "beijing",
"a_1");
+ result.add(new DeviceEntry(deviceID, Arrays.asList("new", "low")));
+ return result;
}
public static boolean isTwoNumericType(List<? extends Type> argumentTypes) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
index 843517be036..fe97bb4b992 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
@@ -16,6 +16,8 @@ package
org.apache.iotdb.db.queryengine.plan.relational.planner;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
+import org.apache.iotdb.db.queryengine.common.header.ColumnHeader;
+import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
import org.apache.iotdb.db.queryengine.execution.warnings.WarningCollector;
import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
@@ -32,11 +34,13 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.Sim
import org.apache.iotdb.db.relational.sql.tree.Query;
import org.apache.iotdb.db.relational.sql.tree.Statement;
import org.apache.iotdb.db.relational.sql.tree.Table;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -109,8 +113,20 @@ public class LogicalPlanner {
columnNumber++;
}
- return new OutputNode(
- context.getQueryId().genPlanNodeId(), plan.getRoot(), names.build(),
outputs.build());
+ OutputNode outputNode =
+ new OutputNode(
+ context.getQueryId().genPlanNodeId(), plan.getRoot(),
names.build(), outputs.build());
+
+ // List<ColumnHeader> columnHeaders =
+ // outputNode.getOutputColumnNames().stream().map(column -> new
ColumnHeader(column,
+ // TSDataType.DOUBLE)).collect(Collectors.toList());
+ List<ColumnHeader> columnHeaders = new ArrayList<>();
+ for (String columnName : outputNode.getColumnNames()) {
+ columnHeaders.add(new ColumnHeader(columnName, TSDataType.DOUBLE));
+ }
+ DatasetHeader respDatasetHeader = new DatasetHeader(columnHeaders, false);
+ analysis.setRespDatasetHeader(respDatasetHeader);
+ return outputNode;
}
private RelationPlan createRelationPlan(Analysis analysis, Query query) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
index e49177387a1..b33236b1292 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
@@ -15,13 +15,11 @@ package
org.apache.iotdb.db.queryengine.plan.relational.planner;
import org.apache.iotdb.db.queryengine.common.QueryId;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Field;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.NodeRef;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Scope;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
-import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
import org.apache.iotdb.db.relational.sql.tree.AliasedRelation;
import org.apache.iotdb.db.relational.sql.tree.AstVisitor;
@@ -41,10 +39,13 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static java.util.Objects.requireNonNull;
+import static
org.apache.iotdb.db.relational.sql.tree.ColumnDefinition.ColumnCategory.ATTRIBUTE;
+import static
org.apache.iotdb.db.relational.sql.tree.ColumnDefinition.ColumnCategory.ID;
public class RelationPlanner extends AstVisitor<RelationPlan, Void> {
private final Analysis analysis;
@@ -91,11 +92,12 @@ public class RelationPlanner extends
AstVisitor<RelationPlan, Void> {
expansion.getRoot(), expansion.getScope(),
expansion.getFieldMappings());
}
+ Map<Symbol, Integer> idAndAttributeIndexMap = new HashMap<>();
Scope scope = analysis.getScope(table);
ImmutableList.Builder<Symbol> outputSymbolsBuilder =
ImmutableList.builder();
ImmutableMap.Builder<Symbol, ColumnSchema> symbolToColumnSchema =
ImmutableMap.builder();
Collection<Field> fields = scope.getRelationType().getAllFields();
- TableSchema tableSchema = analysis.getTables().iterator().next();
+ int IDIdx = 0, attributeIdx = 0;
for (Field field : fields) {
Symbol symbol = symbolAllocator.newSymbol(field);
outputSymbolsBuilder.add(symbol);
@@ -103,17 +105,24 @@ public class RelationPlanner extends
AstVisitor<RelationPlan, Void> {
symbol,
new ColumnSchema(
field.getName().get(), field.getType(), field.isHidden(),
field.getColumnCategory()));
+
+ if (ID.equals(field.getColumnCategory())) {
+ idAndAttributeIndexMap.put(symbol, IDIdx++);
+ } else if (ATTRIBUTE.equals(field.getColumnCategory())) {
+ idAndAttributeIndexMap.put(symbol, attributeIdx++);
+ }
}
List<Symbol> outputSymbols = outputSymbolsBuilder.build();
- PlanNode root =
+ TableScanNode tableScanNode =
new TableScanNode(
idAllocator.genPlanNodeId(),
table.getName().toString(),
outputSymbols,
symbolToColumnSchema.build());
- return new RelationPlan(root, scope, outputSymbols);
+ tableScanNode.setIdAndAttributeIndexMap(idAndAttributeIndexMap);
+ return new RelationPlan(tableScanNode, scope, outputSymbols);
// Collection<Field> fields =
analysis.getMaterializedViewStorageTableFields(node);
// Query namedQuery = analysis.getNamedQuery(node);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationalModelPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationalModelPlanner.java
index b9a6604e708..f807fd19dfe 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationalModelPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationalModelPlanner.java
@@ -108,7 +108,7 @@ public class RelationalModelPlanner implements IPlanner {
@Override
public LogicalQueryPlan doLogicalPlan(IAnalysis analysis, MPPQueryContext
context) {
try {
- return new LogicalPlanner(context, metadata, null, warningCollector)
+ return new LogicalPlanner(context, metadata, context.getSession(),
warningCollector)
.plan((Analysis) analysis);
} catch (IoTDBException e) {
throw new RuntimeException(e);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/RelationalDistributionPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/RelationalDistributionPlanner.java
index 64d2aa70599..45ece8cae71 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/RelationalDistributionPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/RelationalDistributionPlanner.java
@@ -14,13 +14,17 @@
package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import
org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelLocation;
+import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
+import java.util.Collections;
import java.util.List;
public class RelationalDistributionPlanner {
@@ -52,7 +56,43 @@ public class RelationalDistributionPlanner {
List<FragmentInstance> fragmentInstances =
new FragmentInstanceGenerator(subPlan, analysis, context).plan();
+ // Only set the sink for the root instance when the query type is READ
+ if (context.getQueryType() == QueryType.READ) {
+ setSinkForRootInstance(subPlan, fragmentInstances);
+ }
+
return new DistributedQueryPlan(
logicalQueryPlan.getContext(), subPlan, subPlan.getPlanFragmentList(),
fragmentInstances);
}
+
+ public void setSinkForRootInstance(SubPlan subPlan, List<FragmentInstance>
instances) {
+ FragmentInstance rootInstance = null;
+ for (FragmentInstance instance : instances) {
+ if
(instance.getFragment().getId().equals(subPlan.getPlanFragment().getId())) {
+ rootInstance = instance;
+ break;
+ }
+ }
+ // rootInstance should never be null in normal processing; return early if it is
+ if (rootInstance == null) {
+ return;
+ }
+
+ IdentitySinkNode sinkNode =
+ new IdentitySinkNode(
+ context.getQueryId().genPlanNodeId(),
+
Collections.singletonList(rootInstance.getFragment().getPlanNodeTree()),
+ Collections.singletonList(
+ new DownStreamChannelLocation(
+ context.getLocalDataBlockEndpoint(),
+
context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
+
context.getResultNodeContext().getVirtualResultNodeId().getId())));
+ context
+ .getResultNodeContext()
+ .setUpStream(
+ rootInstance.getHostDataNode().mPPDataExchangeEndPoint,
+ rootInstance.getId(),
+ sinkNode.getPlanNodeId());
+ rootInstance.getFragment().setPlanNodeTree(sinkNode);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java
index 9b06d0b8d40..34c7b6aadec 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java
@@ -40,7 +40,7 @@ public class OutputNode extends SingleChildProcessNode {
@Override
public List<String> getOutputColumnNames() {
- return null;
+ return this.columnNames;
}
@Override
@@ -49,6 +49,10 @@ public class OutputNode extends SingleChildProcessNode {
@Override
protected void serializeAttributes(DataOutputStream stream) throws
IOException {}
+ public List<String> getColumnNames() {
+ return this.columnNames;
+ }
+
public List<Symbol> getOutputSymbols() {
return outputs;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
index 6dbb8c5d70a..0c5902896b3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
@@ -4,6 +4,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
@@ -22,7 +23,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
-public class TableScanNode extends PlanNode {
+public class TableScanNode extends SourceNode {
// db.tablename
private final String qualifiedTableName;
@@ -99,6 +100,12 @@ public class TableScanNode extends PlanNode {
return outputSymbols;
}
+ @Override
+ public void open() throws Exception {}
+
+ @Override
+ public void close() throws Exception {}
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -133,6 +140,10 @@ public class TableScanNode extends PlanNode {
return this.idAndAttributeIndexMap;
}
+ public void setIdAndAttributeIndexMap(Map<Symbol, Integer>
idAndAttributeIndexMap) {
+ this.idAndAttributeIndexMap = idAndAttributeIndexMap;
+ }
+
public Map<Symbol, ColumnSchema> getAssignments() {
return this.assignments;
}