zhuzhurk commented on code in PR #21765: URL: https://github.com/apache/flink/pull/21765#discussion_r1089112017
########## flink-core/src/main/java/org/apache/flink/api/common/SupportsConcurrentExecutionAttempts.java: ########## @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * The interface indicates that it supports multiple attempts executing at the same time. + * + * <p>Currently, the interface is used for speculative execution. 
If a sink implementation (SinkV2, + * OutputFormat or SinkFunction) inherit this interface, the sink operator would be considered to Review Comment: inherit -> inherits ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java: ########## @@ -1389,27 +1388,6 @@ && isChainable(upStreamVertex.getInEdges().get(0), streamGraph)) { return upStreamVertex.getOperatorFactory(); } - private void markContainsSourcesOrSinks() { - for (Map.Entry<Integer, JobVertex> entry : jobVertices.entrySet()) { - final JobVertex jobVertex = entry.getValue(); - final Set<Integer> vertexOperators = new HashSet<>(); - vertexOperators.add(entry.getKey()); - if (chainedConfigs.containsKey(entry.getKey())) { - vertexOperators.addAll(chainedConfigs.get(entry.getKey()).keySet()); - } - - for (int nodeId : vertexOperators) { - if (streamGraph.getSourceIDs().contains(nodeId)) { - jobVertex.markContainsSources(); - } - if (streamGraph.getSinkIDs().contains(nodeId) Review Comment: This is for legacy sinks. Therefore I think this change should be applied after SinkFunction/OutputFormat speculative execution is supported. ########## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java: ########## @@ -149,11 +149,8 @@ public class JobVertex implements java.io.Serializable { */ private final List<IntermediateDataSetID> intermediateDataSetIdsToConsume = new ArrayList<>(); - /** Indicates whether this job vertex contains source operators. */ - private boolean containsSourceOperators = false; - - /** Indicates whether this job vertex contains sink operators. */ - private boolean containsSinkOperators = false; + /** Indicates whether this job vertex supports attempts executing at the same time. 
*/ Review Comment: supports attempts executing -> supports multiple attempts of the same subtask executing ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java: ########## @@ -1066,4 +1050,12 @@ public void registerJobStatusHook(JobStatusHook hook) { public List<JobStatusHook> getJobStatusHooks() { return this.jobStatusHooks; } + + public void setSupportsConcurrentExecutionAttempts( + Integer vertexID, boolean supportsConcurrentExecutionAttempts) { Review Comment: vertexID -> vertexId. This is the form recommended by the naming convention. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
