This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/table-model-debug
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 56be673ec65148e0939003b4cf68ecbe78dddff5
Author: Beyyes <[email protected]>
AuthorDate: Fri Apr 19 11:58:42 2024 +0800

    beyyes table model debug tmp
---
 .../fragment/FragmentInstanceContext.java          |  1 +
 .../source/relational/TableScanOperator.java       |  7 +++-
 .../queryengine/plan/execution/QueryExecution.java |  1 +
 .../plan/planner/TableOperatorGenerator.java       | 46 ++++++++++++++++++++++
 .../plan/relational/analyzer/Analysis.java         |  8 +++-
 .../relational/metadata/TableMetadataImpl.java     |  3 +-
 .../plan/relational/planner/LogicalPlanner.java    | 18 ++++++++-
 .../plan/relational/planner/RelationPlan.java      | 10 ++---
 .../plan/relational/planner/RelationPlanner.java   | 22 +++++++++--
 .../relational/planner/RelationalModelPlanner.java |  2 +-
 .../distribute/RelationalDistributionPlanner.java  | 40 +++++++++++++++++++
 .../plan/relational/planner/node/OutputNode.java   |  4 ++
 .../relational/planner/node/TableScanNode.java     | 18 ++++++++-
 .../planner/optimizations/IndexScan.java           |  5 ++-
 14 files changed, 168 insertions(+), 17 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index e3025bd1497..1b6861d2f59 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -113,6 +113,7 @@ public class FragmentInstanceContext extends QueryContext {
     return instanceContext;
   }
 
