This is an automated email from the ASF dual-hosted git repository. junrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 39ac9e508bd9ae62842893997b3038ad781e07df Author: JunRuiLee <[email protected]> AuthorDate: Thu Dec 12 17:20:48 2024 +0800 [FLINK-36067][runtime] Introduce StreamGraphOptimizer and StreamGraphOptimizationStrategy --- .../scheduler/adaptivebatch/OperatorsFinished.java | 57 +++++++++++ .../StreamGraphOptimizationStrategy.java | 58 +++++++++++ .../adaptivebatch/StreamGraphOptimizer.java | 111 +++++++++++++++++++++ .../adaptivebatch/StreamGraphOptimizerTest.java | 104 +++++++++++++++++++ 4 files changed, 330 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/OperatorsFinished.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/OperatorsFinished.java new file mode 100644 index 00000000000..b1921904995 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/OperatorsFinished.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.adaptivebatch; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class represents the information about the finished operators. It includes a list of + * StreamNode IDs representing the finished operators, and a map associating each finished + * StreamNode ID with their corresponding produced data size and distribution information. + */ +public class OperatorsFinished { + + /** A list that holds the IDs of the finished StreamNodes. */ + private final List<Integer> finishedStreamNodeIds; + + /** + * A map that associates each finished StreamNode ID with a list of IntermediateResultInfo + * objects. The key is the StreamNode ID, and the value is a list of IntermediateResultInfo. + */ + private final Map<Integer, List<BlockingResultInfo>> resultInfoMap; + + public OperatorsFinished( + List<Integer> finishedStreamNodeIds, + Map<Integer, List<BlockingResultInfo>> resultInfoMap) { + this.finishedStreamNodeIds = checkNotNull(finishedStreamNodeIds); + this.resultInfoMap = checkNotNull(resultInfoMap); + } + + public List<Integer> getFinishedStreamNodeIds() { + return Collections.unmodifiableList(finishedStreamNodeIds); + } + + public Map<Integer, List<BlockingResultInfo>> getResultInfoMap() { + return Collections.unmodifiableMap(resultInfoMap); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/StreamGraphOptimizationStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/StreamGraphOptimizationStrategy.java new file mode 100644 index 00000000000..24d9f3252e0 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/StreamGraphOptimizationStrategy.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptivebatch; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.streaming.api.graph.StreamGraphContext; + +import java.util.List; + +/** + * Defines an optimization strategy for StreamGraph. Implementors of this interface provide methods + * to modify and optimize a StreamGraph based on contexts provided at runtime. + */ +@FunctionalInterface +public interface StreamGraphOptimizationStrategy { + + @Internal + ConfigOption<List<String>> STREAM_GRAPH_OPTIMIZATION_STRATEGY = + ConfigOptions.key("execution.batch.adaptive.stream-graph-optimization.strategies") + .stringType() + .asList() + .noDefaultValue() + .withDescription( + "Defines a comma-separated list of fully qualified class names " + + "implementing the StreamGraphOptimizationStrategy interface."); + + /** + * Tries to optimize the StreamGraph using the provided {@link OperatorsFinished} and {@link + * StreamGraphContext}. The method returns a boolean indicating whether the StreamGraph was + * successfully optimized. 
+ * + * @param operatorsFinished the OperatorsFinished object containing information about completed + * operators and their produced data size and distribution information. + * @param context the StreamGraphContext with a read-only view of a StreamGraph, providing + * methods to modify StreamEdges and StreamNodes within the StreamGraph. + * @return {@code true} if the StreamGraph was successfully optimized; {@code false} otherwise. + */ + boolean onOperatorsFinished(OperatorsFinished operatorsFinished, StreamGraphContext context) + throws Exception; +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/StreamGraphOptimizer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/StreamGraphOptimizer.java new file mode 100644 index 00000000000..1ac7583d6f2 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/StreamGraphOptimizer.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.flink.runtime.scheduler.adaptivebatch;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.graph.StreamGraphContext;
import org.apache.flink.util.DynamicCodeLoadingException;

import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * The {@code StreamGraphOptimizer} class is responsible for optimizing a StreamGraph based on
 * runtime information.
 *
 * <p>Upon construction, it loads the optimization strategies whose class names are listed in
 * {@link StreamGraphOptimizationStrategy#STREAM_GRAPH_OPTIMIZATION_STRATEGY} of the job
 * configuration. At runtime, it applies these strategies sequentially, in configured order, to the
 * StreamGraph using the {@code StreamGraphContext} and finished-operator information passed to
 * {@link #onOperatorsFinished}.
 */
public class StreamGraphOptimizer {

    /** The loaded strategies, in the order they were configured; unmodifiable. */
    private final List<StreamGraphOptimizationStrategy> optimizationStrategies;

    /**
     * Creates an optimizer and eagerly instantiates every configured optimization strategy.
     *
     * @param jobConfiguration the configuration to read the strategy class names from; must not
     *     be null.
     * @param userClassLoader the class loader used to load the strategy classes.
     * @throws DynamicCodeLoadingException if a configured class cannot be found, is not a {@link
     *     StreamGraphOptimizationStrategy}, or cannot be instantiated via its no-arg constructor.
     */
    public StreamGraphOptimizer(Configuration jobConfiguration, ClassLoader userClassLoader)
            throws DynamicCodeLoadingException {
        checkNotNull(jobConfiguration);

        Optional<List<String>> strategyClassNames =
                jobConfiguration.getOptional(
                        StreamGraphOptimizationStrategy.STREAM_GRAPH_OPTIMIZATION_STRATEGY);
        // Not expressed with Optional#map because loading throws a checked exception.
        if (strategyClassNames.isPresent()) {
            optimizationStrategies =
                    Collections.unmodifiableList(
                            loadOptimizationStrategies(strategyClassNames.get(), userClassLoader));
        } else {
            optimizationStrategies = Collections.emptyList();
        }
    }

    /**
     * Applies all loaded optimization strategies to the StreamGraph, one after another in
     * configured order.
     *
     * <p>The {@code boolean} returned by each strategy is currently ignored: every strategy is
     * invoked regardless of whether an earlier one reported a change. If a strategy throws, the
     * exception propagates and the remaining strategies are not invoked.
     *
     * @param operatorsFinished the object containing information about finished operators.
     * @param context the StreamGraphContext providing methods to modify the StreamGraph.
     * @throws Exception if any strategy throws.
     */
    public void onOperatorsFinished(OperatorsFinished operatorsFinished, StreamGraphContext context)
            throws Exception {
        for (StreamGraphOptimizationStrategy strategy : optimizationStrategies) {
            strategy.onOperatorsFinished(operatorsFinished, context);
        }
    }

    /** Instantiates one strategy per class name, preserving the given order. */
    private List<StreamGraphOptimizationStrategy> loadOptimizationStrategies(
            List<String> strategyClassNames, ClassLoader userClassLoader)
            throws DynamicCodeLoadingException {
        List<StreamGraphOptimizationStrategy> strategies =
                new ArrayList<>(strategyClassNames.size());

        for (String strategyClassName : strategyClassNames) {
            strategies.add(loadOptimizationStrategy(strategyClassName, userClassLoader));
        }

        return strategies;
    }

    /** Loads the named class through the user class loader and invokes its no-arg constructor. */
    private StreamGraphOptimizationStrategy loadOptimizationStrategy(
            String strategyClassName, ClassLoader userClassLoader)
            throws DynamicCodeLoadingException {
        // Class names never contain whitespace; tolerate padding around configured list entries.
        String className = strategyClassName.trim();
        try {
            Class<? extends StreamGraphOptimizationStrategy> clazz =
                    Class.forName(className, false, userClassLoader)
                            .asSubclass(StreamGraphOptimizationStrategy.class);

            return clazz.getDeclaredConstructor().newInstance();
        } catch (ClassNotFoundException e) {
            throw new DynamicCodeLoadingException(
                    "Cannot find configured stream graph optimization strategy class: "
                            + className,
                    e);
        } catch (InvocationTargetException e) {
            // The class is a valid strategy but its constructor threw; report the real cause
            // instead of the reflective wrapper.
            throw new DynamicCodeLoadingException(
                    "Failed to instantiate the configured stream graph optimization strategy '"
                            + className
                            + "': its constructor threw an exception",
                    e.getCause() != null ? e.getCause() : e);
        } catch (ClassCastException
                | InstantiationException
                | IllegalAccessException
                | NoSuchMethodException e) {
            throw new DynamicCodeLoadingException(
                    "The configured class '"
                            + className
                            + "' is not a valid stream graph optimization strategy",
                    e);
        }
    }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/StreamGraphOptimizerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/StreamGraphOptimizerTest.java new file mode 100644 index 00000000000..2f063db1642 --- /dev/null +++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/StreamGraphOptimizerTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.flink.runtime.scheduler.adaptivebatch;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.graph.StreamGraphContext;
import org.apache.flink.streaming.api.graph.util.ImmutableStreamGraph;
import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.util.DynamicCodeLoadingException;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Test for {@link StreamGraphOptimizer}. */
class StreamGraphOptimizerTest {
    private Configuration jobConfiguration;
    private ClassLoader userClassLoader;

    @BeforeEach
    void setUp() {
        jobConfiguration = new Configuration();
        userClassLoader = Thread.currentThread().getContextClassLoader();
        // The testing strategy records its arguments in static fields; clear them so that state
        // left behind by a previous test cannot leak into the next one.
        TestingStreamGraphOptimizerStrategy.reset();
    }

    @Test
    void testOnOperatorsFinished() throws Exception {
        jobConfiguration.set(
                StreamGraphOptimizationStrategy.STREAM_GRAPH_OPTIMIZATION_STRATEGY,
                List.of(TestingStreamGraphOptimizerStrategy.class.getName()));

        StreamGraphOptimizer optimizer =
                new StreamGraphOptimizer(jobConfiguration, userClassLoader);

        OperatorsFinished operatorsFinished =
                new OperatorsFinished(new ArrayList<>(), new HashMap<>());
        StreamGraphContext context = createNoOpContext();

        optimizer.onOperatorsFinished(operatorsFinished, context);

        // The optimizer must hand the exact same instances to the strategy.
        assertThat(TestingStreamGraphOptimizerStrategy.collectedOperatorsFinished)
                .isSameAs(operatorsFinished);
        assertThat(TestingStreamGraphOptimizerStrategy.collectedStreamGraphContext)
                .isSameAs(context);
    }

    @Test
    void testOnOperatorsFinishedWithoutConfiguredStrategies() throws Exception {
        StreamGraphOptimizer optimizer =
                new StreamGraphOptimizer(jobConfiguration, userClassLoader);

        optimizer.onOperatorsFinished(
                new OperatorsFinished(new ArrayList<>(), new HashMap<>()), createNoOpContext());

        assertThat(TestingStreamGraphOptimizerStrategy.collectedOperatorsFinished).isNull();
        assertThat(TestingStreamGraphOptimizerStrategy.collectedStreamGraphContext).isNull();
    }

    @Test
    void testLoadingUnknownStrategyClassFails() {
        jobConfiguration.set(
                StreamGraphOptimizationStrategy.STREAM_GRAPH_OPTIMIZATION_STRATEGY,
                List.of("org.apache.flink.NonExistingStreamGraphOptimizationStrategy"));

        assertThatThrownBy(() -> new StreamGraphOptimizer(jobConfiguration, userClassLoader))
                .isInstanceOf(DynamicCodeLoadingException.class);
    }

    @Test
    void testLoadingClassThatIsNotAStrategyFails() {
        jobConfiguration.set(
                StreamGraphOptimizationStrategy.STREAM_GRAPH_OPTIMIZATION_STRATEGY,
                List.of(String.class.getName()));

        assertThatThrownBy(() -> new StreamGraphOptimizer(jobConfiguration, userClassLoader))
                .isInstanceOf(DynamicCodeLoadingException.class);
    }

    /** Creates a context whose methods return {@code null}/{@code false}; enough for these tests. */
    private static StreamGraphContext createNoOpContext() {
        return new StreamGraphContext() {
            @Override
            public ImmutableStreamGraph getStreamGraph() {
                return null;
            }

            @Override
            public @Nullable StreamOperatorFactory<?> getOperatorFactory(Integer streamNodeId) {
                return null;
            }

            @Override
            public boolean modifyStreamEdge(List<StreamEdgeUpdateRequestInfo> requestInfos) {
                return false;
            }
        };
    }

    /**
     * Strategy that records the arguments of its most recent invocation in static fields, because
     * the optimizer instantiates it reflectively and the test has no handle on that instance.
     */
    protected static final class TestingStreamGraphOptimizerStrategy
            implements StreamGraphOptimizationStrategy {

        private static OperatorsFinished collectedOperatorsFinished;
        private static StreamGraphContext collectedStreamGraphContext;

        /** Clears the recorded arguments. */
        static void reset() {
            collectedOperatorsFinished = null;
            collectedStreamGraphContext = null;
        }

        @Override
        public boolean onOperatorsFinished(
                OperatorsFinished operatorsFinished, StreamGraphContext context) {
            collectedOperatorsFinished = operatorsFinished;
            collectedStreamGraphContext = context;
            return true;
        }
    }
}
