tillrohrmann commented on a change in pull request #13880:
URL: https://github.com/apache/flink/pull/13880#discussion_r519198114
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -288,10 +288,24 @@ private void configureStreamGraph(final StreamGraph graph) {
} else {
graph.setStateBackend(stateBackend);
graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
+			graph.setAllVerticesInSameSlotSharingGroupByDefault(true);
Review comment:
Why is this change necessary?
##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ApproximateLocalRecoveryDownstreamITCase.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.flink.test.util.TestUtils.tryExecute;
+
+/**
+ * Tests approximate local recovery with downstream failover.
+ *
+ * <p>If a task fails, it restarts together with all of its downstream tasks.
+ */
+public class ApproximateLocalRecoveryDownstreamITCase extends TestLogger {
+ private static final int BUFFER_SIZE = 4096;
+
+ @Rule
+	public MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(createConfig())
+ .setNumberTaskManagers(4)
+ .setNumberSlotsPerTaskManager(1)
+ .build());
+
+ @Rule
+ public final Timeout timeout = Timeout.millis(300000L);
+
+ /**
+ * Test the following topology.
+ * <pre>
+ * (source1/1) -----> (map1/1) -----> (sink1/1)
+ * </pre>
+ * (map1/1) fails, (map1/1) and (sink1/1) restart
+ */
+ @Test
+ public void localTaskFailureRecoveryThreeTasks() throws Exception {
+ final int failAfterElements = 150;
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env
+			.setParallelism(1)
+			.setBufferTimeout(0)
+			.setMaxParallelism(128)
+			.disableOperatorChaining()
+			.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+ env.getCheckpointConfig().enableApproximateLocalRecovery(true);
+
+ env.addSource(new AppSourceFunction())
+ .slotSharingGroup("source")
+ .map(new FailingMapper<>(failAfterElements))
+ .slotSharingGroup("map")
+ .addSink(new ValidatingAtMostOnceSink(300))
+ .slotSharingGroup("sink");
+
+ FailingMapper.failedBefore = false;
+ tryExecute(env, "testThreeTasks");
+ }
+
+ /**
+ * Test the following topology.
+ * <pre>
+ * (source1/1) -----> (map1/2) -----> (sink1/1)
+ * | ^
+ * -------------> (map2/2) ---------|
+ * </pre>
+ * (map1/2) fails, (map1/2) and (sink1/1) restart
+ */
+ @Test
+ public void localTaskFailureRecoveryTwoMapTasks() throws Exception {
+ final int failAfterElements = 20;
+ final int keyByChannelNumber = 2;
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env
+			.setParallelism(1)
+			.setBufferTimeout(0)
+			.disableOperatorChaining()
+			.setMaxParallelism(128)
+			.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+ env.getCheckpointConfig().enableApproximateLocalRecovery(true);
+
+		env.addSource(new AppSourceFunction(BUFFER_SIZE, env.getMaxParallelism(), keyByChannelNumber))
+			.slotSharingGroup("source")
+			.keyBy(1)
+			.map(new FailingMapper<>(failAfterElements))
+			.setParallelism(keyByChannelNumber)
+			.slotSharingGroup("map")
+			.addSink(new ValidatingAtMostOnceSink(200, keyByChannelNumber))
+ .slotSharingGroup("sink");
+
+ FailingMapper.failedBefore = false;
+ tryExecute(env, "testTwoMapTasks");
+ }
+
+ // Schema: (index, key, assignedChannel, long string).
+	private static class AppSourceFunction extends RichParallelSourceFunction<Tuple4<Integer, Integer, Integer, String>> {
+		private final String shortString = "I am a very long string to test partial records hohoho hahaha ";
+ private final String longOrShortString;
+ private final int maxParallelism;
+ private final int numberOfChannels;
+ private final int[] keys;
+ private int index = 0;
+ private volatile boolean running = true;
+
+ // short-length string
+ AppSourceFunction() {
+ this.longOrShortString = shortString;
+ this.maxParallelism = 128;
+ this.numberOfChannels = 1;
+ this.keys = initKeys(numberOfChannels);
+ }
+
+ // long-length string
+		AppSourceFunction(int bufferSize, int maxParallelism, int numberOfChannels) {
+ this.maxParallelism = maxParallelism;
+ this.numberOfChannels = numberOfChannels;
+ this.keys = initKeys(numberOfChannels);
+
+ StringBuilder builder = new StringBuilder(shortString);
+			for (int i = 0; i <= 2 * bufferSize / shortString.length() + 1; i++) {
+ builder.append(shortString);
+ }
+ this.longOrShortString = builder.toString();
+ }
+
+ @Override
+		public void run(SourceContext<Tuple4<Integer, Integer, Integer, String>> ctx) throws Exception {
+			while (running) {
+				synchronized (ctx.getCheckpointLock()) {
+					if (index % 100 == 0) {
+						Thread.sleep(50);
+					}
+					int key = keys[index % numberOfChannels];
+					ctx.collect(new Tuple4<>(index, key, assignedIndex(key), longOrShortString));
+ }
+ index++;
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+
+ private int[] initKeys(int numberOfChannels) {
+ int[] keys = new int[numberOfChannels];
+
+ for (int i = 0; i < numberOfChannels; i++) {
+ int key = 0;
+ while (key < 1000 && assignedIndex(key) != i) {
+ key++;
+ }
+				assert key < 1000 : "Cannot find a key within 1000";
+ keys[i] = key;
+ }
+
+ return keys;
+ }
+
+ private int assignedIndex(int key) {
+			return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
+ }
+ }
+
+ private static class FailingMapper<T> extends RichMapFunction<T, T> {
+ private static final long serialVersionUID = 6334389850158703L;
+
+ public static volatile boolean failedBefore;
+
+ private final int failCount;
+ private int numElementsTotal;
+
+ private boolean failer;
+
+ public FailingMapper(int failCount) {
+ this.failCount = failCount;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+			failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
+ }
+
+ @Override
+ public T map(T value) throws Exception {
+ numElementsTotal++;
+
+ if (!failedBefore) {
+ Thread.sleep(10);
+
+ if (failer && numElementsTotal >= failCount) {
+ failedBefore = true;
+				throw new Exception("Artificial Test Failure");
+ }
+ }
+
+ return value;
+ }
+ }
+
+	private static class ValidatingAtMostOnceSink extends RichSinkFunction<Tuple4<Integer, Integer, Integer, String>> {
+		private static final long serialVersionUID = 1748426382527469932L;
+ private final int numElementsTotal;
+ private final int[] numElements;
+ private final Integer[] indexReachingNumElements;
+ private final int numberOfInputChannels;
+
+		public ValidatingAtMostOnceSink(int numElementsTotal, int numberOfInputChannels) {
+			this.numElementsTotal = numElementsTotal;
+			this.numberOfInputChannels = numberOfInputChannels;
+			this.numElements = new int[numberOfInputChannels];
+			this.indexReachingNumElements = new Integer[numberOfInputChannels];
+ }
+
+ public ValidatingAtMostOnceSink(int numElementsTotal) {
+ this.numElementsTotal = numElementsTotal;
+ this.numberOfInputChannels = 1;
+ this.numElements = new int[numberOfInputChannels];
+			this.indexReachingNumElements = new Integer[numberOfInputChannels];
+ }
+
+ @Override
+		public void invoke(Tuple4<Integer, Integer, Integer, String> value) throws Exception {
+ assert value.f2 < numberOfInputChannels;
+ numElements[value.f2]++;
+
+ boolean allReachNumElementsTotal = true;
+ for (int i = 0; i < numberOfInputChannels; i++) {
+ if (numElements[i] == numElementsTotal) {
+ indexReachingNumElements[i] = value.f0;
+ } else if (numElements[i] < numElementsTotal) {
+ allReachNumElementsTotal = false;
+ }
+ }
+ if (allReachNumElementsTotal) {
+				assert Collections.max(Arrays.asList(indexReachingNumElements)).intValue() >= numElementsTotal * numberOfInputChannels;
Review comment:
Please add a comment explaining why this is the case, e.g. that the source
generates monotonically increasing, continuous indices as long as it is not
restarted.
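Something along these lines would do (illustrative wording only):

```java
// The source emits monotonically increasing, continuous indices and assigns
// them to the channels round-robin, as long as it is not restarted. Under
// at-most-once delivery an element can be lost but never duplicated, so by
// the time every channel has counted numElementsTotal elements, the source
// must have emitted at least numElementsTotal * numberOfInputChannels
// records; the largest index at which a channel reached its count reflects
// that lower bound.
```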
##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ApproximateLocalRecoveryDownstreamITCase.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.flink.test.util.TestUtils.tryExecute;
+
+/**
+ * Tests approximate local recovery with downstream failover.
+ *
+ * <p>If a task fails, it restarts together with all of its downstream tasks.
+ */
+public class ApproximateLocalRecoveryDownstreamITCase extends TestLogger {
+ private static final int BUFFER_SIZE = 4096;
+
+ @Rule
+	public MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(createConfig())
+ .setNumberTaskManagers(4)
+ .setNumberSlotsPerTaskManager(1)
+ .build());
+
+ @Rule
+ public final Timeout timeout = Timeout.millis(300000L);
+
+ /**
+ * Test the following topology.
+ * <pre>
+ * (source1/1) -----> (map1/1) -----> (sink1/1)
+ * </pre>
+ * (map1/1) fails, (map1/1) and (sink1/1) restart
+ */
+ @Test
+ public void localTaskFailureRecoveryThreeTasks() throws Exception {
+ final int failAfterElements = 150;
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env
+			.setParallelism(1)
+			.setBufferTimeout(0)
+			.setMaxParallelism(128)
+			.disableOperatorChaining()
+			.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+ env.getCheckpointConfig().enableApproximateLocalRecovery(true);
+
+ env.addSource(new AppSourceFunction())
+ .slotSharingGroup("source")
+ .map(new FailingMapper<>(failAfterElements))
+ .slotSharingGroup("map")
+ .addSink(new ValidatingAtMostOnceSink(300))
+ .slotSharingGroup("sink");
+
+ FailingMapper.failedBefore = false;
+ tryExecute(env, "testThreeTasks");
+ }
+
+ /**
+ * Test the following topology.
+ * <pre>
+ * (source1/1) -----> (map1/2) -----> (sink1/1)
+ * | ^
+ * -------------> (map2/2) ---------|
+ * </pre>
+ * (map1/2) fails, (map1/2) and (sink1/1) restart
+ */
+ @Test
+ public void localTaskFailureRecoveryTwoMapTasks() throws Exception {
+ final int failAfterElements = 20;
+ final int keyByChannelNumber = 2;
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env
+			.setParallelism(1)
+			.setBufferTimeout(0)
+			.disableOperatorChaining()
+			.setMaxParallelism(128)
+			.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+ env.getCheckpointConfig().enableApproximateLocalRecovery(true);
+
+		env.addSource(new AppSourceFunction(BUFFER_SIZE, env.getMaxParallelism(), keyByChannelNumber))
+			.slotSharingGroup("source")
+			.keyBy(1)
+			.map(new FailingMapper<>(failAfterElements))
+			.setParallelism(keyByChannelNumber)
+			.slotSharingGroup("map")
+			.addSink(new ValidatingAtMostOnceSink(200, keyByChannelNumber))
+ .slotSharingGroup("sink");
+
+ FailingMapper.failedBefore = false;
+ tryExecute(env, "testTwoMapTasks");
+ }
+
+ // Schema: (index, key, assignedChannel, long string).
+	private static class AppSourceFunction extends RichParallelSourceFunction<Tuple4<Integer, Integer, Integer, String>> {
+		private final String shortString = "I am a very long string to test partial records hohoho hahaha ";
+ private final String longOrShortString;
+ private final int maxParallelism;
+ private final int numberOfChannels;
+ private final int[] keys;
+ private int index = 0;
+ private volatile boolean running = true;
+
+ // short-length string
+ AppSourceFunction() {
+ this.longOrShortString = shortString;
+ this.maxParallelism = 128;
+ this.numberOfChannels = 1;
+ this.keys = initKeys(numberOfChannels);
+ }
+
+ // long-length string
+		AppSourceFunction(int bufferSize, int maxParallelism, int numberOfChannels) {
+ this.maxParallelism = maxParallelism;
+ this.numberOfChannels = numberOfChannels;
+ this.keys = initKeys(numberOfChannels);
+
+ StringBuilder builder = new StringBuilder(shortString);
+			for (int i = 0; i <= 2 * bufferSize / shortString.length() + 1; i++) {
+ builder.append(shortString);
+ }
+ this.longOrShortString = builder.toString();
+ }
+
+ @Override
+		public void run(SourceContext<Tuple4<Integer, Integer, Integer, String>> ctx) throws Exception {
+			while (running) {
+				synchronized (ctx.getCheckpointLock()) {
+					if (index % 100 == 0) {
+						Thread.sleep(50);
+					}
+					int key = keys[index % numberOfChannels];
+					ctx.collect(new Tuple4<>(index, key, assignedIndex(key), longOrShortString));
+ }
+ index++;
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+
+ private int[] initKeys(int numberOfChannels) {
+ int[] keys = new int[numberOfChannels];
+
+ for (int i = 0; i < numberOfChannels; i++) {
+ int key = 0;
+ while (key < 1000 && assignedIndex(key) != i) {
+ key++;
+ }
+				assert key < 1000 : "Cannot find a key within 1000";
+ keys[i] = key;
+ }
+
+ return keys;
+ }
Review comment:
Maybe state somewhere that this method tries to find the first key `k`
which falls into the key group range of channel `i`.
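A sketch of such a JavaDoc (wording illustrative):

```java
/**
 * For each channel {@code i}, finds the first key {@code k} (searching upwards
 * from 0) whose key group falls into the key group range assigned to channel
 * {@code i}, so that records carrying {@code k} are guaranteed to be routed to
 * that channel after the keyBy.
 */
```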
##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ApproximateLocalRecoveryDownstreamITCase.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.flink.test.util.TestUtils.tryExecute;
+
+/**
+ * Tests approximate local recovery with downstream failover.
+ *
+ * <p>If a task fails, it restarts together with all of its downstream tasks.
+ */
+public class ApproximateLocalRecoveryDownstreamITCase extends TestLogger {
+ private static final int BUFFER_SIZE = 4096;
+
+ @Rule
+	public MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(createConfig())
+ .setNumberTaskManagers(4)
+ .setNumberSlotsPerTaskManager(1)
+ .build());
+
+ @Rule
+ public final Timeout timeout = Timeout.millis(300000L);
+
+ /**
+ * Test the following topology.
+ * <pre>
+ * (source1/1) -----> (map1/1) -----> (sink1/1)
+ * </pre>
+ * (map1/1) fails, (map1/1) and (sink1/1) restart
+ */
+ @Test
+ public void localTaskFailureRecoveryThreeTasks() throws Exception {
+ final int failAfterElements = 150;
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env
+			.setParallelism(1)
+			.setBufferTimeout(0)
+			.setMaxParallelism(128)
+			.disableOperatorChaining()
+			.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+ env.getCheckpointConfig().enableApproximateLocalRecovery(true);
+
+ env.addSource(new AppSourceFunction())
+ .slotSharingGroup("source")
+ .map(new FailingMapper<>(failAfterElements))
+ .slotSharingGroup("map")
+ .addSink(new ValidatingAtMostOnceSink(300))
+ .slotSharingGroup("sink");
+
+ FailingMapper.failedBefore = false;
+ tryExecute(env, "testThreeTasks");
+ }
+
+ /**
+ * Test the following topology.
+ * <pre>
+ * (source1/1) -----> (map1/2) -----> (sink1/1)
+ * | ^
+ * -------------> (map2/2) ---------|
+ * </pre>
+ * (map1/2) fails, (map1/2) and (sink1/1) restart
+ */
+ @Test
+ public void localTaskFailureRecoveryTwoMapTasks() throws Exception {
+ final int failAfterElements = 20;
+ final int keyByChannelNumber = 2;
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env
+			.setParallelism(1)
+			.setBufferTimeout(0)
+			.disableOperatorChaining()
+			.setMaxParallelism(128)
+			.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+ env.getCheckpointConfig().enableApproximateLocalRecovery(true);
+
+		env.addSource(new AppSourceFunction(BUFFER_SIZE, env.getMaxParallelism(), keyByChannelNumber))
+			.slotSharingGroup("source")
+			.keyBy(1)
+			.map(new FailingMapper<>(failAfterElements))
+			.setParallelism(keyByChannelNumber)
+			.slotSharingGroup("map")
+			.addSink(new ValidatingAtMostOnceSink(200, keyByChannelNumber))
+ .slotSharingGroup("sink");
+
+ FailingMapper.failedBefore = false;
+ tryExecute(env, "testTwoMapTasks");
+ }
+
+ // Schema: (index, key, assignedChannel, long string).
+	private static class AppSourceFunction extends RichParallelSourceFunction<Tuple4<Integer, Integer, Integer, String>> {
+		private final String shortString = "I am a very long string to test partial records hohoho hahaha ";
+ private final String longOrShortString;
+ private final int maxParallelism;
+ private final int numberOfChannels;
+ private final int[] keys;
+ private int index = 0;
+ private volatile boolean running = true;
+
+ // short-length string
+ AppSourceFunction() {
+ this.longOrShortString = shortString;
+ this.maxParallelism = 128;
+ this.numberOfChannels = 1;
+ this.keys = initKeys(numberOfChannels);
+ }
+
+ // long-length string
+		AppSourceFunction(int bufferSize, int maxParallelism, int numberOfChannels) {
+ this.maxParallelism = maxParallelism;
+ this.numberOfChannels = numberOfChannels;
+ this.keys = initKeys(numberOfChannels);
+
+ StringBuilder builder = new StringBuilder(shortString);
+			for (int i = 0; i <= 2 * bufferSize / shortString.length() + 1; i++) {
+ builder.append(shortString);
+ }
+ this.longOrShortString = builder.toString();
+ }
+
+ @Override
+		public void run(SourceContext<Tuple4<Integer, Integer, Integer, String>> ctx) throws Exception {
+			while (running) {
+				synchronized (ctx.getCheckpointLock()) {
+					if (index % 100 == 0) {
+						Thread.sleep(50);
+					}
+					int key = keys[index % numberOfChannels];
+					ctx.collect(new Tuple4<>(index, key, assignedIndex(key), longOrShortString));
+ }
+ index++;
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+
+ private int[] initKeys(int numberOfChannels) {
+ int[] keys = new int[numberOfChannels];
+
+ for (int i = 0; i < numberOfChannels; i++) {
+ int key = 0;
+ while (key < 1000 && assignedIndex(key) != i) {
+ key++;
+ }
+				assert key < 1000 : "Cannot find a key within 1000";
+ keys[i] = key;
+ }
+
+ return keys;
+ }
+
+ private int assignedIndex(int key) {
+			return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
+ }
+ }
+
+ private static class FailingMapper<T> extends RichMapFunction<T, T> {
+ private static final long serialVersionUID = 6334389850158703L;
+
+ public static volatile boolean failedBefore;
+
+ private final int failCount;
+ private int numElementsTotal;
+
+ private boolean failer;
+
+ public FailingMapper(int failCount) {
+ this.failCount = failCount;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+			failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
+ }
+
+ @Override
+ public T map(T value) throws Exception {
+ numElementsTotal++;
+
+ if (!failedBefore) {
+ Thread.sleep(10);
+
+ if (failer && numElementsTotal >= failCount) {
+ failedBefore = true;
+				throw new Exception("Artificial Test Failure");
+ }
+ }
+
+ return value;
+ }
+ }
+
+	private static class ValidatingAtMostOnceSink extends RichSinkFunction<Tuple4<Integer, Integer, Integer, String>> {
+		private static final long serialVersionUID = 1748426382527469932L;
+ private final int numElementsTotal;
+ private final int[] numElements;
+ private final Integer[] indexReachingNumElements;
+ private final int numberOfInputChannels;
+
+		public ValidatingAtMostOnceSink(int numElementsTotal, int numberOfInputChannels) {
+			this.numElementsTotal = numElementsTotal;
+			this.numberOfInputChannels = numberOfInputChannels;
+			this.numElements = new int[numberOfInputChannels];
+			this.indexReachingNumElements = new Integer[numberOfInputChannels];
+ }
+
+ public ValidatingAtMostOnceSink(int numElementsTotal) {
+ this.numElementsTotal = numElementsTotal;
+ this.numberOfInputChannels = 1;
+ this.numElements = new int[numberOfInputChannels];
+			this.indexReachingNumElements = new Integer[numberOfInputChannels];
+ }
+
+ @Override
+		public void invoke(Tuple4<Integer, Integer, Integer, String> value) throws Exception {
Review comment:
I think the test would be a bit easier to maintain if we didn't use a
`Tuple4` but a POJO with properly named fields.
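A minimal sketch of such a POJO (class and field names are only illustrative):

```java
/** Named replacement for Tuple4<Integer, Integer, Integer, String>. */
public static class SourceRecord {
	public int index;            // monotonically increasing sequence number
	public int key;              // key the record is partitioned on
	public int assignedChannel;  // channel the key maps to
	public String payload;       // long string used to provoke partial records

	// Public no-arg constructor plus public fields make this a Flink POJO.
	public SourceRecord() {}

	public SourceRecord(int index, int key, int assignedChannel, String payload) {
		this.index = index;
		this.key = key;
		this.assignedChannel = assignedChannel;
		this.payload = payload;
	}
}
```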
##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ApproximateLocalRecoveryDownstreamITCase.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.flink.test.util.TestUtils.tryExecute;
+
+/**
+ * Tests approximate local recovery with downstream failover.
+ *
+ * <p>If a task fails, it restarts together with all of its downstream tasks.
+ */
+public class ApproximateLocalRecoveryDownstreamITCase extends TestLogger {
+ private static final int BUFFER_SIZE = 4096;
+
+ @Rule
+	public MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(createConfig())
+ .setNumberTaskManagers(4)
+ .setNumberSlotsPerTaskManager(1)
+ .build());
+
+ @Rule
+ public final Timeout timeout = Timeout.millis(300000L);
+
+ /**
+ * Test the following topology.
+ * <pre>
+ * (source1/1) -----> (map1/1) -----> (sink1/1)
+ * </pre>
+ * (map1/1) fails, (map1/1) and (sink1/1) restart
+ */
+ @Test
+ public void localTaskFailureRecoveryThreeTasks() throws Exception {
+ final int failAfterElements = 150;
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env
+			.setParallelism(1)
+			.setBufferTimeout(0)
+			.setMaxParallelism(128)
+			.disableOperatorChaining()
+			.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+ env.getCheckpointConfig().enableApproximateLocalRecovery(true);
+
+ env.addSource(new AppSourceFunction())
+ .slotSharingGroup("source")
+ .map(new FailingMapper<>(failAfterElements))
+ .slotSharingGroup("map")
+ .addSink(new ValidatingAtMostOnceSink(300))
+ .slotSharingGroup("sink");
+
+ FailingMapper.failedBefore = false;
+ tryExecute(env, "testThreeTasks");
+ }
+
+ /**
+ * Test the following topology.
+ * <pre>
+ * (source1/1) -----> (map1/2) -----> (sink1/1)
+ * | ^
+ * -------------> (map2/2) ---------|
+ * </pre>
+ * (map1/2) fails, (map1/2) and (sink1/1) restart
+ */
+ @Test
+ public void localTaskFailureRecoveryTwoMapTasks() throws Exception {
+ final int failAfterElements = 20;
+ final int keyByChannelNumber = 2;
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env
+			.setParallelism(1)
+			.setBufferTimeout(0)
+			.disableOperatorChaining()
+			.setMaxParallelism(128)
+			.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+ env.getCheckpointConfig().enableApproximateLocalRecovery(true);
+
+		env.addSource(new AppSourceFunction(BUFFER_SIZE, env.getMaxParallelism(), keyByChannelNumber))
+			.slotSharingGroup("source")
+			.keyBy(1)
+			.map(new FailingMapper<>(failAfterElements))
+			.setParallelism(keyByChannelNumber)
+			.slotSharingGroup("map")
+			.addSink(new ValidatingAtMostOnceSink(200, keyByChannelNumber))
+ .slotSharingGroup("sink");
+
+ FailingMapper.failedBefore = false;
+ tryExecute(env, "testTwoMapTasks");
+ }
+
+ // Schema: (index, key, assignedChannel, long string).
+	private static class AppSourceFunction extends RichParallelSourceFunction<Tuple4<Integer, Integer, Integer, String>> {
+		private final String shortString = "I am a very long string to test partial records hohoho hahaha ";
+ private final String longOrShortString;
+ private final int maxParallelism;
+ private final int numberOfChannels;
+ private final int[] keys;
+ private int index = 0;
+ private volatile boolean running = true;
+
+ // short-length string
+ AppSourceFunction() {
+ this.longOrShortString = shortString;
+ this.maxParallelism = 128;
+ this.numberOfChannels = 1;
+ this.keys = initKeys(numberOfChannels);
+ }
+
+ // long-length string
+		AppSourceFunction(int bufferSize, int maxParallelism, int numberOfChannels) {
+ this.maxParallelism = maxParallelism;
+ this.numberOfChannels = numberOfChannels;
+ this.keys = initKeys(numberOfChannels);
+
+ StringBuilder builder = new StringBuilder(shortString);
+			for (int i = 0; i <= 2 * bufferSize / shortString.length() + 1; i++) {
+ builder.append(shortString);
+ }
+ this.longOrShortString = builder.toString();
+ }
+
+ @Override
+		public void run(SourceContext<Tuple4<Integer, Integer, Integer, String>> ctx) throws Exception {
+			while (running) {
+				synchronized (ctx.getCheckpointLock()) {
+					if (index % 100 == 0) {
+						Thread.sleep(50);
+					}
+					int key = keys[index % numberOfChannels];
+					ctx.collect(new Tuple4<>(index, key, assignedIndex(key), longOrShortString));
+ }
+ index++;
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+
+ private int[] initKeys(int numberOfChannels) {
+ int[] keys = new int[numberOfChannels];
+
+ for (int i = 0; i < numberOfChannels; i++) {
+ int key = 0;
+ while (key < 1000 && assignedIndex(key) != i) {
+ key++;
+ }
+				assert key < 1000 : "Cannot find a key within 1000";
+ keys[i] = key;
+ }
+
+ return keys;
+ }
+
+ private int assignedIndex(int key) {
+			return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
+ }
+ }
+
+ private static class FailingMapper<T> extends RichMapFunction<T, T> {
+ private static final long serialVersionUID = 6334389850158703L;
+
+ public static volatile boolean failedBefore;
+
+ private final int failCount;
+ private int numElementsTotal;
+
+ private boolean failer;
+
+ public FailingMapper(int failCount) {
+ this.failCount = failCount;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+			failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
+ }
+
+ @Override
+ public T map(T value) throws Exception {
+ numElementsTotal++;
+
+ if (!failedBefore) {
+ Thread.sleep(10);
+
+ if (failer && numElementsTotal >= failCount) {
+ failedBefore = true;
+				throw new Exception("Artificial Test Failure");
+ }
+ }
+
+ return value;
+ }
+ }
+
+	private static class ValidatingAtMostOnceSink extends RichSinkFunction<Tuple4<Integer, Integer, Integer, String>> {
Review comment:
A short JavaDoc explaining how the source and the sink play together
could be helpful.
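For example (a sketch, wording illustrative):

```java
/**
 * Counterpart of {@link AppSourceFunction}: the source emits continuous,
 * monotonically increasing indices and routes one fixed key to each channel;
 * this sink counts the elements arriving per channel and checks those counts
 * against the indices to verify that, after the induced failure, no element
 * was delivered more than once (at-most-once semantics).
 */
```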
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -288,10 +288,24 @@ private void configureStreamGraph(final StreamGraph graph) {
} else {
graph.setStateBackend(stateBackend);
graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
+			graph.setAllVerticesInSameSlotSharingGroupByDefault(true);
graph.setScheduleMode(ScheduleMode.EAGER);
+
+			if (checkpointConfig.isApproximateLocalRecoveryEnabled()) {
+				checkApproximateLocalRecoveryCompatibility();
+				graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED_APPROXIMATE);
+			} else {
+				graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
+			}
}
}
+ private void checkApproximateLocalRecoveryCompatibility() {
+ checkState(
+ !checkpointConfig.isUnalignedCheckpointsEnabled(),
+			"Approximate Local Recovery and Unaligned Checkpoint cannot be used together yet");
Review comment:
Can we do a similar check for the scheduler configuration? I think we
should fail hard if we see that we are not using the `legacy` scheduler.
Moreover, an explanatory exception message would be helpful as well.
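A sketch of what such a check could look like (how the scheduler type is read
at this point is an assumption; `isUsingLegacyScheduler()` stands in for
whatever accessor the code base actually offers):

```java
private void checkApproximateLocalRecoveryCompatibility() {
	checkState(
		!checkpointConfig.isUnalignedCheckpointsEnabled(),
		"Approximate local recovery and unaligned checkpoints cannot be used together yet. "
			+ "Please disable unaligned checkpoints to use approximate local recovery.");
	// Hypothetical check: fail hard unless the legacy scheduler is configured.
	checkState(
		isUsingLegacyScheduler(),
		"Approximate local recovery can currently only be used with the legacy scheduler. "
			+ "Please configure the legacy scheduler for the cluster.");
}
```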
##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ApproximateLocalRecoveryDownstreamITCase.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.flink.test.util.TestUtils.tryExecute;
+
+/**
+ * Tests approximate local recovery with downstream failover.
+ *
+ * <p>If a task fails, it restarts together with all of its downstream tasks.
+ */
+public class ApproximateLocalRecoveryDownstreamITCase extends TestLogger {
+ private static final int BUFFER_SIZE = 4096;
+
+ @Rule
+	public MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(createConfig())
+ .setNumberTaskManagers(4)
+ .setNumberSlotsPerTaskManager(1)
+ .build());
+
+ @Rule
+ public final Timeout timeout = Timeout.millis(300000L);
+
+ /**
+ * Test the following topology.
+ * <pre>
+ * (source1/1) -----> (map1/1) -----> (sink1/1)
+ * </pre>
+ * (map1/1) fails, (map1/1) and (sink1/1) restart
+ */
+ @Test
+ public void localTaskFailureRecoveryThreeTasks() throws Exception {
+ final int failAfterElements = 150;
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env
+			.setParallelism(1)
+			.setBufferTimeout(0)
+			.setMaxParallelism(128)
+			.disableOperatorChaining()
+			.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+ env.getCheckpointConfig().enableApproximateLocalRecovery(true);
+
+ env.addSource(new AppSourceFunction())
+ .slotSharingGroup("source")
+ .map(new FailingMapper<>(failAfterElements))
+ .slotSharingGroup("map")
+ .addSink(new ValidatingAtMostOnceSink(300))
+ .slotSharingGroup("sink");
+
+ FailingMapper.failedBefore = false;
+ tryExecute(env, "testThreeTasks");
+ }
+
+ /**
+ * Test the following topology.
+ * <pre>
+ * (source1/1) -----> (map1/2) -----> (sink1/1)
+ * | ^
+ * -------------> (map2/2) ---------|
+ * </pre>
+ * (map1/2) fails, (map1/2) and (sink1/1) restart
+ */
+ @Test
+ public void localTaskFailureRecoveryTwoMapTasks() throws Exception {
+ final int failAfterElements = 20;
+ final int keyByChannelNumber = 2;
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env
+			.setParallelism(1)
+			.setBufferTimeout(0)
+			.disableOperatorChaining()
+			.setMaxParallelism(128)
+			.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+ env.getCheckpointConfig().enableApproximateLocalRecovery(true);
+
+		env.addSource(new AppSourceFunction(BUFFER_SIZE, env.getMaxParallelism(), keyByChannelNumber))
+			.slotSharingGroup("source")
+			.keyBy(1)
+			.map(new FailingMapper<>(failAfterElements))
+			.setParallelism(keyByChannelNumber)
+			.slotSharingGroup("map")
+			.addSink(new ValidatingAtMostOnceSink(200, keyByChannelNumber))
+ .slotSharingGroup("sink");
+
+ FailingMapper.failedBefore = false;
+ tryExecute(env, "testTwoMapTasks");
+ }
+
+ // Schema: (index, key, assignedChannel, long string).
+	private static class AppSourceFunction extends RichParallelSourceFunction<Tuple4<Integer, Integer, Integer, String>> {
+		private final String shortString = "I am a very long string to test partial records hohoho hahaha ";
+ private final String longOrShortString;
+ private final int maxParallelism;
+ private final int numberOfChannels;
+ private final int[] keys;
+ private int index = 0;
+ private volatile boolean running = true;
+
+ // short-length string
+ AppSourceFunction() {
+ this.longOrShortString = shortString;
+ this.maxParallelism = 128;
+ this.numberOfChannels = 1;
+ this.keys = initKeys(numberOfChannels);
+ }
+
+ // long-length string
+		AppSourceFunction(int bufferSize, int maxParallelism, int numberOfChannels) {
+ this.maxParallelism = maxParallelism;
+ this.numberOfChannels = numberOfChannels;
+ this.keys = initKeys(numberOfChannels);
+
+ StringBuilder builder = new StringBuilder(shortString);
+			for (int i = 0; i <= 2 * bufferSize / shortString.length() + 1; i++) {
+ builder.append(shortString);
+ }
+ this.longOrShortString = builder.toString();
+ }
+
+ @Override
+		public void run(SourceContext<Tuple4<Integer, Integer, Integer, String>> ctx) throws Exception {
+			while (running) {
+				synchronized (ctx.getCheckpointLock()) {
+					if (index % 100 == 0) {
+						Thread.sleep(50);
+					}
+					int key = keys[index % numberOfChannels];
+					ctx.collect(new Tuple4<>(index, key, assignedIndex(key), longOrShortString));
+ }
+ index++;
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+
+ private int[] initKeys(int numberOfChannels) {
+ int[] keys = new int[numberOfChannels];
+
+ for (int i = 0; i < numberOfChannels; i++) {
+ int key = 0;
+ while (key < 1000 && assignedIndex(key) != i) {
+ key++;
+ }
+				assert key < 1000 : "Cannot find a key within 1000";
+ keys[i] = key;
+ }
+
+ return keys;
+ }
+
+ private int assignedIndex(int key) {
+			return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
+ }
+ }
+
+ private static class FailingMapper<T> extends RichMapFunction<T, T> {
+ private static final long serialVersionUID = 6334389850158703L;
+
+ public static volatile boolean failedBefore;
+
+ private final int failCount;
+ private int numElementsTotal;
+
+ private boolean failer;
+
+ public FailingMapper(int failCount) {
+ this.failCount = failCount;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+			failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
+ }
+
+ @Override
+ public T map(T value) throws Exception {
+ numElementsTotal++;
+
+ if (!failedBefore) {
+ Thread.sleep(10);
+
+ if (failer && numElementsTotal >= failCount) {
+ failedBefore = true;
+				throw new Exception("Artificial Test Failure");
+ }
+ }
+
+ return value;
+ }
+ }
+
+	private static class ValidatingAtMostOnceSink extends RichSinkFunction<Tuple4<Integer, Integer, Integer, String>> {
+		private static final long serialVersionUID = 1748426382527469932L;
+ private final int numElementsTotal;
+ private final int[] numElements;
+ private final Integer[] indexReachingNumElements;
+ private final int numberOfInputChannels;
+
+		public ValidatingAtMostOnceSink(int numElementsTotal, int numberOfInputChannels) {
+			this.numElementsTotal = numElementsTotal;
+			this.numberOfInputChannels = numberOfInputChannels;
+			this.numElements = new int[numberOfInputChannels];
+			this.indexReachingNumElements = new Integer[numberOfInputChannels];
+ }
+
+ public ValidatingAtMostOnceSink(int numElementsTotal) {
+ this.numElementsTotal = numElementsTotal;
+ this.numberOfInputChannels = 1;
+			this.indexReachingNumElements = new Integer[numberOfInputChannels];
Integer[numberOfInputChannels];
+ }
+
+ @Override
+		public void invoke(Tuple4<Integer, Integer, Integer, String> value) throws Exception {
Review comment:
This overrides a deprecated method.
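`SinkFunction#invoke(IN)` is deprecated in favour of
`invoke(IN value, Context context)`; a sketch of the switch:

```java
@Override
public void invoke(Tuple4<Integer, Integer, Integer, String> value, Context context) throws Exception {
	// same validation logic as in the deprecated invoke(value);
	// the Context argument is not needed by this sink
}
```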
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]