This is an automated email from the ASF dual-hosted git repository.
wanglijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7d529a5a167 [FLINK-31832][runtime] Add benchmarks for end to end restarting tasks
7d529a5a167 is described below
commit 7d529a5a167f52c1b76fd6f70918b27149cf8782
Author: Weihua Hu <[email protected]>
AuthorDate: Tue Apr 18 13:51:14 2023 +0800
[FLINK-31832][runtime] Add benchmarks for end to end restarting tasks
This closes #22413
---
.../scheduler/benchmark/JobConfiguration.java | 53 ++++++++-
.../benchmark/SchedulerBenchmarkBase.java | 9 +-
...leGlobalFailureAndRestartAllTasksBenchmark.java | 118 +++++++++++++++++++++
...obalFailureAndRestartAllTasksBenchmarkTest.java | 44 ++++++++
.../e2e/SchedulerEndToEndBenchmarkBase.java | 13 ++-
5 files changed, 225 insertions(+), 12 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/JobConfiguration.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/JobConfiguration.java
index 725ddc1818a..57033a3bcda 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/JobConfiguration.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/JobConfiguration.java
@@ -33,46 +33,85 @@ public enum JobConfiguration {
ResultPartitionType.PIPELINED,
JobType.STREAMING,
ExecutionMode.PIPELINED,
- 4000),
+ 4000,
+ false),
BATCH(
DistributionPattern.ALL_TO_ALL,
ResultPartitionType.BLOCKING,
JobType.BATCH,
ExecutionMode.BATCH,
- 4000),
+ 4000,
+ false),
STREAMING_TEST(
DistributionPattern.ALL_TO_ALL,
ResultPartitionType.PIPELINED,
JobType.STREAMING,
ExecutionMode.PIPELINED,
- 10),
+ 10,
+ false),
BATCH_TEST(
DistributionPattern.ALL_TO_ALL,
ResultPartitionType.BLOCKING,
JobType.BATCH,
ExecutionMode.BATCH,
- 10);
+ 10,
+ false),
+
+ STREAMING_EVENLY(
+ DistributionPattern.ALL_TO_ALL,
+ ResultPartitionType.PIPELINED,
+ JobType.STREAMING,
+ ExecutionMode.PIPELINED,
+ 4000,
+ true),
+
+ BATCH_EVENLY(
+ DistributionPattern.ALL_TO_ALL,
+ ResultPartitionType.BLOCKING,
+ JobType.BATCH,
+ ExecutionMode.BATCH,
+ 4000,
+ true),
+
+ STREAMING_EVENLY_TEST(
+ DistributionPattern.ALL_TO_ALL,
+ ResultPartitionType.PIPELINED,
+ JobType.STREAMING,
+ ExecutionMode.PIPELINED,
+ 10,
+ true),
+
+ BATCH_EVENLY_TEST(
+ DistributionPattern.ALL_TO_ALL,
+ ResultPartitionType.BLOCKING,
+ JobType.BATCH,
+ ExecutionMode.BATCH,
+ 10,
+ true);
private final int parallelism;
private final DistributionPattern distributionPattern;
private final ResultPartitionType resultPartitionType;
private final JobType jobType;
private final ExecutionMode executionMode;
+ private final boolean evenlySpreadOutSlots;
JobConfiguration(
DistributionPattern distributionPattern,
ResultPartitionType resultPartitionType,
JobType jobType,
ExecutionMode executionMode,
- int parallelism) {
+ int parallelism,
+ boolean evenlySpreadOutSlots) {
this.distributionPattern = distributionPattern;
this.resultPartitionType = resultPartitionType;
this.jobType = jobType;
this.executionMode = executionMode;
this.parallelism = parallelism;
+ this.evenlySpreadOutSlots = evenlySpreadOutSlots;
}
public int getParallelism() {
@@ -94,4 +133,8 @@ public enum JobConfiguration {
public ExecutionMode getExecutionMode() {
return executionMode;
}
+
+ public boolean isEvenlySpreadOutSlots() {
+ return evenlySpreadOutSlots;
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkBase.java
index 10250118d0d..0cb7b531264 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkBase.java
@@ -28,9 +28,12 @@ public class SchedulerBenchmarkBase {
public ScheduledExecutorService scheduledExecutorService;
public void setup() {
- scheduledExecutorService =
- Executors.newSingleThreadScheduledExecutor(
- new ExecutorThreadFactory("flink-benchmarks"));
+ // This may have been set in subclass for special purposes.
+ if (scheduledExecutorService == null) {
+ scheduledExecutorService =
+ Executors.newSingleThreadScheduledExecutor(
+ new ExecutorThreadFactory("flink-benchmarks"));
+ }
}
public void teardown() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/HandleGlobalFailureAndRestartAllTasksBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/HandleGlobalFailureAndRestartAllTasksBenchmark.java
new file mode 100644
index 00000000000..0e0cb4080e4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/HandleGlobalFailureAndRestartAllTasksBenchmark.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.scheduler.benchmark.e2e;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FixedDelayRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolUtils;
+import org.apache.flink.runtime.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
+import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
+import org.apache.flink.runtime.scheduler.benchmark.JobConfiguration;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+
+import java.util.Collections;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.completeCancellingForAllVertices;
+
+/**
+ * The benchmark of handling global failure and restarting tasks in a STREAMING/BATCH job. The
+ * related method is {@link DefaultScheduler#handleGlobalFailure}.
+ */
+public class HandleGlobalFailureAndRestartAllTasksBenchmark extends SchedulerEndToEndBenchmarkBase {
+    private static final int SLOTS_PER_TASK_EXECUTOR = 4;
+    private DefaultScheduler scheduler;
+    private ManuallyTriggeredScheduledExecutor taskRestartExecutor;
+
+    public void setup(JobConfiguration jobConfiguration) throws Exception {
+        taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
+        // Use DirectScheduledExecutorService to ensure that we can run
+        // DefaultScheduler#restartTasks in the current thread synchronously when tasks restart is
+        // triggered.
+        scheduledExecutorService = new DirectScheduledExecutorService();
+
+        super.setup(jobConfiguration);
+
+        scheduler =
+                createScheduler(
+                        jobGraph,
+                        physicalSlotProvider,
+                        mainThreadExecutor,
+                        scheduledExecutorService,
+                        taskRestartExecutor,
+                        new FixedDelayRestartBackoffTimeStrategy
+                                        .FixedDelayRestartBackoffTimeStrategyFactory(1, 1)
+                                .create());
+
+        scheduler.startScheduling();
+        offerSlots();
+    }
+
+    public void handleGlobalFailureAndRestartAllTasks() throws Exception {
+        // trigger failover, force reset state to canceled.
+        scheduler.handleGlobalFailure(new RuntimeException("For test."));
+        completeCancellingForAllVertices(scheduler.getExecutionGraph());
+
+        taskRestartExecutor.triggerScheduledTasks();
+    }
+
+    private static DefaultScheduler createScheduler(
+            JobGraph jobGraph,
+            PhysicalSlotProvider physicalSlotProvider,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            ScheduledExecutorService executorService,
+            ScheduledExecutor taskRestartExecutor,
+            RestartBackoffTimeStrategy restartBackoffTimeStrategy)
+            throws Exception {
+        return new DefaultSchedulerBuilder(
+                        jobGraph,
+                        mainThreadExecutor,
+                        executorService,
+                        executorService,
+                        taskRestartExecutor)
+                .setExecutionSlotAllocatorFactory(
+                        SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
+                                physicalSlotProvider))
+                .setRestartBackoffTimeStrategy(restartBackoffTimeStrategy)
+                .build();
+    }
+
+    private void offerSlots() {
+        final int numberSlots =
+                StreamSupport.stream(jobGraph.getVertices().spliterator(), false)
+                        .mapToInt(JobVertex::getParallelism)
+                        .sum();
+
+        for (int i = 0; i < Math.ceil((double) numberSlots / SLOTS_PER_TASK_EXECUTOR); i++) {
+            SlotPoolUtils.tryOfferSlots(
+                    slotPool,
+                    mainThreadExecutor,
+                    Collections.nCopies(SLOTS_PER_TASK_EXECUTOR, ResourceProfile.ANY));
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/HandleGlobalFailureAndRestartAllTasksBenchmarkTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/HandleGlobalFailureAndRestartAllTasksBenchmarkTest.java
new file mode 100644
index 00000000000..0cf54ef7607
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/HandleGlobalFailureAndRestartAllTasksBenchmarkTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.scheduler.benchmark.e2e;
+
+import org.apache.flink.runtime.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.scheduler.benchmark.JobConfiguration;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/**
+ * The benchmark of restarting tasks in a STREAMING/BATCH job. The related method is {@link
+ * DefaultScheduler#handleGlobalFailure}.
+ */
+class HandleGlobalFailureAndRestartAllTasksBenchmarkTest {
+
+    @ParameterizedTest
+    @EnumSource(
+            value = JobConfiguration.class,
+            names = {"STREAMING_TEST", "BATCH_TEST", "STREAMING_EVENLY_TEST", "BATCH_EVENLY_TEST"})
+    void deployAndRestarts(JobConfiguration jobConfiguration) throws Exception {
+        HandleGlobalFailureAndRestartAllTasksBenchmark benchmark =
+                new HandleGlobalFailureAndRestartAllTasksBenchmark();
+        benchmark.setup(jobConfiguration);
+        benchmark.handleGlobalFailureAndRestartAllTasks();
+        benchmark.teardown();
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulerEndToEndBenchmarkBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulerEndToEndBenchmarkBase.java
index 318375d8bc0..28a28d5c792 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulerEndToEndBenchmarkBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulerEndToEndBenchmarkBase.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelecti
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
@@ -62,13 +63,17 @@ public class SchedulerEndToEndBenchmarkBase extends SchedulerBenchmarkBase {
jobGraph = createJobGraph(jobVertices, jobConfiguration);
slotPool = new
DeclarativeSlotPoolBridgeBuilder().buildAndStart(mainThreadExecutor);
- physicalSlotProvider = createPhysicalSlotProvider(slotPool);
+ SlotSelectionStrategy slotSelectionStrategy =
+ jobConfiguration.isEvenlySpreadOutSlots()
+ ?
LocationPreferenceSlotSelectionStrategy.createEvenlySpreadOut()
+ :
LocationPreferenceSlotSelectionStrategy.createDefault();
+ physicalSlotProvider =
createPhysicalSlotProvider(slotSelectionStrategy, slotPool);
}
- private static PhysicalSlotProvider createPhysicalSlotProvider(SlotPool
slotPool) {
+ private static PhysicalSlotProvider createPhysicalSlotProvider(
+ SlotSelectionStrategy slotSelectionStrategy, SlotPool slotPool) {
- return new PhysicalSlotProviderImpl(
- LocationPreferenceSlotSelectionStrategy.createDefault(),
slotPool);
+ return new PhysicalSlotProviderImpl(slotSelectionStrategy, slotPool);
}
static DefaultScheduler createScheduler(