This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9e3e9d6cda88e2ac7eedcd071d4e9eeb0e502544
Author: JackieTien97 <[email protected]>
AuthorDate: Fri Apr 26 18:15:45 2024 +0800

    support SortNode, MergeSortNode and TopKNode
---
 .../process/join/merge/MergeSortComparator.java    |  20 +++
 .../plan/planner/TableOperatorGenerator.java       | 162 +++++++++++++++++++--
 .../plan/relational/planner/node/LimitNode.java    |   8 +-
 .../relational/planner/node/MergeSortNode.java     |  29 ++--
 .../plan/relational/planner/node/OffsetNode.java   |   8 +-
 .../plan/relational/planner/node/SortNode.java     |  12 +-
 .../plan/relational/planner/node/TopKNode.java     |  43 +++++-
 7 files changed, 251 insertions(+), 31 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/MergeSortComparator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/MergeSortComparator.java
index c299a368e73..db6f36206f9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/MergeSortComparator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/MergeSortComparator.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.queryengine.execution.operator.process.join.merge;
 
+import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder;
 import org.apache.iotdb.db.queryengine.plan.statement.component.NullOrdering;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem;
@@ -66,6 +67,25 @@ public class MergeSortComparator {
     return list.size() == 1 ? list.get(0) : new ComparatorChain<>(list);
   }
 
+  /**
+   * Builds the row comparator used by the table-model sort operators.
+   *
+   * <p>The three lists are read position by position: entry {@code i} of {@code indexList} is the
+   * column index handed to {@code genSingleComparator}, entry {@code i} of {@code dataTypeList} is
+   * that column's data type, and entry {@code i} of {@code sortOrderList} supplies the
+   * ascending/descending and nulls-first/nulls-last flags.
+   *
+   * <p>Positions whose index equals -2 contribute no comparator. NOTE(review): the meaning of the
+   * -2 sentinel is not visible in this method; confirm against the callers.
+   *
+   * @param sortOrderList sort direction and null ordering of each sort item
+   * @param indexList column index of each sort item
+   * @param dataTypeList data type of each sort item
+   * @return the only comparator when exactly one was generated, otherwise a {@code
+   *     ComparatorChain} over all generated comparators in list order
+   */
+  public static Comparator<SortKey> getComparatorForTable(
+      List<SortOrder> sortOrderList, List<Integer> indexList, List<TSDataType> dataTypeList) {
+
+    // NOTE(review): the original remark here reads "use code-gen compile this comparator";
+    // the comparators below are assembled by hand, no code generation takes place.
+    List<Comparator<SortKey>> comparators = new ArrayList<>(indexList.size());
+    for (int position = 0; position < indexList.size(); position++) {
+      int columnIndex = indexList.get(position);
+      if (columnIndex != -2) {
+        TSDataType columnType = dataTypeList.get(position);
+        SortOrder order = sortOrderList.get(position);
+        Comparator<SortKey> single =
+            genSingleComparator(
+                order.isAscending(), columnIndex, columnType, order.isNullsFirst());
+        comparators.add(single);
+      }
+    }
+
+    if (comparators.size() == 1) {
+      return comparators.get(0);
+    }
+    return new ComparatorChain<>(comparators);
+  }
+
   public static Comparator<SortKey> getComparator(TSDataType dataType, int 
index, boolean asc) {
     Comparator<SortKey> comparator;
     switch (dataType) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 767325bb302..09e7b9cae42 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.queryengine.plan.planner;
 
 import org.apache.iotdb.commons.path.AlignedPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
 import org.apache.iotdb.db.queryengine.execution.driver.DataDriverContext;
 import 
org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager;
@@ -33,6 +34,8 @@ import 
org.apache.iotdb.db.queryengine.execution.operator.process.FilterAndProje
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.LimitOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.MergeSortOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.OffsetOperator;
+import org.apache.iotdb.db.queryengine.execution.operator.process.SortOperator;
+import org.apache.iotdb.db.queryengine.execution.operator.process.TopKOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.sink.IdentitySinkOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator;
@@ -47,6 +50,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
@@ -69,6 +73,7 @@ import org.apache.tsfile.read.filter.basic.Filter;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.apache.tsfile.write.schema.MeasurementSchema;
 
+import java.io.File;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -81,6 +86,7 @@ import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
+import static 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.MergeSortComparator.getComparatorForTable;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath;
 import static 
org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils.convertPredicateToFilter;
 import static 
org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand.TIMESTAMP_EXPRESSION_STRING;
@@ -477,18 +483,6 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
     return new OffsetOperator(operatorContext, node.getCount(), child);
   }
 
