ifndef-SleePy commented on code in PR #21765:
URL: https://github.com/apache/flink/pull/21765#discussion_r1089199472
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -1389,27 +1388,6 @@ && isChainable(upStreamVertex.getInEdges().get(0),
streamGraph)) {
return upStreamVertex.getOperatorFactory();
}
- private void markContainsSourcesOrSinks() {
- for (Map.Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
- final JobVertex jobVertex = entry.getValue();
- final Set<Integer> vertexOperators = new HashSet<>();
- vertexOperators.add(entry.getKey());
- if (chainedConfigs.containsKey(entry.getKey())) {
-
vertexOperators.addAll(chainedConfigs.get(entry.getKey()).keySet());
- }
-
- for (int nodeId : vertexOperators) {
- if (streamGraph.getSourceIDs().contains(nodeId)) {
- jobVertex.markContainsSources();
- }
- if (streamGraph.getSinkIDs().contains(nodeId)
Review Comment:
Yes, you are right. I think there is another easier way to do this. We can
disable speculative execution for legacy sinks in `LegacySinkTransformation`.
By the way, I'm not sure the e2e test failure of Azure is caused by this
issue or not. I would disable speculative execution through
`LegacySinkTransformation` to re-trigger the Azure test again.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]