+  // This method is only used in groupby
   public static FragmentInstanceContext createFragmentInstanceContext(
       FragmentInstanceId id,
       FragmentInstanceStateMachine stateMachine,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
index a8f3a6da567..05ef2ef2e3f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
@@ -121,7 +121,7 @@ public class TableScanOperator extends 
AbstractDataSourceOperator {
     this.seriesScanUtil = 
constructAlignedSeriesScanUtil(deviceEntries.get(currentDeviceIndex));
 
     this.measurementDataBuilder = new 
TsBlockBuilder(this.measurementColumnTSDataTypes);
-    this.resultTsBlockBuilder.setMaxTsBlockLineNumber(this.maxTsBlockLineNum);
+
   }
 
   @Override
@@ -170,7 +170,10 @@ public class TableScanOperator extends 
AbstractDataSourceOperator {
     // append id column and attribute column
     if (!isEmpty(measurementDataBlock)) {
       constructResultTsBlock();
+    } else {
+      return null;
     }
+    measurementDataBlock = null;
     return checkTsBlockSizeAndGetResult();
   }
 
@@ -329,6 +332,8 @@ public class TableScanOperator extends 
AbstractDataSourceOperator {
     this.seriesScanUtil.initQueryDataSource(dataSource);
     this.resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes());
     this.resultTsBlockBuilder.setMaxTsBlockLineNumber(this.maxTsBlockLineNum);
+    //this.resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes());
+    //this.resultTsBlockBuilder.setMaxTsBlockLineNumber(this.maxTsBlockLineNum);
   }
 
   private void prepareForNextDevice() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index 8446c0bd4fe..93a3c1bfab8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -552,6 +552,7 @@ public class QueryExecution implements IQueryExecution {
 
   private void initResultHandle() {
     TEndPoint upstreamEndPoint = 
context.getResultNodeContext().getUpStreamEndpoint();
+    // TEndPoint upstreamEndPoint = LOCAL_HOST_DATA_BLOCK_ENDPOINT;
 
     this.resultHandle =
         isSameNode(upstreamEndPoint)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index c8825447e88..b705727799e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -20,12 +20,19 @@
 package org.apache.iotdb.db.queryengine.plan.planner;
 
 import org.apache.iotdb.commons.path.AlignedPath;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
 import org.apache.iotdb.db.queryengine.execution.driver.DataDriverContext;
+import 
org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager;
+import 
org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeService;
+import 
org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelIndex;
+import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISinkHandle;
+import 
org.apache.iotdb.db.queryengine.execution.exchange.sink.ShuffleSinkHandle;
 import org.apache.iotdb.db.queryengine.execution.operator.Operator;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.FilterAndProjectOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.LimitOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.OffsetOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.sink.IdentitySinkOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.relational.ColumnTransformerBuilder;
@@ -34,6 +41,7 @@ import 
org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
@@ -60,6 +68,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -69,6 +78,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath;
 import static 
org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils.convertPredicateToFilter;
@@ -83,6 +93,42 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
     throw new UnsupportedOperationException("should call the concrete visitXX() method");
   }
 
+  private static final MPPDataExchangeManager MPP_DATA_EXCHANGE_MANAGER =
+          MPPDataExchangeService.getInstance().getMPPDataExchangeManager();
+
+  @Override
+  public Operator visitIdentitySink(IdentitySinkNode node, 
LocalExecutionPlanContext context) {
+    context.addExchangeSumNum(1);
+    OperatorContext operatorContext =
+            context
+                    .getDriverContext()
+                    .addOperatorContext(
+                            context.getNextOperatorId(),
+                            node.getPlanNodeId(),
+                            IdentitySinkOperator.class.getSimpleName());
+
+    checkArgument(
+            MPP_DATA_EXCHANGE_MANAGER != null, "MPP_DATA_EXCHANGE_MANAGER should not be null");
+    FragmentInstanceId localInstanceId = context.getInstanceContext().getId();
+    DownStreamChannelIndex downStreamChannelIndex = new 
DownStreamChannelIndex(0);
+    ISinkHandle sinkHandle =
+            MPP_DATA_EXCHANGE_MANAGER.createShuffleSinkHandle(
+                    node.getDownStreamChannelLocationList(),
+                    downStreamChannelIndex,
+                    ShuffleSinkHandle.ShuffleStrategyEnum.PLAIN,
+                    localInstanceId.toThrift(),
+                    node.getPlanNodeId().getId(),
+                    context.getInstanceContext());
+    sinkHandle.setMaxBytesCanReserve(context.getMaxBytesOneHandleCanReserve());
+    context.getDriverContext().setSink(sinkHandle);
+
+    // List<Operator> children = dealWithConsumeChildrenOneByOneNode(node, context);
+    Operator child = node.getChildren().get(0).accept(this, context);
+    List<Operator> children = new ArrayList<>(1);
+    children.add(child);
+    return new IdentitySinkOperator(operatorContext, children, 
downStreamChannelIndex, sinkHandle);
+  }
+
   @Override
   public Operator visitTableScan(TableScanNode node, LocalExecutionPlanContext 
context) {
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
index d2a69511628..7633413aab9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
@@ -152,6 +152,8 @@ public class Analysis implements IAnalysis {
 
   private DataPartition dataPartition;
 
+  private DatasetHeader respDatasetHeader;
+
   public Expression getGlobalTableModelTimePredicate() {
     return this.globalTableModelTimePredicate;
   }
@@ -593,7 +595,11 @@ public class Analysis implements IAnalysis {
 
   @Override
   public DatasetHeader getRespDatasetHeader() {
-    return null;
+    return respDatasetHeader;
+  }
+
+  public void setRespDatasetHeader(DatasetHeader respDatasetHeader) {
+    this.respDatasetHeader = respDatasetHeader;
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
index fa7302837a7..a0c9e0083d0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
@@ -25,7 +25,6 @@ import 
org.apache.iotdb.commons.udf.builtin.BuiltinScalarFunction;
 import org.apache.iotdb.db.exception.metadata.table.TableNotExistsException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.queryengine.common.SessionInfo;
-import 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.schema.TableModelSchemaFetcher;
 import org.apache.iotdb.db.queryengine.plan.relational.function.OperatorType;
 import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl;
 import 
org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager;
@@ -261,7 +260,7 @@ public class TableMetadataImpl implements Metadata {
       List<Expression> expressionList,
       List<String> attributeColumns) {
     List<DeviceEntry> result = new ArrayList<>();
-    IDeviceID deviceID = new StringArrayDeviceID("beijing", "a_1");
+    IDeviceID deviceID = new StringArrayDeviceID("db", "table1", "beijing", 
"a_1");
     result.add(new DeviceEntry(deviceID, Arrays.asList("new", "low")));
     return result;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
index 843517be036..bd6b1ddf3dd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
@@ -16,6 +16,8 @@ package 
org.apache.iotdb.db.queryengine.plan.relational.planner;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.SessionInfo;
+import org.apache.iotdb.db.queryengine.common.header.ColumnHeader;
+import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
 import org.apache.iotdb.db.queryengine.execution.warnings.WarningCollector;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
@@ -36,9 +38,12 @@ import org.apache.iotdb.db.relational.sql.tree.Table;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import io.airlift.log.Logger;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
 
@@ -109,8 +114,19 @@ public class LogicalPlanner {
       columnNumber++;
     }
 
-    return new OutputNode(
+    OutputNode outputNode = new OutputNode(
         context.getQueryId().genPlanNodeId(), plan.getRoot(), names.build(), 
outputs.build());
+
+
+//    List<ColumnHeader> columnHeaders =
+//            outputNode.getOutputColumnNames().stream().map(column -> new ColumnHeader(column, TSDataType.DOUBLE)).collect(Collectors.toList());
+    List<ColumnHeader> columnHeaders = new ArrayList<>();
+    for (String columnName : outputNode.getColumnNames()) {
+      columnHeaders.add(new ColumnHeader(columnName, TSDataType.DOUBLE));
+    }
+    DatasetHeader respDatasetHeader = new DatasetHeader(columnHeaders, false);
+    analysis.setRespDatasetHeader(respDatasetHeader);
+    return outputNode;
   }
 
   private RelationPlan createRelationPlan(Analysis analysis, Query query) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlan.java
index cdd1bab217f..e8a77e31e73 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlan.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlan.java
@@ -44,11 +44,11 @@ public class RelationPlan {
     requireNonNull(scope, "scope is null");
 
     int allFieldCount = scope.getLocalScopeFieldCount();
-    checkArgument(
-        allFieldCount == fieldMappings.size(),
-        "Number of outputs (%s) doesn't match number of fields in local scope (%s)",
-        fieldMappings.size(),
-        allFieldCount);
+//    checkArgument(
+//        allFieldCount == fieldMappings.size(),
+//        "Number of outputs (%s) doesn't match number of fields in local scope (%s)",
+//        fieldMappings.size(),
+//        allFieldCount);
 
     this.root = root;
     this.scope = scope;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
index e49177387a1..37b0d3aeeb9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
@@ -41,10 +41,13 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import static java.util.Objects.requireNonNull;
+import static 
org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.ATTRIBUTE;
+import static 
org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.ID;
 
 public class RelationPlanner extends AstVisitor<RelationPlan, Void> {
   private final Analysis analysis;
@@ -91,29 +94,42 @@ public class RelationPlanner extends 
AstVisitor<RelationPlan, Void> {
           expansion.getRoot(), expansion.getScope(), 
expansion.getFieldMappings());
     }
 
+    Map<Symbol, Integer> idAndAttributeIndexMap = new HashMap<>();
+
     Scope scope = analysis.getScope(table);
     ImmutableList.Builder<Symbol> outputSymbolsBuilder = 
ImmutableList.builder();
     ImmutableMap.Builder<Symbol, ColumnSchema> symbolToColumnSchema = 
ImmutableMap.builder();
     Collection<Field> fields = scope.getRelationType().getAllFields();
-    TableSchema tableSchema = analysis.getTables().iterator().next();
+    int IDIdx = 0, attributeIdx = 0;
     for (Field field : fields) {
+      if ("time".equalsIgnoreCase(field.getName().get())) {
+        // TODO consider time ColumnCategory
+        continue;
+      }
       Symbol symbol = symbolAllocator.newSymbol(field);
       outputSymbolsBuilder.add(symbol);
       symbolToColumnSchema.put(
           symbol,
           new ColumnSchema(
               field.getName().get(), field.getType(), field.isHidden(), 
field.getColumnCategory()));
+
+      if (ID.equals(field.getColumnCategory())) {
+        idAndAttributeIndexMap.put(symbol, IDIdx++);
+      } else if (ATTRIBUTE.equals(field.getColumnCategory())) {
+        idAndAttributeIndexMap.put(symbol, attributeIdx++);
+      }
     }
 
     List<Symbol> outputSymbols = outputSymbolsBuilder.build();
-    PlanNode root =
+    TableScanNode tableScanNode =
         new TableScanNode(
             idAllocator.genPlanNodeId(),
             table.getName().toString(),
             outputSymbols,
             symbolToColumnSchema.build());
+    tableScanNode.setIdAndAttributeIndexMap(idAndAttributeIndexMap);
 
-    return new RelationPlan(root, scope, outputSymbols);
+    return new RelationPlan(tableScanNode, scope, outputSymbols);
 
     // Collection<Field> fields = analysis.getMaterializedViewStorageTableFields(node);
     // Query namedQuery = analysis.getNamedQuery(node);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationalModelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationalModelPlanner.java
index b9a6604e708..f807fd19dfe 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationalModelPlanner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationalModelPlanner.java
@@ -108,7 +108,7 @@ public class RelationalModelPlanner implements IPlanner {
   @Override
   public LogicalQueryPlan doLogicalPlan(IAnalysis analysis, MPPQueryContext 
context) {
     try {
-      return new LogicalPlanner(context, metadata, null, warningCollector)
+      return new LogicalPlanner(context, metadata, context.getSession(), 
warningCollector)
           .plan((Analysis) analysis);
     } catch (IoTDBException e) {
       throw new RuntimeException(e);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/RelationalDistributionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/RelationalDistributionPlanner.java
index 64d2aa70599..45ece8cae71 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/RelationalDistributionPlanner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/RelationalDistributionPlanner.java
@@ -14,13 +14,17 @@
 package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute;
 
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import 
org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelLocation;
+import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
 import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
 
+import java.util.Collections;
 import java.util.List;
 
 public class RelationalDistributionPlanner {
@@ -52,7 +56,43 @@ public class RelationalDistributionPlanner {
     List<FragmentInstance> fragmentInstances =
         new FragmentInstanceGenerator(subPlan, analysis, context).plan();
 
+    // Only execute this step for READ operation
+    if (context.getQueryType() == QueryType.READ) {
+      setSinkForRootInstance(subPlan, fragmentInstances);
+    }
+
     return new DistributedQueryPlan(
         logicalQueryPlan.getContext(), subPlan, subPlan.getPlanFragmentList(), 
fragmentInstances);
   }
+
+  public void setSinkForRootInstance(SubPlan subPlan, List<FragmentInstance> 
instances) {
+    FragmentInstance rootInstance = null;
+    for (FragmentInstance instance : instances) {
+      if 
(instance.getFragment().getId().equals(subPlan.getPlanFragment().getId())) {
+        rootInstance = instance;
+        break;
+      }
+    }
+    // root should not be null during normal process
+    if (rootInstance == null) {
+      return;
+    }
+
+    IdentitySinkNode sinkNode =
+        new IdentitySinkNode(
+            context.getQueryId().genPlanNodeId(),
+            
Collections.singletonList(rootInstance.getFragment().getPlanNodeTree()),
+            Collections.singletonList(
+                new DownStreamChannelLocation(
+                    context.getLocalDataBlockEndpoint(),
+                    
context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
+                    
context.getResultNodeContext().getVirtualResultNodeId().getId())));
+    context
+        .getResultNodeContext()
+        .setUpStream(
+            rootInstance.getHostDataNode().mPPDataExchangeEndPoint,
+            rootInstance.getId(),
+            sinkNode.getPlanNodeId());
+    rootInstance.getFragment().setPlanNodeTree(sinkNode);
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java
index 9b06d0b8d40..e8e0a08af6f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java
@@ -49,6 +49,10 @@ public class OutputNode extends SingleChildProcessNode {
   @Override
   protected void serializeAttributes(DataOutputStream stream) throws 
IOException {}
 
+  public List<String> getColumnNames() {
+    return this.columnNames;
+  }
+
   public List<Symbol> getOutputSymbols() {
     return outputs;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
index 6dbb8c5d70a..7520a235ac8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
@@ -4,6 +4,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
@@ -22,7 +23,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
-public class TableScanNode extends PlanNode {
+public class TableScanNode extends SourceNode {
 
   // db.tablename
   private final String qualifiedTableName;
@@ -133,6 +134,10 @@ public class TableScanNode extends PlanNode {
     return this.idAndAttributeIndexMap;
   }
 
+  public void setIdAndAttributeIndexMap(Map<Symbol, Integer> 
idAndAttributeIndexMap) {
+    this.idAndAttributeIndexMap = idAndAttributeIndexMap;
+  }
+
   public Map<Symbol, ColumnSchema> getAssignments() {
     return this.assignments;
   }
@@ -161,7 +166,18 @@ public class TableScanNode extends PlanNode {
     return this.regionReplicaSet;
   }
 
+  @Override
+  public void open() throws Exception {
+
+  }
+
+  @Override
   public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {
     this.regionReplicaSet = regionReplicaSet;
   }
+
+  @Override
+  public void close() throws Exception {
+
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java
index 55c9218d943..aa738291ced 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java
@@ -106,15 +106,16 @@ public class IndexScan implements RelationalPlanOptimizer 
{
       node.setDeviceEntries(deviceEntries);
 
       // TODO getDataPartition, Change globalTimeFilter to Filter
+      String database = "root." + 
context.getSessionInfo().getDatabaseName().get();
       IPartitionFetcher partitionFetcher = 
ClusterPartitionFetcher.getInstance();
       Filter globalTimeFilter = null;
       Set<String> deviceSet = new HashSet<>();
       for (DeviceEntry deviceEntry : deviceEntries) {
         StringArrayDeviceID arrayDeviceID = (StringArrayDeviceID) 
deviceEntry.getDeviceID();
         String device = arrayDeviceID.toString();
-        deviceSet.add(device);
+        deviceSet.add("root."+device);
       }
-      String database = "root." + 
context.getSessionInfo().getDatabaseName().get();
+
       DataPartition dataPartition =
           fetchDataPartitionByDevices(deviceSet, database, globalTimeFilter, 
partitionFetcher);
       context.getAnalysis().setDataPartition(dataPartition);

Reply via email to