dawidwys commented on a change in pull request #13556:
URL: https://github.com/apache/flink/pull/13556#discussion_r502253641



##########
File path: flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java
##########
@@ -513,6 +514,12 @@ public long getBufferTimeout() {
         */
        public abstract Collection<Transformation<?>> 
getTransitivePredecessors();
 
+       /**
+        * Returns the {@link Transformation transformations} that are the
+        * immediate predecessors of the current transformation in the 
transformation graph.
+        */
+       public abstract List<Transformation<?>> getInputs();

Review comment:
       very nitpicking: How about we unify the return type of 
`getTransitivePredecessors` & `getInputs` ?

##########
File path: 
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
##########
@@ -445,9 +443,10 @@ public void testParallelismOnLimitPushDown() {
                RelNode relNode = 
planner.optimize(TableTestUtil.toRelNode(table));
                ExecNode execNode = 
planner.translateToExecNodePlan(toScala(Collections.singletonList(relNode))).get(0);
                @SuppressWarnings("unchecked")
-               Transformation transformation = 
execNode.translateToPlan(planner);
-               Assert.assertEquals(1, ((PartitionTransformation) 
((OneInputTransformation) transformation).getInput())
-                       .getInput().getParallelism());
+               Transformation transformation = (Transformation) 
((Transformation) 
execNode.translateToPlan(planner).getInputs().get(0)).getInputs().get(0);

Review comment:
       This looks extremely weird.
   
   How about:
   ```
                @SuppressWarnings("unchecked")
                ExecNode<PlannerBase, ?> execNode = (ExecNode<PlannerBase, ?>) 
planner.translateToExecNodePlan(toScala(
                        Collections.singletonList(relNode))).get(0);
                Transformation<?> transformation = 
(execNode.translateToPlan(planner).getInputs().get(0)).getInputs().get(0);
   ```

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -127,6 +129,16 @@
 
        private RuntimeExecutionMode runtimeExecutionMode = 
RuntimeExecutionMode.STREAMING;
 
+       private boolean shouldExecuteInBatchMode;
+
+       private static final Map<Class<? extends Transformation>, 
StreamGraphTranslator<?, ? extends Transformation>> translatorMap;

Review comment:
       Could we add suppressions here?
   
   ```
        @SuppressWarnings("rawtypes")
        private static final Map<Class<? extends Transformation>, 
StreamGraphTranslator<?, ? extends Transformation>> translatorMap;
   
        static {
                @SuppressWarnings("rawtypes")
                Map<Class<? extends Transformation>, StreamGraphTranslator<?, ? 
extends Transformation>> tmp = new HashMap<>();
                tmp.put(OneInputTransformation.class, new 
OneInputTranslator<>());
                translatorMap = Collections.unmodifiableMap(tmp);
        }
   ```

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -276,10 +289,28 @@ private boolean isUnboundedSource(final Transformation<?> 
transformation) {
                // call at least once to trigger exceptions about 
MissingTypeInfo
                transform.getOutputType();
 
+               final StreamGraphTranslator<?, Transformation<?>> translator =

Review comment:
       add suppression




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to