This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ce43b7f [FLINK-20673][table-planner-blink] ExecNode#getOutputType
method should return LogicalType instead of RowType
ce43b7f is described below
commit ce43b7fe0ff24a7b8acdd5ac4d2ce9cc68476f09
Author: godfrey he <[email protected]>
AuthorDate: Sun Dec 20 23:44:11 2020 +0800
[FLINK-20673][table-planner-blink] ExecNode#getOutputType method should
return LogicalType instead of RowType
This closes #14423
---
.../apache/flink/table/planner/plan/nodes/exec/ExecNode.java | 11 +++++++++--
.../flink/table/planner/plan/nodes/exec/ExecNodeBase.java | 8 ++++----
.../planner/plan/nodes/exec/batch/BatchExecExchange.java | 5 +++--
.../table/planner/plan/nodes/exec/batch/BatchExecNode.java | 4 ++--
.../exec/processor/utils/InputPriorityConflictResolver.java | 5 +++--
.../table/planner/plan/nodes/exec/stream/StreamExecNode.java | 4 ++--
.../table/planner/plan/nodes/exec/LegacyExecNodeBase.scala | 4 ++--
.../processor/utils/InputPriorityConflictResolverTest.java | 3 ++-
.../table/planner/plan/nodes/exec/TestingBatchExecNode.scala | 4 ++--
9 files changed, 29 insertions(+), 19 deletions(-)
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java
index 12cff6e..536b0a6 100644
---
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java
@@ -19,9 +19,11 @@
package org.apache.flink.table.planner.plan.nodes.exec;
import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitor;
import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel;
+import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import java.util.List;
@@ -41,9 +43,14 @@ public interface ExecNode<T> {
String getDesc();
/**
- * Returns the output {@link RowType} of this node.
+ * Returns the output {@link LogicalType} of this node,
+ * this type should be consistent with the type parameter {@link T}.
+ *
+ * <p>Such as, if T is {@link RowData}, the output type should be
{@link RowType}.
+ * please refer to the JavaDoc of {@link RowData} for more info about
+ * mapping of logical types to internal data structures.
*/
- RowType getOutputType();
+ LogicalType getOutputType();
/**
* Returns a list of this node's input nodes.
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
index 527b33d..c189530 100644
---
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.planner.plan.nodes.exec;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitor;
-import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.LogicalType;
import java.util.ArrayList;
import java.util.List;
@@ -39,7 +39,7 @@ public abstract class ExecNodeBase<P extends Planner, T>
implements ExecNode<T>
private final String description;
private final List<ExecEdge> inputEdges;
- private final RowType outputType;
+ private final LogicalType outputType;
// TODO remove this field once edge support `source` and `target`,
// and then we can get/set `inputNodes` through `inputEdges`.
private List<ExecNode<?>> inputNodes;
@@ -48,7 +48,7 @@ public abstract class ExecNodeBase<P extends Planner, T>
implements ExecNode<T>
protected ExecNodeBase(
List<ExecEdge> inputEdges,
- RowType outputType,
+ LogicalType outputType,
String description) {
this.inputEdges = new ArrayList<>(checkNotNull(inputEdges));
this.outputType = checkNotNull(outputType);
@@ -61,7 +61,7 @@ public abstract class ExecNodeBase<P extends Planner, T>
implements ExecNode<T>
}
@Override
- public RowType getOutputType() {
+ public LogicalType getOutputType() {
return outputType;
}
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java
index 597f804..9b889a6 100644
---
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java
@@ -77,7 +77,7 @@ public class BatchExecExchange extends BatchExecNode<RowData>
implements CommonE
}
sb.append("distribution=[").append(type);
if (requiredShuffle.getType() == ExecEdge.ShuffleType.HASH) {
- RowType inputRowType =
getInputNodes().get(0).getOutputType();
+ RowType inputRowType = (RowType)
getInputNodes().get(0).getOutputType();
String[] fieldNames =
Arrays.stream(requiredShuffle.getKeys())
.mapToObj(i ->
inputRowType.getFieldNames().get(i))
.toArray(String[]::new);
@@ -115,8 +115,9 @@ public class BatchExecExchange extends
BatchExecNode<RowData> implements CommonE
break;
case HASH:
int[] keys =
inputEdge.getRequiredShuffle().getKeys();
+ RowType inputType = (RowType)
inputNode.getOutputType();
String[] fieldNames = Arrays.stream(keys)
- .mapToObj(i ->
inputNode.getOutputType().getFieldNames().get(i))
+ .mapToObj(i ->
inputType.getFieldNames().get(i))
.toArray(String[]::new);
partitioner = new BinaryHashPartitioner(
HashCodeGenerator.generateRowHash(
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecNode.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecNode.java
index 004ea6f..c9be68b 100644
---
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecNode.java
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecNode.java
@@ -22,7 +22,7 @@ import org.apache.flink.table.planner.delegation.BatchPlanner;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
-import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.LogicalType;
import java.util.List;
@@ -32,7 +32,7 @@ import java.util.List;
public abstract class BatchExecNode<T> extends ExecNodeBase<BatchPlanner, T> {
public BatchExecNode(
List<ExecEdge> inputEdges,
- RowType outputType,
+ LogicalType outputType,
String description) {
super(inputEdges, outputType, description);
}
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java
index e7a811d..1647db5 100644
---
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java
@@ -25,6 +25,7 @@ import
org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
import
org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor;
+import org.apache.flink.table.types.logical.RowType;
import java.util.Collections;
import java.util.List;
@@ -80,7 +81,7 @@ public class InputPriorityConflictResolver extends
InputPriorityGraphGenerator {
// we should split it into two nodes
BatchExecExchange newExchange = new
BatchExecExchange(
execEdge,
- exchange.getOutputType(),
+ (RowType) exchange.getOutputType(),
"Exchange");
newExchange.setRequiredShuffleMode(shuffleMode);
newExchange.setInputNodes(exchange.getInputNodes());
@@ -120,7 +121,7 @@ public class InputPriorityConflictResolver extends
InputPriorityGraphGenerator {
.build();
BatchExecExchange exchange = new BatchExecExchange(
execEdge,
- inputNode.getOutputType(),
+ (RowType) inputNode.getOutputType(),
"Exchange");
exchange.setInputNodes(Collections.singletonList(inputNode));
exchange.setRequiredShuffleMode(shuffleMode);
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecNode.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecNode.java
index 361415e..b3e4b74 100644
---
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecNode.java
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecNode.java
@@ -22,7 +22,7 @@ import
org.apache.flink.table.planner.delegation.StreamPlanner;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
-import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.LogicalType;
import java.util.List;
@@ -32,7 +32,7 @@ import java.util.List;
public abstract class StreamExecNode<T> extends ExecNodeBase<StreamPlanner, T>
{
public StreamExecNode(
List<ExecEdge> inputEdges,
- RowType outputType,
+ LogicalType outputType,
String description) {
super(inputEdges, outputType, description);
}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/LegacyExecNodeBase.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/LegacyExecNodeBase.scala
index b9612be..bf35770 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/LegacyExecNodeBase.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/LegacyExecNodeBase.scala
@@ -23,7 +23,7 @@ import org.apache.flink.table.delegation.Planner
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitor
import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel
-import org.apache.flink.table.types.logical.RowType
+import org.apache.flink.table.types.logical.LogicalType
import org.apache.flink.util.Preconditions.{checkArgument, checkNotNull}
import org.apache.calcite.rel.RelDistribution
@@ -58,7 +58,7 @@ trait LegacyExecNodeBase[P <: Planner, T] extends ExecNode[T]
{
this.asInstanceOf[FlinkPhysicalRel].getRelDetailedDescription
}
- override def getOutputType: RowType = {
+ override def getOutputType: LogicalType = {
FlinkTypeFactory.toLogicalRowType(this.asInstanceOf[FlinkPhysicalRel].getRowType)
}
diff --git
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java
index a4eb4ac..4098c30 100644
---
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java
+++
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java
@@ -24,6 +24,7 @@ import
org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.TestingBatchExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
+import org.apache.flink.table.types.logical.RowType;
import org.junit.Assert;
import org.junit.Test;
@@ -104,7 +105,7 @@ public class InputPriorityConflictResolverTest {
BatchExecExchange exchange = new BatchExecExchange(
ExecEdge.builder().requiredShuffle(ExecEdge.RequiredShuffle.any()).build(),
- nodes[0].getOutputType(),
+ (RowType) nodes[0].getOutputType(),
"Exchange");
exchange.setRequiredShuffleMode(ShuffleMode.BATCH);
exchange.setInputNodes(Collections.singletonList(nodes[0]));
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/nodes/exec/TestingBatchExecNode.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/nodes/exec/TestingBatchExecNode.scala
index ba37461..8c2ce5b 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/nodes/exec/TestingBatchExecNode.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/nodes/exec/TestingBatchExecNode.scala
@@ -23,7 +23,7 @@ import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.planner.calcite.{FlinkContextImpl,
FlinkRelOptClusterFactory, FlinkRexBuilder, FlinkTypeFactory, FlinkTypeSystem}
import org.apache.flink.table.planner.delegation.BatchPlanner
import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalRel
-import org.apache.flink.table.types.logical.RowType
+import org.apache.flink.table.types.logical.{LogicalType, RowType}
import org.apache.calcite.plan.hep.{HepPlanner, HepProgram}
import org.apache.calcite.plan.{RelOptCluster, RelOptPlanner, RelTraitSet}
@@ -52,7 +52,7 @@ class TestingBatchExecNode
inputEdges.add(edge)
}
- override def getOutputType: RowType = RowType.of()
+ override def getOutputType: LogicalType = RowType.of()
override def getInputNodes: util.List[ExecNode[_]] = inputNodes