This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new fd763672b85 [FLINK-28547][runtime] Add IT cases for
SpeculativeScheduler.
fd763672b85 is described below
commit fd763672b858e74b24760e5c98ff9af22caa8a14
Author: Lijie Wang <[email protected]>
AuthorDate: Thu Jul 14 15:46:38 2022 +0800
[FLINK-28547][runtime] Add IT cases for SpeculativeScheduler.
This closes #20271.
---
.../adaptivebatch/SpeculativeScheduler.java | 35 ++-
.../scheduling/SpeculativeSchedulerITCase.java | 241 +++++++++++++++++++++
2 files changed, 264 insertions(+), 12 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
index b1482caec55..5a4d6298618 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
@@ -70,6 +70,7 @@ import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -145,6 +146,9 @@ public class SpeculativeScheduler extends
AdaptiveBatchScheduler
this.blockSlowNodeDuration =
jobMasterConfiguration.get(JobManagerOptions.BLOCK_SLOW_NODE_DURATION);
+ checkArgument(
+ !blockSlowNodeDuration.isNegative(),
+ "The blocking duration should not be negative.");
this.blocklistOperations = checkNotNull(blocklistOperations);
@@ -268,19 +272,9 @@ public class SpeculativeScheduler extends
AdaptiveBatchScheduler
@Override
public void notifySlowTasks(Map<ExecutionVertexID,
Collection<ExecutionAttemptID>> slowTasks) {
+
// add slow nodes to blocklist before scheduling new speculative
executions
- final long blockedEndTimestamp =
- System.currentTimeMillis() + blockSlowNodeDuration.toMillis();
- final Collection<BlockedNode> nodesToBlock =
- getSlowNodeIds(slowTasks).stream()
- .map(
- nodeId ->
- new BlockedNode(
- nodeId,
- "Node is detected to be slow.",
- blockedEndTimestamp))
- .collect(Collectors.toList());
- blocklistOperations.addNewBlockedNodes(nodesToBlock);
+ blockSlowNodes(slowTasks);
final List<Execution> newSpeculativeExecutions = new ArrayList<>();
final Set<ExecutionVertexID> verticesToDeploy = new HashSet<>();
@@ -314,6 +308,23 @@ public class SpeculativeScheduler extends
AdaptiveBatchScheduler
executionVertexVersioner.getExecutionVertexVersions(verticesToDeploy));
}
+ private void blockSlowNodes(Map<ExecutionVertexID,
Collection<ExecutionAttemptID>> slowTasks) {
+ if (!blockSlowNodeDuration.isZero()) {
+ final long blockedEndTimestamp =
+ System.currentTimeMillis() +
blockSlowNodeDuration.toMillis();
+ final Collection<BlockedNode> nodesToBlock =
+ getSlowNodeIds(slowTasks).stream()
+ .map(
+ nodeId ->
+ new BlockedNode(
+ nodeId,
+ "Node is detected to be
slow.",
+ blockedEndTimestamp))
+ .collect(Collectors.toList());
+ blocklistOperations.addNewBlockedNodes(nodesToBlock);
+ }
+ }
+
private Set<String> getSlowNodeIds(
Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks) {
final Set<ExecutionAttemptID> slowExecutions =
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java
new file mode 100644
index 00000000000..41870d8498e
--- /dev/null
+++
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.scheduling;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.configuration.SlowTaskDetectorOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.scheduler.adaptivebatch.SpeculativeScheduler;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** IT cases for {@link SpeculativeScheduler}. */
+class SpeculativeSchedulerITCase {
+
+ @TempDir private Path temporaryFolder;
+ private static final int MAX_PARALLELISM = 4;
+ private static final int NUMBERS_TO_PRODUCE = 10000;
+ private static final int FAILURE_COUNT = 20;
+
+ private int parallelism;
+
+ // the key is the subtask index so that different attempts will not add
duplicated results
+ private static ConcurrentMap<Integer, Map<Long, Long>> numberCountResults;
+
+ private Map<Long, Long> expectedResult;
+
+ @BeforeEach
+ void setUp() {
+ parallelism = 4;
+
+ expectedResult =
+ LongStream.range(0, NUMBERS_TO_PRODUCE)
+ .boxed()
+ .collect(Collectors.toMap(Function.identity(), i ->
1L));
+
+ NumberCounterMap.toFailCounter.set(0);
+
+ numberCountResults = new ConcurrentHashMap<>();
+ }
+
+ @Test
+ void testSpeculativeExecution() throws Exception {
+ executeJob();
+ waitUntilJobArchived();
+ checkResults();
+ }
+
+ @Test
+ void testSpeculativeExecutionWithFailover() throws Exception {
+ NumberCounterMap.toFailCounter.set(FAILURE_COUNT);
+ executeJob();
+ waitUntilJobArchived();
+ checkResults();
+ }
+
+ @Test
+ void testSpeculativeExecutionWithAdaptiveParallelism() throws Exception {
+ parallelism = -1;
+ executeJob();
+ waitUntilJobArchived();
+ checkResults();
+ }
+
+ @Test
+ void testBlockSlowNodeInSpeculativeExecution() throws Exception {
+ final Configuration configuration = new Configuration();
+ configuration.set(JobManagerOptions.BLOCK_SLOW_NODE_DURATION,
Duration.ofMinutes(1));
+ JobClient client = executeJobAsync(configuration);
+
+ assertThatThrownBy(
+ () -> client.getJobExecutionResult().get(30,
TimeUnit.SECONDS),
+ "The local node is expected to be blocked but it is
not.")
+ .isInstanceOf(TimeoutException.class);
+ }
+
+ private void checkResults() {
+ final Map<Long, Long> numberCountResultMap =
+ numberCountResults.values().stream()
+ .flatMap(map -> map.entrySet().stream())
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
Map.Entry::getValue, Long::sum));
+
+ assertThat(numberCountResultMap).isEqualTo(expectedResult);
+ }
+
+ private void executeJob() throws Exception {
+ JobClient client = executeJobAsync(new Configuration());
+ client.getJobExecutionResult().get();
+ }
+
+ private JobClient executeJobAsync(Configuration configuration) throws
Exception {
+ configuration.setString(RestOptions.BIND_PORT, "0");
+ configuration.set(JobManagerOptions.ARCHIVE_DIR,
temporaryFolder.getRoot().toString());
+ configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 5000L);
+ configuration.set(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE, 1);
+ configuration.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE,
MemorySize.parse("4kb"));
+ configuration.set(TaskManagerOptions.NUM_TASK_SLOTS, MAX_PARALLELISM);
+ configuration.set(RestartStrategyOptions.RESTART_STRATEGY,
"fixed-delay");
+ configuration.set(
+ RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS,
Integer.MAX_VALUE);
+
+ // for speculative execution
+ configuration.set(
+ JobManagerOptions.SCHEDULER,
JobManagerOptions.SchedulerType.AdaptiveBatch);
+ configuration.set(JobManagerOptions.SPECULATIVE_ENABLED, true);
+ // for testing, does not block node by default
+ if
(!configuration.contains(JobManagerOptions.BLOCK_SLOW_NODE_DURATION)) {
+ configuration.set(JobManagerOptions.BLOCK_SLOW_NODE_DURATION,
Duration.ZERO);
+ }
+
configuration.set(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_MULTIPLIER,
1.0);
+
configuration.set(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_RATIO, 0.2);
+ configuration.set(
+ SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_LOWER_BOUND,
Duration.ofMillis(0));
+
+ // for adaptive parallelism
+ configuration.set(
+
JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DEFAULT_SOURCE_PARALLELISM,
+ MAX_PARALLELISM);
+
configuration.set(JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MIN_PARALLELISM,
1);
+ configuration.set(
+ JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM,
MAX_PARALLELISM);
+ configuration.set(
+
JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_AVG_DATA_VOLUME_PER_TASK,
+ MemorySize.parse("150kb"));
+
+ final StreamExecutionEnvironment env =
+
StreamExecutionEnvironment.createLocalEnvironment(configuration);
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ env.setParallelism(-1);
+
+ final DataStream<Long> source =
+ env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
+ .setParallelism(parallelism)
+ .name("source")
+ .slotSharingGroup("group1");
+
+ final DataStream<Long> map =
+ source.rebalance()
+ .map(new NumberCounterMap())
+ .setParallelism(parallelism)
+ .name("map")
+ .slotSharingGroup("group2");
+
+ map.rebalance()
+ .addSink(new NumberCounterSink())
+ .setParallelism(parallelism)
+ .name("sink")
+ .slotSharingGroup("group3");
+
+ return env.executeAsync();
+ }
+
+ private void waitUntilJobArchived() throws InterruptedException {
+ while (temporaryFolder.getRoot().toFile().listFiles().length < 1) {
+ Thread.sleep(1000);
+ }
+ }
+
+ private static class NumberCounterMap extends RichMapFunction<Long, Long> {
+ private static final AtomicInteger toFailCounter = new
AtomicInteger(0);
+
+ @Override
+ public Long map(Long value) throws Exception {
+ if (toFailCounter.decrementAndGet() >= 0) {
+ throw new Exception("Forced failure for testing");
+ }
+
+ if (isSlowTask()) {
+ Thread.sleep(1_000_000);
+ }
+ return value;
+ }
+
+ private boolean isSlowTask() {
+ return (getRuntimeContext().getAttemptNumber()
+ +
getRuntimeContext().getIndexOfThisSubtask())
+ % 2
+ == 1;
+ }
+ }
+
+ private static class NumberCounterSink extends RichSinkFunction<Long> {
+
+ private final Map<Long, Long> numberCountResult = new HashMap<>();
+
+ @Override
+ public void invoke(Long value, Context context) throws Exception {
+ numberCountResult.merge(value, 1L, Long::sum);
+ }
+
+ @Override
+ public void finish() {
+
numberCountResults.put(getRuntimeContext().getIndexOfThisSubtask(),
numberCountResult);
+ }
+ }
+}