ifndef-SleePy commented on code in PR #21765:
URL: https://github.com/apache/flink/pull/21765#discussion_r1089199472


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -1389,27 +1388,6 @@ && isChainable(upStreamVertex.getInEdges().get(0), 
streamGraph)) {
         return upStreamVertex.getOperatorFactory();
     }
 
-    private void markContainsSourcesOrSinks() {
-        for (Map.Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
-            final JobVertex jobVertex = entry.getValue();
-            final Set<Integer> vertexOperators = new HashSet<>();
-            vertexOperators.add(entry.getKey());
-            if (chainedConfigs.containsKey(entry.getKey())) {
-                
vertexOperators.addAll(chainedConfigs.get(entry.getKey()).keySet());
-            }
-
-            for (int nodeId : vertexOperators) {
-                if (streamGraph.getSourceIDs().contains(nodeId)) {
-                    jobVertex.markContainsSources();
-                }
-                if (streamGraph.getSinkIDs().contains(nodeId)

Review Comment:
   Yes, you are right. I think there is another easier way to do this. We can 
disable speculative execution for legacy sinks in `LegacySinkTransformation`.
   By the way, I'm not sure the e2e test failure of Azure is caused by this 
issue or not. I can't run the failed e2e test case on my local laptop (it's 
probably a problem caused by my environment). I would disable speculative 
execution through `LegacySinkTransformation` to re-trigger the Azure test again.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to