This is an automated email from the ASF dual-hosted git repository.

junrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 39ac9e508bd9ae62842893997b3038ad781e07df
Author: JunRuiLee <[email protected]>
AuthorDate: Thu Dec 12 17:20:48 2024 +0800

    [FLINK-36067][runtime] Introduce StreamGraphOptimizer and StreamGraphOptimizationStrategy
---
 .../scheduler/adaptivebatch/OperatorsFinished.java |  57 +++++++++++
 .../StreamGraphOptimizationStrategy.java           |  58 +++++++++++
 .../adaptivebatch/StreamGraphOptimizer.java        | 111 +++++++++++++++++++++
 .../adaptivebatch/StreamGraphOptimizerTest.java    | 104 +++++++++++++++++++
 4 files changed, 330 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/OperatorsFinished.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/OperatorsFinished.java
new file mode 100644
index 00000000000..b1921904995
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/OperatorsFinished.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class represents the information about the finished operators. It includes a list of
+ * StreamNode IDs representing the finished operators, and a map associating each finished
+ * StreamNode ID with its corresponding produced data size and distribution information.
+ *
+ * <p>Both collections must be non-null and are stored by reference (no defensive copy); the
+ * getters expose them only through unmodifiable views.
+ */
+public class OperatorsFinished {
+
+    /** A list that holds the IDs of the finished StreamNodes. */
+    private final List<Integer> finishedStreamNodeIds;
+
+    /**
+     * A map that associates each finished StreamNode ID with a list of BlockingResultInfo
+     * objects. The key is the StreamNode ID, and the value is a list of BlockingResultInfo.
+     */
+    private final Map<Integer, List<BlockingResultInfo>> resultInfoMap;
+
+    public OperatorsFinished(
+            List<Integer> finishedStreamNodeIds,
+            Map<Integer, List<BlockingResultInfo>> resultInfoMap) {
+        this.finishedStreamNodeIds = checkNotNull(finishedStreamNodeIds);
+        this.resultInfoMap = checkNotNull(resultInfoMap);
+    }
+
+    public List<Integer> getFinishedStreamNodeIds() {
+        return Collections.unmodifiableList(finishedStreamNodeIds);
+    }
+
+    public Map<Integer, List<BlockingResultInfo>> getResultInfoMap() {
+        return Collections.unmodifiableMap(resultInfoMap);
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/StreamGraphOptimizationStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/StreamGraphOptimizationStrategy.java
new file mode 100644
index 00000000000..24d9f3252e0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/StreamGraphOptimizationStrategy.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.api.graph.StreamGraphContext;
+
+import java.util.List;
+
+/**
+ * Defines an optimization strategy for StreamGraph. Implementors of this interface provide methods
+ * to modify and optimize a StreamGraph based on contexts provided at runtime.
+ *
+ * <p>Implementations are selected by class name through the {@link
+ * #STREAM_GRAPH_OPTIMIZATION_STRATEGY} option, which has no default value.
+ */
+@FunctionalInterface
+public interface StreamGraphOptimizationStrategy {
+
+    @Internal
+    ConfigOption<List<String>> STREAM_GRAPH_OPTIMIZATION_STRATEGY =
+            
ConfigOptions.key("execution.batch.adaptive.stream-graph-optimization.strategies")
+                    .stringType()
+                    .asList()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Defines a comma-separated list of fully qualified 
class names "
+                                    + "implementing the 
StreamGraphOptimizationStrategy interface.");
+
+    /**
+     * Tries to optimize the StreamGraph using the provided {@link OperatorsFinished} and {@link
+     * StreamGraphContext}. The method returns a boolean indicating whether the StreamGraph was
+     * successfully optimized.
+     *
+     * @param operatorsFinished the OperatorsFinished object containing information about completed
+     *     operators and their produced data size and distribution information.
+     * @param context the StreamGraphContext with a read-only view of a StreamGraph, providing
+     *     methods to modify StreamEdges and StreamNodes within the StreamGraph.
+     * @return {@code true} if the StreamGraph was successfully optimized; {@code false} otherwise.
+     * @throws Exception if the strategy implementation fails while optimizing the StreamGraph.
+     */
+    boolean onOperatorsFinished(OperatorsFinished operatorsFinished, 
StreamGraphContext context)
+            throws Exception;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/StreamGraphOptimizer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/StreamGraphOptimizer.java
new file mode 100644
index 00000000000..1ac7583d6f2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/StreamGraphOptimizer.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.graph.StreamGraphContext;
+import org.apache.flink.util.DynamicCodeLoadingException;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The {@code StreamGraphOptimizer} class is responsible for optimizing a StreamGraph based on
+ * runtime information.
+ *
+ * <p>Upon initialization, it reads the strategy class names configured under {@link
+ * StreamGraphOptimizationStrategy#STREAM_GRAPH_OPTIMIZATION_STRATEGY} and instantiates each one
+ * through its no-argument constructor, resolving the classes with the given class loader. If the
+ * option is not set, no strategies are loaded. At runtime, it applies these strategies
+ * sequentially, in the order of the configured list, to the StreamGraph using the provided
+ * context and information about finished operators.
+ *
+ * <p>The {@code StreamGraphContext} is supplied by the caller on every {@link
+ * #onOperatorsFinished} call; it is not obtained or stored at construction time. A class that
+ * cannot be found, is not a {@code StreamGraphOptimizationStrategy}, or cannot be instantiated
+ * causes the constructor to fail with a {@code DynamicCodeLoadingException}.
+ */
+public class StreamGraphOptimizer {
+
+    private final List<StreamGraphOptimizationStrategy> optimizationStrategies;
+
+    public StreamGraphOptimizer(Configuration jobConfiguration, ClassLoader 
userClassLoader)
+            throws DynamicCodeLoadingException {
+        checkNotNull(jobConfiguration);
+
+        Optional<List<String>> optional =
+                jobConfiguration.getOptional(
+                        
StreamGraphOptimizationStrategy.STREAM_GRAPH_OPTIMIZATION_STRATEGY);
+        if (optional.isPresent()) {
+            optimizationStrategies = 
loadOptimizationStrategies(optional.get(), userClassLoader);
+        } else {
+            optimizationStrategies = new ArrayList<>();
+        }
+    }
+
+    /**
+     * Applies all loaded optimization strategies to the StreamGraph.
+     *
+     * <p>The boolean returned by each strategy is ignored. An exception thrown by a strategy
+     * propagates to the caller, and the strategies after it are not applied.
+     *
+     * @param operatorsFinished the object containing information about finished operators.
+     * @param context the StreamGraphContext providing methods to modify the StreamGraph.
+     * @throws Exception if any strategy throws while handling the finished operators.
+     */
+    public void onOperatorsFinished(OperatorsFinished operatorsFinished, 
StreamGraphContext context)
+            throws Exception {
+        for (StreamGraphOptimizationStrategy strategy : 
optimizationStrategies) {
+            strategy.onOperatorsFinished(operatorsFinished, context);
+        }
+    }
+
+    private List<StreamGraphOptimizationStrategy> loadOptimizationStrategies(
+            List<String> strategyClassNames, ClassLoader userClassLoader)
+            throws DynamicCodeLoadingException {
+        List<StreamGraphOptimizationStrategy> strategies =
+                new ArrayList<>(strategyClassNames.size());
+
+        for (String strategyClassName : strategyClassNames) {
+            strategies.add(loadOptimizationStrategy(strategyClassName, 
userClassLoader));
+        }
+
+        return strategies;
+    }
+
+    private StreamGraphOptimizationStrategy loadOptimizationStrategy(
+            String strategyClassName, ClassLoader userClassLoader)
+            throws DynamicCodeLoadingException {
+        try {
+            Class<? extends StreamGraphOptimizationStrategy> clazz =
+                    Class.forName(strategyClassName, false, userClassLoader)
+                            .asSubclass(StreamGraphOptimizationStrategy.class);
+
+            return clazz.getDeclaredConstructor().newInstance();
+        } catch (ClassNotFoundException e) {
+            throw new DynamicCodeLoadingException(
+                    "Cannot find configured stream graph optimization strategy 
class: "
+                            + strategyClassName,
+                    e);
+        } catch (ClassCastException
+                | InstantiationException
+                | IllegalAccessException
+                | NoSuchMethodException
+                | InvocationTargetException e) {
+            throw new DynamicCodeLoadingException(
+                    "The configured class '"
+                            + strategyClassName
+                            + "' is not a valid stream graph optimization 
strategy",
+                    e);
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/StreamGraphOptimizerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/StreamGraphOptimizerTest.java
new file mode 100644
index 00000000000..2f063db1642
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/StreamGraphOptimizerTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.graph.StreamGraphContext;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamGraph;
+import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test for {@link StreamGraphOptimizer}.
+ *
+ * <p>Configures a single strategy by class name, builds the optimizer with the context class
+ * loader of the current thread, and checks that the strategy receives the same {@code
+ * OperatorsFinished} and {@code StreamGraphContext} instances that were handed to the optimizer.
+ * The testing strategy records its arguments in static fields because the optimizer creates the
+ * strategy instance itself and the test holds no reference to it.
+ */
+class StreamGraphOptimizerTest {
+    private Configuration jobConfiguration;
+    private ClassLoader userClassLoader;
+
+    @BeforeEach
+    void setUp() {
+        jobConfiguration = new Configuration();
+        userClassLoader = Thread.currentThread().getContextClassLoader();
+    }
+
+    @Test
+    void testOnOperatorsFinished() throws Exception {
+        List<String> strategyClassNames =
+                List.of(TestingStreamGraphOptimizerStrategy.class.getName());
+        jobConfiguration.set(
+                
StreamGraphOptimizationStrategy.STREAM_GRAPH_OPTIMIZATION_STRATEGY,
+                strategyClassNames);
+
+        StreamGraphOptimizer optimizer =
+                new StreamGraphOptimizer(jobConfiguration, userClassLoader);
+
+        OperatorsFinished operatorsFinished =
+                new OperatorsFinished(new ArrayList<>(), new HashMap<>());
+        StreamGraphContext context =
+                new StreamGraphContext() {
+                    @Override
+                    public ImmutableStreamGraph getStreamGraph() {
+                        return null;
+                    }
+
+                    @Override
+                    public @Nullable StreamOperatorFactory<?> 
getOperatorFactory(
+                            Integer streamNodeId) {
+                        return null;
+                    }
+
+                    @Override
+                    public boolean modifyStreamEdge(
+                            List<StreamEdgeUpdateRequestInfo> requestInfos) {
+                        return false;
+                    }
+                };
+
+        optimizer.onOperatorsFinished(operatorsFinished, context);
+
+        
assertThat(TestingStreamGraphOptimizerStrategy.collectedOperatorsFinished)
+                .isEqualTo(operatorsFinished);
+        
assertThat(TestingStreamGraphOptimizerStrategy.collectedStreamGraphContext)
+                .isEqualTo(context);
+    }
+
+    protected static final class TestingStreamGraphOptimizerStrategy
+            implements StreamGraphOptimizationStrategy {
+
+        private static OperatorsFinished collectedOperatorsFinished;
+        private static StreamGraphContext collectedStreamGraphContext;
+
+        @Override
+        public boolean onOperatorsFinished(
+                OperatorsFinished operatorsFinished, StreamGraphContext 
context) {
+            collectedOperatorsFinished = operatorsFinished;
+            collectedStreamGraphContext = context;
+            return true;
+        }
+    }
+}

Reply via email to