pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] 
Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r280466736
 
 

 ##########
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 ##########
 @@ -480,13 +483,17 @@ private StreamGraph 
generateInternal(List<StreamTransformation<?>> transformatio
                streamGraph.addSource(source.getId(),
                                slotSharingGroup,
                                source.getCoLocationGroupKey(),
-                               source.getOperator(),
+                               source.getOperatorFactory(),
                                null,
                                source.getOutputType(),
                                "Source: " + source.getName());
-               if (source.getOperator().getUserFunction() instanceof 
InputFormatSourceFunction) {
-                       InputFormatSourceFunction<T> fs = 
(InputFormatSourceFunction<T>) source.getOperator().getUserFunction();
-                       streamGraph.setInputFormat(source.getId(), 
fs.getFormat());
+               if (source.getOperatorFactory() instanceof 
SimpleOperatorFactory) {
 
 Review comment:
   Possible issue: this code doesn't support setting input formats for non 
`SimpleOperatorFactories`. By implementing this check this way, we are 
supporting all of the present cases, but it makes it kind of strange, that for 
the new way we don't support it - we do not have a migration path to get rid of 
`StreamOperators` from the `StreamTransformation`.
   
   Could this if check be reworked to something like:
   ```
   if (source.getOperatorFactory() insnaceof InputFormatSourceOperatorFactory) {
     streamGraph.setInputFormat(id, ((InputFormatSourceOperator) 
source.getOperatorFactory()).getFormat());
   }
   ```
   ?
   Combination of `SimpleOperatorFactory` and 
`InputFormatSourceOperatorFactory` could implement `getFormat() { return 
this.getOperator().getUserFunction().getFormat()`.
   
   while future non `SimpleOperatorFactory` could be supported as well.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to