This is an automated email from the ASF dual-hosted git repository.

wanglijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d529a5a167 [FLINK-31832][runtime] Add benchmarks for end to end restarting tasks
7d529a5a167 is described below

commit 7d529a5a167f52c1b76fd6f70918b27149cf8782
Author: Weihua Hu <[email protected]>
AuthorDate: Tue Apr 18 13:51:14 2023 +0800

    [FLINK-31832][runtime] Add benchmarks for end to end restarting tasks
    
    This closes #22413
---
 .../scheduler/benchmark/JobConfiguration.java      |  53 ++++++++-
 .../benchmark/SchedulerBenchmarkBase.java          |   9 +-
 ...leGlobalFailureAndRestartAllTasksBenchmark.java | 118 +++++++++++++++++++++
 ...obalFailureAndRestartAllTasksBenchmarkTest.java |  44 ++++++++
 .../e2e/SchedulerEndToEndBenchmarkBase.java        |  13 ++-
 5 files changed, 225 insertions(+), 12 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/JobConfiguration.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/JobConfiguration.java
index 725ddc1818a..57033a3bcda 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/JobConfiguration.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/JobConfiguration.java
@@ -33,46 +33,85 @@ public enum JobConfiguration {
             ResultPartitionType.PIPELINED,
             JobType.STREAMING,
             ExecutionMode.PIPELINED,
-            4000),
+            4000,
+            false),
 
     BATCH(
             DistributionPattern.ALL_TO_ALL,
             ResultPartitionType.BLOCKING,
             JobType.BATCH,
             ExecutionMode.BATCH,
-            4000),
+            4000,
+            false),
 
     STREAMING_TEST(
             DistributionPattern.ALL_TO_ALL,
             ResultPartitionType.PIPELINED,
             JobType.STREAMING,
             ExecutionMode.PIPELINED,
-            10),
+            10,
+            false),
 
     BATCH_TEST(
             DistributionPattern.ALL_TO_ALL,
             ResultPartitionType.BLOCKING,
             JobType.BATCH,
             ExecutionMode.BATCH,
-            10);
+            10,
+            false),
+
+    STREAMING_EVENLY(
+            DistributionPattern.ALL_TO_ALL,
+            ResultPartitionType.PIPELINED,
+            JobType.STREAMING,
+            ExecutionMode.PIPELINED,
+            4000,
+            true),
+
+    BATCH_EVENLY(
+            DistributionPattern.ALL_TO_ALL,
+            ResultPartitionType.BLOCKING,
+            JobType.BATCH,
+            ExecutionMode.BATCH,
+            4000,
+            true),
+
+    STREAMING_EVENLY_TEST(
+            DistributionPattern.ALL_TO_ALL,
+            ResultPartitionType.PIPELINED,
+            JobType.STREAMING,
+            ExecutionMode.PIPELINED,
+            10,
+            true),
+
+    BATCH_EVENLY_TEST(
+            DistributionPattern.ALL_TO_ALL,
+            ResultPartitionType.BLOCKING,
+            JobType.BATCH,
+            ExecutionMode.BATCH,
+            10,
+            true);
 
     private final int parallelism;
     private final DistributionPattern distributionPattern;
     private final ResultPartitionType resultPartitionType;
     private final JobType jobType;
     private final ExecutionMode executionMode;
+    private final boolean evenlySpreadOutSlots;
 
