This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d336dad93459622be2dd619cc1d8d3709118628b Author: ifndef-SleePy <[email protected]> AuthorDate: Mon Jan 23 01:18:28 2023 +0800 [FLINK-30755][runtime] Remove legacy codes of marking not support speculative executions --- .../executiongraph/SpeculativeExecutionVertex.java | 8 - .../apache/flink/runtime/jobgraph/JobVertex.java | 22 --- .../adaptivebatch/SpeculativeScheduler.java | 2 +- .../flink/streaming/api/graph/StreamGraph.java | 16 -- .../api/graph/StreamingJobGraphGenerator.java | 22 --- .../StreamingJobGraphGeneratorSourceSinkTest.java | 162 --------------------- 6 files changed, 1 insertion(+), 231 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java index 48171a81d57..dbd5f84a461 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java @@ -72,14 +72,6 @@ public class SpeculativeExecutionVertex extends ExecutionVertex { this.nextInputSplitIndexToConsumeByAttempts = new HashMap<>(); } - public boolean containsSources() { - return getJobVertex().getJobVertex().containsSources(); - } - - public boolean containsSinks() { - return getJobVertex().getJobVertex().containsSinks(); - } - public boolean isSupportsConcurrentExecutionAttempts() { return getJobVertex().getJobVertex().isSupportsConcurrentExecutionAttempts(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java index eb322fe72bd..442512bdd2d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java @@ -149,12 +149,6 @@ public class JobVertex implements java.io.Serializable { */ private final List<IntermediateDataSetID> intermediateDataSetIdsToConsume = new ArrayList<>(); - /** Indicates whether this job vertex contains source operators. */ - private boolean containsSourceOperators = false; - - /** Indicates whether this job vertex contains sink operators. */ - private boolean containsSinkOperators = false; - /** * Indicates whether this job vertex supports multiple attempts of the same subtask executing at * the same time. @@ -543,22 +537,6 @@ public class JobVertex implements java.io.Serializable { return inputs.isEmpty(); } - public void markContainsSources() { - this.containsSourceOperators = true; - } - - public boolean containsSources() { - return containsSourceOperators; - } - - public void markContainsSinks() { - this.containsSinkOperators = true; - } - - public boolean containsSinks() { - return containsSinkOperators; - } - public void setSupportsConcurrentExecutionAttempts( boolean supportsConcurrentExecutionAttempts) { this.supportsConcurrentExecutionAttempts = supportsConcurrentExecutionAttempts; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java index 6d7bea3e9bf..ea26f8158b0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java @@ -329,7 +329,7 @@ public class SpeculativeScheduler extends AdaptiveBatchScheduler final SpeculativeExecutionVertex executionVertex = getExecutionVertex(executionVertexId); - if (executionVertex.containsSinks()) { + if (!executionVertex.isSupportsConcurrentExecutionAttempts()) { continue; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index 6b082ffd45d..636c4893178 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -118,7 +118,6 @@ public class StreamGraph implements Pipeline { private Map<Integer, StreamNode> streamNodes; private Set<Integer> sources; private Set<Integer> sinks; - private Set<Integer> expandedSinks; private Map<Integer, Tuple2<Integer, OutputTag>> virtualSideOutputNodes; private Map<Integer, Tuple3<Integer, StreamPartitioner<?>, StreamExchangeMode>> virtualPartitionNodes; @@ -161,7 +160,6 @@ public class StreamGraph implements Pipeline { iterationSourceSinkPairs = new HashSet<>(); sources = new HashSet<>(); sinks = new HashSet<>(); - expandedSinks = new HashSet<>(); slotSharingGroupResources = new HashMap<>(); } @@ -373,16 +371,6 @@ public class StreamGraph implements Pipeline { sinks.add(vertexID); } - /** - * Register expanded sink nodes. These nodes should also be treated as sinks. But we do not add - * them into {@link #sinks} to avoid messing up the json plan. - * - * @param nodeIds sink nodes to register - */ - public void registerExpandedSinks(Collection<Integer> nodeIds) { - expandedSinks.addAll(nodeIds); - } - public <IN, OUT> void addOperator( Integer vertexID, @Nullable String slotSharingGroup, @@ -895,10 +883,6 @@ public class StreamGraph implements Pipeline { return sinks; } - public Collection<Integer> getExpandedSinkIds() { - return expandedSinks; - } - public Collection<StreamNode> getStreamNodes() { return streamNodes.values(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 2e7b332cd9d..eafe8ae128e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -239,7 +239,6 @@ public class StreamingJobGraphGenerator { setPhysicalEdges(); - markContainsSourcesOrSinks(); markSupportingConcurrentExecutionAttempts(); setSlotSharingAndCoLocation(); @@ -1390,27 +1389,6 @@ public class StreamingJobGraphGenerator { return upStreamVertex.getOperatorFactory(); } - private void markContainsSourcesOrSinks() { - for (Map.Entry<Integer, JobVertex> entry : jobVertices.entrySet()) { - final JobVertex jobVertex = entry.getValue(); - final Set<Integer> vertexOperators = new HashSet<>(); - vertexOperators.add(entry.getKey()); - if (chainedConfigs.containsKey(entry.getKey())) { - vertexOperators.addAll(chainedConfigs.get(entry.getKey()).keySet()); - } - - for (int nodeId : vertexOperators) { - if (streamGraph.getSourceIDs().contains(nodeId)) { - jobVertex.markContainsSources(); - } - if (streamGraph.getSinkIDs().contains(nodeId) - || streamGraph.getExpandedSinkIds().contains(nodeId)) { - jobVertex.markContainsSinks(); - } - } - } - } - private void markSupportingConcurrentExecutionAttempts() { for (Map.Entry<Integer, JobVertex> entry : jobVertices.entrySet()) { final JobVertex jobVertex = entry.getValue(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorSourceSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorSourceSinkTest.java deleted file mode 100644 index 2f59790b4e0..00000000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorSourceSinkTest.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.graph; - -import org.apache.flink.api.common.RuntimeExecutionMode; -import org.apache.flink.api.common.typeinfo.Types; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.api.operators.ChainingStrategy; -import org.apache.flink.streaming.api.transformations.MultipleInputTransformation; -import org.apache.flink.streaming.util.TestExpandingSink; -import org.apache.flink.util.TestLoggerExtension; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; - -import java.util.List; - -import static org.assertj.core.api.Assertions.assertThat; - -/** - * Tests whether a generated job vertex is correctly marked as a source/sink by {@link - * StreamingJobGraphGenerator}. - */ -@ExtendWith(TestLoggerExtension.class) -class StreamingJobGraphGeneratorSourceSinkTest { - - private StreamExecutionEnvironment env; - - @BeforeEach - void setUp() { - env = StreamExecutionEnvironment.getExecutionEnvironment(); - } - - @Test - void testLegacySource() { - env.fromElements(0, 1).map(i -> i); - - final List<JobVertex> verticesSorted = getJobVertices(); - - final JobVertex sourceVertex = verticesSorted.get(0); - assertThat(sourceVertex.containsSources()).isTrue(); - assertThat(sourceVertex.containsSinks()).isFalse(); - } - - @Test - void testNewSource() { - env.fromSequence(0, 1).map(i -> i); - - final List<JobVertex> verticesSorted = getJobVertices(); - - final JobVertex sourceVertex = verticesSorted.get(0); - assertThat(sourceVertex.containsSources()).isTrue(); - assertThat(sourceVertex.containsSinks()).isFalse(); - } - - @Test - void testMultiInputSource() { - final DataStream<Long> source1 = env.fromSequence(0, 1); - final DataStream<Long> source2 = env.fromSequence(0, 1); - final MultipleInputTransformation<Long> multiInputTransform = - new MultipleInputTransformation<>( - "multi-input-operator", - new StreamingJobGraphGeneratorTest.UnusedOperatorFactory(), - Types.LONG, - env.getParallelism()); - multiInputTransform.addInput(source1.map(i -> i).getTransformation()); - multiInputTransform.addInput(source2.getTransformation()); - multiInputTransform.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES); - env.addOperator(multiInputTransform); - - final List<JobVertex> verticesSorted = getJobVertices(); - - final JobVertex source1Vertex = verticesSorted.get(0); - assertThat(source1Vertex.containsSources()).isTrue(); - assertThat(source1Vertex.containsSinks()).isFalse(); - - // source-2 is chained with the multi-input vertex - final JobVertex multiInputVertex = verticesSorted.get(1); - assertThat(multiInputVertex.containsSources()).isTrue(); - assertThat(multiInputVertex.containsSinks()).isFalse(); - } - - @Test - void testLegacySink() { - env.fromElements(0, 1).map(i -> i).startNewChain().addSink(new SinkFunction<Integer>() {}); - - final List<JobVertex> verticesSorted = getJobVertices(); - - final JobVertex sinkVertex = verticesSorted.get(1); - assertThat(sinkVertex.containsSources()).isFalse(); - assertThat(sinkVertex.containsSinks()).isTrue(); - } - - @Test - void testNewSink() { - env.fromElements(0, 1).disableChaining().sinkTo(new TestExpandingSink()); - - final List<JobVertex> verticesSorted = getJobVertices(); - - final JobVertex sinkVertex = verticesSorted.get(1); - assertThat(sinkVertex.containsSources()).isFalse(); - assertThat(sinkVertex.containsSinks()).isTrue(); - } - - @Test - void testNewSinkWithSinkTopology() { - env.setRuntimeMode(RuntimeExecutionMode.BATCH); - env.fromElements(0, 1).disableChaining().sinkTo(new TestExpandingSink()); - - final List<JobVertex> verticesSorted = getJobVertices(); - - final JobVertex sinkWriterVertex = verticesSorted.get(1); - assertThat(sinkWriterVertex.containsSources()).isFalse(); - assertThat(sinkWriterVertex.containsSinks()).isTrue(); - - final JobVertex sinkCommitterVertex = verticesSorted.get(2); - assertThat(sinkCommitterVertex.containsSources()).isFalse(); - assertThat(sinkCommitterVertex.containsSinks()).isTrue(); - - final JobVertex sinkPostCommitterVertex = verticesSorted.get(3); - assertThat(sinkPostCommitterVertex.containsSources()).isFalse(); - assertThat(sinkPostCommitterVertex.containsSinks()).isTrue(); - } - - @Test - void testChainedSourceSink() { - env.setParallelism(1); - env.fromElements(0, 1).sinkTo(new TestExpandingSink()); - - final List<JobVertex> verticesSorted = getJobVertices(); - - final JobVertex sourceSinkVertex = verticesSorted.get(0); - assertThat(sourceSinkVertex.containsSources()).isTrue(); - assertThat(sourceSinkVertex.containsSinks()).isTrue(); - } - - private List<JobVertex> getJobVertices() { - final StreamGraph streamGraph = env.getStreamGraph(); - final JobGraph jobGraph = streamGraph.getJobGraph(); - return jobGraph.getVerticesSortedTopologicallyFromSources(); - } -}
