This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ce43b7f  [FLINK-20673][table-planner-blink] ExecNode#getOutputType 
method should return LogicalType instead of RowType
ce43b7f is described below

commit ce43b7fe0ff24a7b8acdd5ac4d2ce9cc68476f09
Author: godfrey he <[email protected]>
AuthorDate: Sun Dec 20 23:44:11 2020 +0800

    [FLINK-20673][table-planner-blink] ExecNode#getOutputType method should 
return LogicalType instead of RowType
    
    This closes #14423
---
 .../apache/flink/table/planner/plan/nodes/exec/ExecNode.java  | 11 +++++++++--
 .../flink/table/planner/plan/nodes/exec/ExecNodeBase.java     |  8 ++++----
 .../planner/plan/nodes/exec/batch/BatchExecExchange.java      |  5 +++--
 .../table/planner/plan/nodes/exec/batch/BatchExecNode.java    |  4 ++--
 .../exec/processor/utils/InputPriorityConflictResolver.java   |  5 +++--
 .../table/planner/plan/nodes/exec/stream/StreamExecNode.java  |  4 ++--
 .../table/planner/plan/nodes/exec/LegacyExecNodeBase.scala    |  4 ++--
 .../processor/utils/InputPriorityConflictResolverTest.java    |  3 ++-
 .../table/planner/plan/nodes/exec/TestingBatchExecNode.scala  |  4 ++--
 9 files changed, 29 insertions(+), 19 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java
index 12cff6e..536b0a6 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java
@@ -19,9 +19,11 @@
 package org.apache.flink.table.planner.plan.nodes.exec;
 
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.delegation.Planner;
 import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitor;
 import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.util.List;