+    /**
+     * Creates a job configuration from the given settings, each of which is stored unchanged.
+     *
+     * @param evenlySpreadOutSlots whether the end-to-end scheduler benchmarks pick slots with
+     *     {@code LocationPreferenceSlotSelectionStrategy.createEvenlySpreadOut()} rather than
+     *     {@code LocationPreferenceSlotSelectionStrategy.createDefault()}
+     */
     JobConfiguration(
             DistributionPattern distributionPattern,
             ResultPartitionType resultPartitionType,
             JobType jobType,
             ExecutionMode executionMode,
-            int parallelism) {
+            int parallelism,
+            boolean evenlySpreadOutSlots) {
         this.distributionPattern = distributionPattern;
         this.resultPartitionType = resultPartitionType;
         this.jobType = jobType;
         this.executionMode = executionMode;
         this.parallelism = parallelism;
+        this.evenlySpreadOutSlots = evenlySpreadOutSlots;
     }
 
     public int getParallelism() {
@@ -94,4 +133,8 @@ public enum JobConfiguration {
+    /** Returns the {@link ExecutionMode} this configuration was created with. */
     public ExecutionMode getExecutionMode() {
         return executionMode;
     }
+
+    /**
+     * Returns the {@code evenlySpreadOutSlots} flag given at construction; the end-to-end
+     * benchmarks use it to choose the evenly-spread-out slot selection strategy over the default.
+     */
+    public boolean isEvenlySpreadOutSlots() {
+        return this.evenlySpreadOutSlots;
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkBase.java
index 10250118d0d..0cb7b531264 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkBase.java
@@ -28,9 +28,12 @@ public class SchedulerBenchmarkBase {
     public ScheduledExecutorService scheduledExecutorService;
 
     public void setup() {
-        scheduledExecutorService =
-                Executors.newSingleThreadScheduledExecutor(
-                        new ExecutorThreadFactory("flink-benchmarks"));
+        // Subclasses may assign their own executor before calling setup() (e.g. a direct,
+        // same-thread executor); the default single-threaded executor is only created when the
+        // field is still null.
+        // NOTE(review): if teardown() shuts this executor down without resetting the field, a
+        // second setup() on the same instance would reuse the shut-down executor — confirm.
+        if (scheduledExecutorService == null) {
+            scheduledExecutorService =
+                    Executors.newSingleThreadScheduledExecutor(
+                            new ExecutorThreadFactory("flink-benchmarks"));
+        }
     }
 
     public void teardown() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/HandleGlobalFailureAndRestartAllTasksBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/HandleGlobalFailureAndRestartAllTasksBenchmark.java
new file mode 100644
index 00000000000..0e0cb4080e4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/HandleGlobalFailureAndRestartAllTasksBenchmark.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.scheduler.benchmark.e2e;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.FixedDelayRestartBackoffTimeStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolUtils;
+import org.apache.flink.runtime.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
+import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
+import org.apache.flink.runtime.scheduler.benchmark.JobConfiguration;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+
+import java.util.Collections;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.StreamSupport;
+
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.completeCancellingForAllVertices;
+
+/**
+ * The benchmark of handling a global failure and restarting all tasks in a STREAMING/BATCH job.
+ * The related method is {@link DefaultScheduler#handleGlobalFailure}.
+ *
+ * <p>{@link #setup(JobConfiguration)} starts scheduling and offers slots to the slot pool; {@link
+ * #handleGlobalFailureAndRestartAllTasks()} then fails the job globally, force-completes the
+ * cancellation of all vertices and triggers the restart the scheduler has scheduled.
+ *
+ * <p>NOTE(review): the restart strategy is created with {@code
+ * FixedDelayRestartBackoffTimeStrategyFactory(1, 1)}, which presumably allows a single restart
+ * attempt; if so, each {@code setup} supports only one measured invocation — confirm against the
+ * JMH setup level used by the benchmark executor.
+ */
+public class HandleGlobalFailureAndRestartAllTasksBenchmark extends SchedulerEndToEndBenchmarkBase {
+    /** Number of slots offered to the slot pool per {@code tryOfferSlots} call. */
+    private static final int SLOTS_PER_TASK_EXECUTOR = 4;
+
+    private DefaultScheduler scheduler;
+
+    /** Passed to the scheduler as its task restart executor; its tasks are triggered manually. */
+    private ManuallyTriggeredScheduledExecutor taskRestartExecutor;
+
+    /**
+     * Creates the scheduler for the given job configuration, starts scheduling and offers slots.
+     *
+     * @param jobConfiguration the job shape to benchmark
+     */
+    public void setup(JobConfiguration jobConfiguration) throws Exception {
+        taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
+        // Use DirectScheduledExecutorService to ensure that we can run
+        // DefaultScheduler#restartTasks in the current thread synchronously when tasks restart is
+        // triggered. It is assigned before super.setup() because SchedulerBenchmarkBase#setup()
+        // keeps an executor that a subclass has already set.
+        scheduledExecutorService = new DirectScheduledExecutorService();
+
+        super.setup(jobConfiguration);
+
+        scheduler =
+                createScheduler(
+                        jobGraph,
+                        physicalSlotProvider,
+                        mainThreadExecutor,
+                        scheduledExecutorService,
+                        taskRestartExecutor,
+                        new FixedDelayRestartBackoffTimeStrategy
+                                        .FixedDelayRestartBackoffTimeStrategyFactory(1, 1)
+                                .create());
+
+        scheduler.startScheduling();
+        offerSlots();
+    }
+
+    /**
+     * Triggers a global failure, force-completes the cancellation of all vertices and then
+     * runs the restart that was scheduled on {@link #taskRestartExecutor}.
+     */
+    public void handleGlobalFailureAndRestartAllTasks() throws Exception {
+        // Trigger failover, then force-reset the state of all vertices to CANCELED.
+        scheduler.handleGlobalFailure(new RuntimeException("For test."));
+        completeCancellingForAllVertices(scheduler.getExecutionGraph());
+
+        taskRestartExecutor.triggerScheduledTasks();
+    }
+
+    /**
+     * Builds a {@link DefaultScheduler} that uses {@code executorService} for both executor
+     * arguments of the builder and allocates slots through {@code physicalSlotProvider}.
+     */
+    private static DefaultScheduler createScheduler(
+            JobGraph jobGraph,
+            PhysicalSlotProvider physicalSlotProvider,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            ScheduledExecutorService executorService,
+            ScheduledExecutor taskRestartExecutor,
+            RestartBackoffTimeStrategy restartBackoffTimeStrategy)
+            throws Exception {
+        return new DefaultSchedulerBuilder(
+                        jobGraph,
+                        mainThreadExecutor,
+                        executorService,
+                        executorService,
+                        taskRestartExecutor)
+                .setExecutionSlotAllocatorFactory(
+                        SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
+                                physicalSlotProvider))
+                .setRestartBackoffTimeStrategy(restartBackoffTimeStrategy)
+                .build();
+    }
+
+    /**
+     * Offers at least as many slots as the summed parallelism of all job vertices, in batches of
+     * {@link #SLOTS_PER_TASK_EXECUTOR} slots.
+     */
+    private void offerSlots() {
+        final int numberSlots =
+                StreamSupport.stream(jobGraph.getVertices().spliterator(), false)
+                        .mapToInt(JobVertex::getParallelism)
+                        .sum();
+        // Integer ceiling division, computed once instead of re-evaluating a floating-point
+        // Math.ceil() in every loop condition.
+        final int numberBatches =
+                (numberSlots + SLOTS_PER_TASK_EXECUTOR - 1) / SLOTS_PER_TASK_EXECUTOR;
+
+        for (int i = 0; i < numberBatches; i++) {
+            SlotPoolUtils.tryOfferSlots(
+                    slotPool,
+                    mainThreadExecutor,
+                    Collections.nCopies(SLOTS_PER_TASK_EXECUTOR, ResourceProfile.ANY));
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/HandleGlobalFailureAndRestartAllTasksBenchmarkTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/HandleGlobalFailureAndRestartAllTasksBenchmarkTest.java
new file mode 100644
index 00000000000..0cf54ef7607
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/HandleGlobalFailureAndRestartAllTasksBenchmarkTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.scheduler.benchmark.e2e;
+
+import org.apache.flink.runtime.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.scheduler.benchmark.JobConfiguration;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/**
+ * Tests that {@link HandleGlobalFailureAndRestartAllTasksBenchmark} can be set up and invoked
+ * once for the small STREAMING/BATCH job configurations. The benchmarked method is {@link
+ * DefaultScheduler#handleGlobalFailure}.
+ */
+class HandleGlobalFailureAndRestartAllTasksBenchmarkTest {
+
+    @ParameterizedTest
+    @EnumSource(
+            value = JobConfiguration.class,
+            names = {"STREAMING_TEST", "BATCH_TEST", "STREAMING_EVENLY_TEST", "BATCH_EVENLY_TEST"})
+    void deployAndRestarts(JobConfiguration jobConfiguration) throws Exception {
+        HandleGlobalFailureAndRestartAllTasksBenchmark benchmark =
+                new HandleGlobalFailureAndRestartAllTasksBenchmark();
+        benchmark.setup(jobConfiguration);
+        try {
+            benchmark.handleGlobalFailureAndRestartAllTasks();
+        } finally {
+            // Always call teardown(), even if the benchmarked call throws.
+            benchmark.teardown();
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulerEndToEndBenchmarkBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulerEndToEndBenchmarkBase.java
index 318375d8bc0..28a28d5c792 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulerEndToEndBenchmarkBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulerEndToEndBenchmarkBase.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelecti
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
 import org.apache.flink.runtime.scheduler.DefaultScheduler;
 import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
@@ -62,13 +63,17 @@ public class SchedulerEndToEndBenchmarkBase extends SchedulerBenchmarkBase {
         jobGraph = createJobGraph(jobVertices, jobConfiguration);
 
         slotPool = new 
DeclarativeSlotPoolBridgeBuilder().buildAndStart(mainThreadExecutor);
-        physicalSlotProvider = createPhysicalSlotProvider(slotPool);
+        SlotSelectionStrategy slotSelectionStrategy =
+                jobConfiguration.isEvenlySpreadOutSlots()
+                        ? 
LocationPreferenceSlotSelectionStrategy.createEvenlySpreadOut()
+                        : 
LocationPreferenceSlotSelectionStrategy.createDefault();
+        physicalSlotProvider = 
createPhysicalSlotProvider(slotSelectionStrategy, slotPool);
     }
 
-    private static PhysicalSlotProvider createPhysicalSlotProvider(SlotPool 
slotPool) {
+    /**
+     * Creates a {@link PhysicalSlotProviderImpl} from the given slot selection strategy and slot
+     * pool.
+     */
+    private static PhysicalSlotProvider createPhysicalSlotProvider(
+            SlotSelectionStrategy slotSelectionStrategy, SlotPool slotPool) {
 
-        return new PhysicalSlotProviderImpl(
-                LocationPreferenceSlotSelectionStrategy.createDefault(), 
slotPool);
+        return new PhysicalSlotProviderImpl(slotSelectionStrategy, slotPool);
     }
 
     static DefaultScheduler createScheduler(

Reply via email to