-  @Override
-  public Operator visitMergeSort(MergeSortNode node, LocalExecutionPlanContext 
context) {
-    OperatorContext operatorContext = 
context.getDriverContext().addOperatorContext(context.getNextOperatorId(), 
node.getPlanNodeId(),
-        MergeSortOperator.class.getSimpleName());
-    List<Operator> children = new ArrayList<>(node.getChildren().size());
-    for (PlanNode child : node.getChildren()) {
-      children.add(this.process(child, context));
-    }
-
-    return super.visitMergeSort(node, context);
-  }
-
   @Override
   public Operator visitOutput(OutputNode node, LocalExecutionPlanContext 
context) {
     TypeProvider typeProvider = context.getTypeProvider();
@@ -522,13 +516,153 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
         context);
   }
 
+  /**
+   * Builds the {@link MergeSortOperator} for a table-model {@link MergeSortNode}.
+   *
+   * <p>An operator context is registered for the node, every child plan node is converted to an
+   * operator, and the merge comparator is derived from the node's ordering scheme through {@code
+   * genSortInformation} and {@code getComparatorForTable}.
+   */
+  @Override
+  public Operator visitMergeSort(MergeSortNode node, LocalExecutionPlanContext 
context) {
+    OperatorContext operatorContext =
+        context
+            .getDriverContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                MergeSortOperator.class.getSimpleName());
+    // Convert the children in plan order; each one becomes an input of the merge.
+    List<Operator> children = new ArrayList<>(node.getChildren().size());
+    for (PlanNode child : node.getChildren()) {
+      children.add(this.process(child, context));
+    }
+    List<TSDataType> dataTypes = getOutputColumnTypes(node, 
context.getTypeProvider());
+    int sortItemsCount = node.getOrderingScheme().getOrderBy().size();
+
+    // Filled by genSortInformation: one index and one data type per ORDER BY item.
+    List<Integer> sortItemIndexList = new ArrayList<>(sortItemsCount);
+    List<TSDataType> sortItemDataTypeList = new ArrayList<>(sortItemsCount);
+    genSortInformation(
+        node.getOutputSymbols(),
+        node.getOrderingScheme(),
+        sortItemIndexList,
+        sortItemDataTypeList,
+        context.getTypeProvider());
+
+    return new MergeSortOperator(
+        operatorContext,
+        children,
+        dataTypes,
+        getComparatorForTable(
+            node.getOrderingScheme().getOrderingList(), sortItemIndexList, 
sortItemDataTypeList));
+  }
+
   @Override
   public Operator visitSort(SortNode node, LocalExecutionPlanContext context) {
-    return super.visitSort(node, context);
+    // Builds a SortOperator for the node: registers the operator context, resolves the sort
+    // columns from the ordering scheme, then converts the single child.
+    OperatorContext operatorContext =
+        context
+            .getDriverContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                SortOperator.class.getSimpleName());
+    List<TSDataType> dataTypes = getOutputColumnTypes(node, 
context.getTypeProvider());
+    int sortItemsCount = node.getOrderingScheme().getOrderBy().size();
+
+    // Filled by genSortInformation: one index and one data type per ORDER BY item.
+    List<Integer> sortItemIndexList = new ArrayList<>(sortItemsCount);
+    List<TSDataType> sortItemDataTypeList = new ArrayList<>(sortItemsCount);
+    genSortInformation(
+        node.getOutputSymbols(),
+        node.getOrderingScheme(),
+        sortItemIndexList,
+        sortItemDataTypeList,
+        context.getTypeProvider());
+
+    // Path prefix <sort tmp dir>/<fragment instance full id>/<pipeline id>/ passed to the
+    // SortOperator below (presumably for its temporary files; confirm in SortOperator).
+    String filePrefix =
+        IoTDBDescriptor.getInstance().getConfig().getSortTmpDir()
+            + File.separator
+            + 
operatorContext.getDriverContext().getFragmentInstanceContext().getId().getFullId()
+            + File.separator
+            + operatorContext.getDriverContext().getPipelineId()
+            + File.separator;
+
+    // Flag both the driver and its fragment instance as possibly owning temporary files.
+    context.getDriverContext().setHaveTmpFile(true);
+    
context.getDriverContext().getFragmentInstanceContext().setMayHaveTmpFile(true);
+
+    // The child is visited only after this operator's context has been registered.
+    Operator child = node.getChild().accept(this, context);
+
+    return new SortOperator(
+        operatorContext,
+        child,
+        dataTypes,
+        filePrefix,
+        getComparatorForTable(
+            node.getOrderingScheme().getOrderingList(), sortItemIndexList, 
sortItemDataTypeList));
   }
 
   @Override
   public Operator visitTopK(TopKNode node, LocalExecutionPlanContext context) {
-    return super.visitTopK(node, context);
+    // Builds a TopKOperator for the node: registers the operator context, converts every child,
+    // then resolves the sort columns from the ordering scheme to build the comparator.
+    OperatorContext operatorContext =
+        context
+            .getDriverContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                TopKOperator.class.getSimpleName());
+    // Convert the children in plan order.
+    List<Operator> children = new ArrayList<>(node.getChildren().size());
+    for (PlanNode child : node.getChildren()) {
+      children.add(this.process(child, context));
+    }
+    List<TSDataType> dataTypes = getOutputColumnTypes(node, 
context.getTypeProvider());
+    int sortItemsCount = node.getOrderingScheme().getOrderBy().size();
+
+    // Filled by genSortInformation: one index and one data type per ORDER BY item.
+    List<Integer> sortItemIndexList = new ArrayList<>(sortItemsCount);
+    List<TSDataType> sortItemDataTypeList = new ArrayList<>(sortItemsCount);
+    genSortInformation(
+        node.getOutputSymbols(),
+        node.getOrderingScheme(),
+        sortItemIndexList,
+        sortItemDataTypeList,
+        context.getTypeProvider());
+    // The node's count and children-in-order flag are forwarded to the operator unchanged.
+    return new TopKOperator(
+        operatorContext,
+        children,
+        dataTypes,
+        getComparatorForTable(
+            node.getOrderingScheme().getOrderingList(), sortItemIndexList, 
sortItemDataTypeList),
+        node.getCount(),
+        node.isChildrenDataInOrder());
+  }
+
+  /**
+   * Collects the data types of a node's output columns, in output-symbol order.
+   *
+   * <p>Symbols whose name equals {@code TIMESTAMP_EXPRESSION_STRING} (ignoring case) are left out
+   * of the result.
+   *
+   * @param node plan node whose output symbols are inspected
+   * @param typeProvider source of the table-model type of each symbol
+   * @return one data type per remaining output symbol
+   */
+  private List<TSDataType> getOutputColumnTypes(PlanNode node, TypeProvider typeProvider) {
+    List<TSDataType> columnTypes = new ArrayList<>();
+    for (Symbol symbol : node.getOutputSymbols()) {
+      boolean isTimeColumn = TIMESTAMP_EXPRESSION_STRING.equalsIgnoreCase(symbol.getName());
+      if (!isTimeColumn) {
+        columnTypes.add(getTSDataType(typeProvider.getTableModelType(symbol)));
+      }
+    }
+    return columnTypes;
+  }
+
+  /**
+   * Resolves, for every ORDER BY item of {@code orderingScheme}, the column index it is read from
+   * and its data type, appending one entry per item to the two output lists.
+   *
+   * <p>Column indexes follow the position of each symbol in {@code outputSymbols}, not counting
+   * symbols whose name equals {@code TIMESTAMP_EXPRESSION_STRING} (ignoring case). A sort item
+   * with that name is recorded with index -1 and type {@code INT64}.
+   *
+   * @param outputSymbols output symbols that define the column positions
+   * @param orderingScheme ordering whose ORDER BY items are resolved
+   * @param sortItemIndexList output list receiving one column index per sort item
+   * @param sortItemDataTypeList output list receiving one data type per sort item
+   * @param typeProvider source of the table-model type of each sort item
+   * @throws IllegalStateException if a sort item is not among {@code outputSymbols}
+   */
+  private void genSortInformation(
+      List<Symbol> outputSymbols,
+      OrderingScheme orderingScheme,
+      List<Integer> sortItemIndexList,
+      List<TSDataType> sortItemDataTypeList,
+      TypeProvider typeProvider) {
+    Map<Symbol, Integer> columnIndex = new HashMap<>();
+    int index = 0;
+    for (Symbol symbol : outputSymbols) {
+      // Symbols named like the time column do not consume a column position.
+      if (TIMESTAMP_EXPRESSION_STRING.equalsIgnoreCase(symbol.getName())) {
+        continue;
+      }
+      columnIndex.put(symbol, index++);
+    }
+    orderingScheme
+        .getOrderBy()
+        .forEach(
+            sortItem -> {
+              if (TIMESTAMP_EXPRESSION_STRING.equalsIgnoreCase(sortItem.getName())) {
+                sortItemIndexList.add(-1);
+                sortItemDataTypeList.add(TSDataType.INT64);
+              } else {
+                Integer i = columnIndex.get(sortItem);
+                if (i == null) {
+                  // Substitute the %s placeholder so the message names the offending item.
+                  throw new IllegalStateException(
+                      String.format(
+                          "Sort Item %s is not included in children's output columns",
+                          sortItem.getName()));
+                }
+                sortItemIndexList.add(i);
+                sortItemDataTypeList.add(getTSDataType(typeProvider.getTableModelType(sortItem)));
+              }
+            });
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitNode.java
index e05545a8f97..d3399cb1e16 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitNode.java
@@ -16,6 +16,7 @@ package 
org.apache.iotdb.db.queryengine.plan.relational.planner.node;
 
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
@@ -45,9 +46,14 @@ public class LimitNode extends SingleChildProcessNode {
     return new LimitNode(id, child, count, tiesResolvingScheme);
   }
 
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitLimit(this, context);
+  }
+
   @Override
   public List<String> getOutputColumnNames() {
-    return null;
+    throw new UnsupportedOperationException();
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/MergeSortNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/MergeSortNode.java
index 680d3812bac..68c1ffdc17e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/MergeSortNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/MergeSortNode.java
@@ -20,8 +20,9 @@ package 
org.apache.iotdb.db.queryengine.plan.relational.planner.node;
 
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.OrderByParameter;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 
 import java.io.DataOutputStream;
@@ -30,25 +31,29 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 public class MergeSortNode extends MultiChildProcessNode {
-  private final OrderByParameter mergeOrderParameter;
+  private final OrderingScheme orderingScheme;
 
-  private final List<String> outputColumns;
+  private final List<Symbol> outputSymbols;
 
-  public MergeSortNode(
-      PlanNodeId id, OrderByParameter mergeOrderParameter, List<String> 
outputColumns) {
+  public MergeSortNode(PlanNodeId id, OrderingScheme orderingScheme, 
List<Symbol> outputSymbols) {
     super(id);
-    this.mergeOrderParameter = mergeOrderParameter;
-    this.outputColumns = outputColumns;
+    this.orderingScheme = orderingScheme;
+    this.outputSymbols = outputSymbols;
   }
 
   @Override
   public PlanNode clone() {
-    return null;
+    return new MergeSortNode(getPlanNodeId(), orderingScheme, outputSymbols);
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitMergeSort(this, context);
   }
 
   @Override
   public List<String> getOutputColumnNames() {
-    return children.get(0).getOutputColumnNames();
+    throw new UnsupportedOperationException();
   }
 
   @Override
@@ -59,6 +64,10 @@ public class MergeSortNode extends MultiChildProcessNode {
 
   @Override
   public List<Symbol> getOutputSymbols() {
-    return super.getOutputSymbols();
+    return outputSymbols;
+  }
+
+  public OrderingScheme getOrderingScheme() {
+    return orderingScheme;
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OffsetNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OffsetNode.java
index 806ff99d317..feca446b9d0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OffsetNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OffsetNode.java
@@ -16,6 +16,7 @@ package 
org.apache.iotdb.db.queryengine.plan.relational.planner.node;
 
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 
@@ -39,7 +40,12 @@ public class OffsetNode extends SingleChildProcessNode {
 
   @Override
   public List<String> getOutputColumnNames() {
-    return null;
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitOffset(this, context);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java
index d51b131683f..474101ddd72 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java
@@ -16,6 +16,7 @@ package 
org.apache.iotdb.db.queryengine.plan.relational.planner.node;
 
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
@@ -42,7 +43,12 @@ public class SortNode extends SingleChildProcessNode {
 
   @Override
   public List<String> getOutputColumnNames() {
-    return null;
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitSort(this, context);
   }
 
   @Override
@@ -55,4 +61,8 @@ public class SortNode extends SingleChildProcessNode {
   public List<Symbol> getOutputSymbols() {
     return child.getOutputSymbols();
   }
+
+  public OrderingScheme getOrderingScheme() {
+    return orderingScheme;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKNode.java
index d356d25f1d0..b554e852933 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKNode.java
@@ -21,8 +21,10 @@ package 
org.apache.iotdb.db.queryengine.plan.relational.planner.node;
 
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -33,22 +35,38 @@ public class TopKNode extends MultiChildProcessNode {
 
   private final OrderingScheme orderingScheme;
 
-  private final long count;
+  private final int count;
 
-  public TopKNode(PlanNodeId id, OrderingScheme scheme, long count) {
+  private final List<Symbol> outputSymbols;
+
+  private final boolean childrenDataInOrder;
+
+  public TopKNode(
+      PlanNodeId id,
+      OrderingScheme scheme,
+      int count,
+      List<Symbol> outputSymbols,
+      boolean childrenDataInOrder) {
     super(id);
     this.orderingScheme = scheme;
     this.count = count;
+    this.outputSymbols = outputSymbols;
+    this.childrenDataInOrder = childrenDataInOrder;
   }
 
   @Override
   public PlanNode clone() {
-    return null;
+    return new TopKNode(getPlanNodeId(), orderingScheme, count, outputSymbols, 
childrenDataInOrder);
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitTopK(this, context);
   }
 
   @Override
   public List<String> getOutputColumnNames() {
-    return null;
+    throw new UnsupportedOperationException();
   }
 
   @Override
@@ -56,4 +74,21 @@ public class TopKNode extends MultiChildProcessNode {
 
   @Override
   protected void serializeAttributes(DataOutputStream stream) throws 
IOException {}
+
+  @Override
+  public List<Symbol> getOutputSymbols() {
+    return outputSymbols;
+  }
+
+  public OrderingScheme getOrderingScheme() {
+    return orderingScheme;
+  }
+
+  public int getCount() {
+    return count;
+  }
+
+  public boolean isChildrenDataInOrder() {
+    return childrenDataInOrder;
+  }
 }

Reply via email to