@@ -41,9 +43,14 @@ public interface ExecNode<T> {
        String getDesc();
 
        /**
-        * Returns the output {@link RowType} of this node.
+        * Returns the output {@link LogicalType} of this node,
+        * this type should be consistent with the type parameter {@link T}.
+        *
+        * <p>Such as, if T is {@link RowData}, the output type should be 
{@link RowType}.
+        * please refer to the JavaDoc of {@link RowData} for more info about
+        * mapping of logical types to internal data structures.
         */
-       RowType getOutputType();
+       LogicalType getOutputType();
 
        /**
         * Returns a list of this node's input nodes.
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
index 527b33d..c189530 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.planner.plan.nodes.exec;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.table.delegation.Planner;
 import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitor;
-import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.LogicalType;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -39,7 +39,7 @@ public abstract class ExecNodeBase<P extends Planner, T> 
implements ExecNode<T>
 
        private final String description;
        private final List<ExecEdge> inputEdges;
-       private final RowType outputType;
+       private final LogicalType outputType;
        // TODO remove this field once edge support `source` and `target`,
        //  and then we can get/set `inputNodes` through `inputEdges`.
        private List<ExecNode<?>> inputNodes;
@@ -48,7 +48,7 @@ public abstract class ExecNodeBase<P extends Planner, T> 
implements ExecNode<T>
 
        protected ExecNodeBase(
                        List<ExecEdge> inputEdges,
-                       RowType outputType,
+                       LogicalType outputType,
                        String description) {
                this.inputEdges = new ArrayList<>(checkNotNull(inputEdges));
                this.outputType = checkNotNull(outputType);
@@ -61,7 +61,7 @@ public abstract class ExecNodeBase<P extends Planner, T> 
implements ExecNode<T>
        }
 
        @Override
-       public RowType getOutputType() {
+       public LogicalType getOutputType() {
                return outputType;
        }
 
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java
index 597f804..9b889a6 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java
@@ -77,7 +77,7 @@ public class BatchExecExchange extends BatchExecNode<RowData> 
implements CommonE
                }
                sb.append("distribution=[").append(type);
                if (requiredShuffle.getType() == ExecEdge.ShuffleType.HASH) {
-                       RowType inputRowType = 
getInputNodes().get(0).getOutputType();
+                       RowType inputRowType = (RowType) 
getInputNodes().get(0).getOutputType();
                        String[] fieldNames = 
Arrays.stream(requiredShuffle.getKeys())
                                        .mapToObj(i -> 
inputRowType.getFieldNames().get(i))
                                        .toArray(String[]::new);
@@ -115,8 +115,9 @@ public class BatchExecExchange extends 
BatchExecNode<RowData> implements CommonE
                                break;
                        case HASH:
                                int[] keys = 
inputEdge.getRequiredShuffle().getKeys();
+                               RowType inputType = (RowType) 
inputNode.getOutputType();
                                String[] fieldNames = Arrays.stream(keys)
-                                               .mapToObj(i -> 
inputNode.getOutputType().getFieldNames().get(i))
+                                               .mapToObj(i -> 
inputType.getFieldNames().get(i))
                                                .toArray(String[]::new);
                                partitioner = new BinaryHashPartitioner(
                                                
HashCodeGenerator.generateRowHash(
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecNode.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecNode.java
index 004ea6f..c9be68b 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecNode.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecNode.java
@@ -22,7 +22,7 @@ import org.apache.flink.table.planner.delegation.BatchPlanner;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
-import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.LogicalType;
 
 import java.util.List;
 
@@ -32,7 +32,7 @@ import java.util.List;
 public abstract class BatchExecNode<T> extends ExecNodeBase<BatchPlanner, T> {
        public BatchExecNode(
                        List<ExecEdge> inputEdges,
-                       RowType outputType,
+                       LogicalType outputType,
                        String description) {
                super(inputEdges, outputType, description);
        }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java
index e7a811d..1647db5 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
 import 
org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor;
+import org.apache.flink.table.types.logical.RowType;
 
 import java.util.Collections;
 import java.util.List;
@@ -80,7 +81,7 @@ public class InputPriorityConflictResolver extends 
InputPriorityGraphGenerator {
                                // we should split it into two nodes
                                BatchExecExchange newExchange = new 
BatchExecExchange(
                                        execEdge,
-                                       exchange.getOutputType(),
+                                       (RowType) exchange.getOutputType(),
                                        "Exchange");
                                newExchange.setRequiredShuffleMode(shuffleMode);
                                
newExchange.setInputNodes(exchange.getInputNodes());
@@ -120,7 +121,7 @@ public class InputPriorityConflictResolver extends 
InputPriorityGraphGenerator {
                                .build();
                BatchExecExchange exchange = new BatchExecExchange(
                                execEdge,
-                               inputNode.getOutputType(),
+                               (RowType) inputNode.getOutputType(),
                                "Exchange");
                exchange.setInputNodes(Collections.singletonList(inputNode));
                exchange.setRequiredShuffleMode(shuffleMode);
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecNode.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecNode.java
index 361415e..b3e4b74 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecNode.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecNode.java
@@ -22,7 +22,7 @@ import 
org.apache.flink.table.planner.delegation.StreamPlanner;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
-import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.LogicalType;
 
 import java.util.List;
 
@@ -32,7 +32,7 @@ import java.util.List;
 public abstract class StreamExecNode<T> extends ExecNodeBase<StreamPlanner, T> 
{
        public StreamExecNode(
                        List<ExecEdge> inputEdges,
-                       RowType outputType,
+                       LogicalType outputType,
                        String description) {
                super(inputEdges, outputType, description);
        }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/LegacyExecNodeBase.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/LegacyExecNodeBase.scala
index b9612be..bf35770 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/LegacyExecNodeBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/LegacyExecNodeBase.scala
@@ -23,7 +23,7 @@ import org.apache.flink.table.delegation.Planner
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitor
 import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel
-import org.apache.flink.table.types.logical.RowType
+import org.apache.flink.table.types.logical.LogicalType
 import org.apache.flink.util.Preconditions.{checkArgument, checkNotNull}
 
 import org.apache.calcite.rel.RelDistribution
@@ -58,7 +58,7 @@ trait LegacyExecNodeBase[P <: Planner, T] extends ExecNode[T] 
{
     this.asInstanceOf[FlinkPhysicalRel].getRelDetailedDescription
   }
 
-  override def getOutputType: RowType = {
+  override def getOutputType: LogicalType = {
     
FlinkTypeFactory.toLogicalRowType(this.asInstanceOf[FlinkPhysicalRel].getRowType)
   }
 
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java
index a4eb4ac..4098c30 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.TestingBatchExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
+import org.apache.flink.table.types.logical.RowType;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -104,7 +105,7 @@ public class InputPriorityConflictResolverTest {
 
                BatchExecExchange exchange = new BatchExecExchange(
                        
ExecEdge.builder().requiredShuffle(ExecEdge.RequiredShuffle.any()).build(),
-                       nodes[0].getOutputType(),
+                       (RowType) nodes[0].getOutputType(),
                        "Exchange");
                exchange.setRequiredShuffleMode(ShuffleMode.BATCH);
                exchange.setInputNodes(Collections.singletonList(nodes[0]));
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/nodes/exec/TestingBatchExecNode.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/nodes/exec/TestingBatchExecNode.scala
index ba37461..8c2ce5b 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/nodes/exec/TestingBatchExecNode.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/nodes/exec/TestingBatchExecNode.scala
@@ -23,7 +23,7 @@ import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.planner.calcite.{FlinkContextImpl, 
FlinkRelOptClusterFactory, FlinkRexBuilder, FlinkTypeFactory, FlinkTypeSystem}
 import org.apache.flink.table.planner.delegation.BatchPlanner
 import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalRel
-import org.apache.flink.table.types.logical.RowType
+import org.apache.flink.table.types.logical.{LogicalType, RowType}
 
 import org.apache.calcite.plan.hep.{HepPlanner, HepProgram}
 import org.apache.calcite.plan.{RelOptCluster, RelOptPlanner, RelTraitSet}
@@ -52,7 +52,7 @@ class TestingBatchExecNode
     inputEdges.add(edge)
   }
 
-  override def getOutputType: RowType = RowType.of()
+  override def getOutputType: LogicalType = RowType.of()
 
   override def getInputNodes: util.List[ExecNode[_]] = inputNodes
 

Reply via email to