twalthr commented on a change in pull request #18621:
URL: https://github.com/apache/flink/pull/18621#discussion_r800422757



##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeContext.java
##########
@@ -113,6 +113,11 @@ public Integer getVersion() {
         return version;
     }
 
+    /** Returns a new {@code uid} for transformations. */
+    public String generateUid(String operatorName) {

Review comment:
       enforce a format here, always lower case, with `-` as delimiters. also 
give guidelines in the JavaDocs of this method.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
##########
@@ -157,6 +158,10 @@ protected boolean inputsContainSingleton() {
                                                 == 
InputProperty.DistributionType.SINGLETON);
     }
 
+    public String getOperatorUid(String operatorName) {

Review comment:
       I find the naming of these methods here confusing. `getOperatorName` is 
actually a `getExecNodeNameForSingleOperator`. Where `getOperatorUid` is rather 
a "operator" name generator.
   
   How about the following refactorings:
   
   `getOperatorName` -> `asOperatorName`
   `getOperatorDescription` -> `asOperatorDescription`
   `getOperatorUid` -> `createOperatorUid`
   `getOperatorMeta` -> `createOperatorMetadata`

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
##########
@@ -433,20 +445,26 @@ private int deriveSinkParallelism(
             int rowtimeFieldIndex,
             int sinkParallelism,
             Configuration config) {
-        String sinkName = getOperatorName(config);
+        TransformationMetadata sinkMeta = getOperatorMeta(SINK_OPERATOR, 
config);
         String sinkDescription = getOperatorDescription(config);
         if (runtimeProvider instanceof DataStreamSinkProvider) {
             Transformation<RowData> sinkTransformation =
                     applyRowtimeTransformation(
                             inputTransform, rowtimeFieldIndex, 
sinkParallelism, config);
             final DataStream<RowData> dataStream = new DataStream<>(env, 
sinkTransformation);
             final DataStreamSinkProvider provider = (DataStreamSinkProvider) 
runtimeProvider;
-            return provider.consumeDataStream(dataStream).getTransformation();
+            return provider.consumeDataStream(dataStream)
+                    .uid(sinkMeta.getUid())
+                    .getTransformation();
         } else if (runtimeProvider instanceof TransformationSinkProvider) {
             final TransformationSinkProvider provider =
                     (TransformationSinkProvider) runtimeProvider;
-            return provider.createTransformation(
-                    TransformationSinkProvider.Context.of(inputTransform, 
rowtimeFieldIndex));
+            Transformation<?> transformation =
+                    provider.createTransformation(
+                            TransformationSinkProvider.Context.of(
+                                    inputTransform, rowtimeFieldIndex));
+            transformation.setUid(sinkMeta.getUid());

Review comment:
       let's skip this for now. we are not in control of the number of 
transformations that make up a sink. it is not guaranteed that the last 
transformation is the sink.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecCalc.java
##########
@@ -49,6 +49,9 @@
 /** Base class for exec Calc. */
 public abstract class CommonExecCalc extends ExecNodeBase<RowData>
         implements SingleTransformationTranslator<RowData> {
+
+    public static final String SUBSTITUTE_STREAM_OPERATOR = 
"substitute-stream";

Review comment:
       `substitute-stream` -> `substitute`? the name doesn't make much sense 
btw, maybe just `calc`?

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDataStreamScan.java
##########
@@ -55,6 +55,9 @@
 /** Stream {@link ExecNode} to connect a given {@link DataStream} and consume 
data from it. */
 public class StreamExecDataStreamScan extends ExecNodeBase<RowData>
         implements StreamExecNode<RowData>, 
MultipleTransformationTranslator<RowData> {
+
+    private static final String DATA_STREAM_SOURCE_OPERATOR = 
"data-stream-source";

Review comment:
       ExecNodes without an annotation can be skipped. Why does this uid 
generation actually work? What is the uid in this case?

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
##########
@@ -433,20 +445,26 @@ private int deriveSinkParallelism(
             int rowtimeFieldIndex,
             int sinkParallelism,
             Configuration config) {
-        String sinkName = getOperatorName(config);
+        TransformationMetadata sinkMeta = getOperatorMeta(SINK_OPERATOR, 
config);
         String sinkDescription = getOperatorDescription(config);
         if (runtimeProvider instanceof DataStreamSinkProvider) {
             Transformation<RowData> sinkTransformation =
                     applyRowtimeTransformation(
                             inputTransform, rowtimeFieldIndex, 
sinkParallelism, config);
             final DataStream<RowData> dataStream = new DataStream<>(env, 
sinkTransformation);
             final DataStreamSinkProvider provider = (DataStreamSinkProvider) 
runtimeProvider;
-            return provider.consumeDataStream(dataStream).getTransformation();
+            return provider.consumeDataStream(dataStream)
+                    .uid(sinkMeta.getUid())

Review comment:
       let's skip this for now. we are not in control of the number of 
transformations that make up a sink. it is not guaranteed that the last 
transformation is the sink.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
##########
@@ -178,6 +183,17 @@ protected String getOperatorDescription(Configuration 
config) {
         return getFormattedOperatorDescription(getDescription(), config);
     }
 
+    public TransformationMetadata getOperatorMeta(String operatorName, 
TableConfig config) {
+        return getOperatorMeta(operatorName, config.getConfiguration());
+    }
+
+    public TransformationMetadata getOperatorMeta(String operatorName, 
Configuration config) {

Review comment:
       Naming: `TransformationMetadata` -> `OperatorMetadata`

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecUnion.java
##########
@@ -39,6 +39,7 @@
 @ExecNodeMetadata(
         name = "stream-exec-union",
         version = 1,
+        producedOperators = CommonExecUnion.UNION_OPERATOR,

Review comment:
       not sure if it is a good idea to reference an operator in `Common`, 
refactoring could potentially modify metadata across nodes. let's make the name 
configurable and add the constant to this class.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
##########
@@ -74,12 +74,15 @@
 @ExecNodeMetadata(
         name = "stream-exec-global-window-aggregate",
         version = 1,
+        producedOperators = 
StreamExecGlobalWindowAggregate.GLOBAL_WINDOW_AGGREGATE_OPERATOR,
         minPlanVersion = FlinkVersion.v1_15,
         minStateVersion = FlinkVersion.v1_15)
 public class StreamExecGlobalWindowAggregate extends 
StreamExecWindowAggregateBase {
 
     public static final String FIELD_NAME_LOCAL_AGG_INPUT_ROW_TYPE = 
"localAggInputRowType";
 
+    public static final String GLOBAL_WINDOW_AGGREGATE_OPERATOR = 
"global-window-aggregate";

Review comment:
       at other locations, this constant is the top variable

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/TransformationMetadata.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.utils;
+
+import org.apache.flink.api.dag.Transformation;
+
+import javax.annotation.Nullable;
+
+/**
+ * This POJO is meant to hold some metadata information about operators, which 
usually needs to be
+ * passed to "factory" methods for {@link Transformation}.
+ */
+public class TransformationMetadata {
+    private final @Nullable String uid;
+    private final String name;
+    private final String desc;
+
+    public TransformationMetadata(@Nullable String uid, String name, String 
desc) {

Review comment:
       Explain when this is null and why. Maybe use some static method to 
indicate the usages e.g. like `TransformationMetadata.forBatch(...)`

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
##########
@@ -267,26 +284,39 @@ public StreamExecIntervalJoin(
                         new StreamMap<>(rightPadder),
                         returnTypeInfo,
                         rightParallelism);
+        padRightStream.setUid(getOperatorUid(PAD_RIGHT_OPERATOR));
         padRightStream.setDescription(
                 getFormattedOperatorDescription("pad right input 
transformation", config));
         padRightStream.setName(
                 getFormattedOperatorName(padRightStream.getDescription(), 
"PadRight", config));
 
+        Transformation<RowData> transformation;
         switch (joinSpec.getJoinType()) {
             case INNER:
-                return new UnionTransformation<>(
-                        Lists.newArrayList(filterAllLeftStream, 
filterAllRightStream));
+                transformation =
+                        new UnionTransformation<>(
+                                Lists.newArrayList(filterAllLeftStream, 
filterAllRightStream));
+                break;
             case LEFT:
-                return new UnionTransformation<>(
-                        Lists.newArrayList(padLeftStream, 
filterAllRightStream));
+                transformation =
+                        new UnionTransformation<>(
+                                Lists.newArrayList(padLeftStream, 
filterAllRightStream));
+                break;
             case RIGHT:
-                return new UnionTransformation<>(
-                        Lists.newArrayList(filterAllLeftStream, 
padRightStream));
+                transformation =
+                        new UnionTransformation<>(
+                                Lists.newArrayList(filterAllLeftStream, 
padRightStream));
+                break;
             case FULL:
-                return new 
UnionTransformation<>(Lists.newArrayList(padLeftStream, padRightStream));
+                transformation =
+                        new UnionTransformation<>(
+                                Lists.newArrayList(padLeftStream, 
padRightStream));
+                break;
             default:
                 throw new TableException("should no reach here");
         }
+        transformation.setUid(getOperatorUid(INTERVAL_JOIN_OPERATOR));

Review comment:
       we should choose a different name here. `INTERVAL_JOIN_OPERATOR` is used 
for different operators/state layouts. I'm not even sure if a uid set on a 
union has any effect, I think it is dropped at later stages in 
`StreamGraphGenerator`.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java
##########
@@ -50,6 +50,8 @@
 public abstract class CommonExecWindowTableFunction extends 
ExecNodeBase<RowData>
         implements BatchExecNode<RowData>, 
SingleTransformationTranslator<RowData> {
 
+    public static final String WINDOW_TVF_OPERATOR = "window-tvf";

Review comment:
       I'm strongly against the name `TVF`. `window` should be enough.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupTableAggregate.java
##########
@@ -58,12 +58,16 @@
 /** Stream {@link ExecNode} for unbounded python group table aggregate. */
 public class StreamExecPythonGroupTableAggregate extends ExecNodeBase<RowData>
         implements StreamExecNode<RowData>, 
SingleTransformationTranslator<RowData> {
+
     private static final Logger LOG =
             LoggerFactory.getLogger(StreamExecPythonGroupTableAggregate.class);
 
+    private static final String GROUP_TABLE_AGGREGATE_OPERATOR = 
"group-table-aggregate";

Review comment:
       class is not annotated, calling `getOperatorMeta` should not even work




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to