pnowojski commented on a change in pull request #8742: [FLINK-11879] Add 
validators for the uses of InputSelectable, BoundedOneInput and 
BoundedMultiInput
URL: https://github.com/apache/flink/pull/8742#discussion_r301507241
 
 

 ##########
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 ##########
 @@ -661,18 +664,31 @@ private void configureCheckpointing() {
                                "exactly-once or at-least-once.");
                }
 
-               //  --- configure the master-side checkpoint hooks ---
+               //  --- configure the master-side checkpoint hooks and check 
the types of operators---
 
                final ArrayList<MasterTriggerRestoreHook.Factory> hooks = new 
ArrayList<>();
 
                for (StreamNode node : streamGraph.getStreamNodes()) {
-                       if (node.getOperatorFactory() instanceof 
UdfStreamOperatorFactory) {
-                               Function f = ((UdfStreamOperatorFactory) 
node.getOperatorFactory()).getUserFunction();
+                       StreamOperatorFactory operatorFactory = 
node.getOperatorFactory();
+                       if (operatorFactory instanceof 
UdfStreamOperatorFactory) {
+                               Function f = ((UdfStreamOperatorFactory) 
operatorFactory).getUserFunction();
 
                                if (f instanceof WithMasterCheckpointHook) {
                                        hooks.add(new 
FunctionMasterCheckpointHookFactory((WithMasterCheckpointHook<?>) f));
                                }
                        }
+
+                       if (cfg.isCheckpointingEnabled() && operatorFactory != 
null) {
 
 Review comment:
   I agree with @1u0. This method is definitely already too big if we want to 
add some new functionality here, it must be decomposed into smaller pieces. But 
I also agree that this check would be better to perform earlier. Maybe in some 
`private void  StreamingJobGraphGenerator#validateJobGraph()` method (together 
with the present validation that happens in `StreamGraph#getJobGraph`). 
   
   With this `validateJobGraph()` either we could call it pre-emptively as a 
first thing in `createJobGraph()` or expect a user to call it manually before 
`createJobGraph()`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to