tillrohrmann commented on a change in pull request #13880: URL: https://github.com/apache/flink/pull/13880#discussion_r516735965
########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ApproximateLocalRecoveryDownstreamITCase.java ########## @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HeartbeatManagerOptions; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import 
org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.test.util.SuccessException; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; + +import java.util.BitSet; + +import static org.apache.flink.test.util.TestUtils.tryExecute; + +/** Use RemoteInputChannel. Only for Upstream Reconnection. **/ + +public class ApproximateLocalRecoveryDownstreamITCase { Review comment: `extends TestLogger` is missing. ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ########## @@ -717,6 +717,11 @@ private static boolean isPointwisePartitioner(StreamPartitioner<?> partitioner) } private ResultPartitionType determineResultPartitionType(StreamPartitioner<?> partitioner) { + + if (streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled()) { + return ResultPartitionType.PIPELINED_APPROXIMATE; + } Review comment: Should this be integrated with the `GlobalDataExchangeMode` and the `shuffleMode` as well? Otherwise we need to document that approximate local recovery disables the global data exchange mode setting and only works if `shuffleMode` is `UNDEFINED`. ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ApproximateLocalRecoveryDownstreamITCase.java ########## @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HeartbeatManagerOptions; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.test.util.SuccessException; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; + +import java.util.BitSet; + +import static org.apache.flink.test.util.TestUtils.tryExecute; + +/** Use RemoteInputChannel. Only for Upstream Reconnection. 
**/ + +public class ApproximateLocalRecoveryDownstreamITCase { + private static MiniClusterWithClientResource cluster; + private static final int BUFFER_SIZE = 4096; + + @Before + public void setup() throws Exception { + Configuration config = new Configuration(); + config.setString(JobManagerOptions.SCHEDULING_STRATEGY, "legacy"); Review comment: Why is this required? By default, the pipelined region scheduler is activated. ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ApproximateLocalRecoveryDownstreamITCase.java ########## @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HeartbeatManagerOptions; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.test.util.SuccessException; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; + +import java.util.BitSet; + +import static org.apache.flink.test.util.TestUtils.tryExecute; + +/** Use RemoteInputChannel. Only for Upstream Reconnection. 
**/ + +public class ApproximateLocalRecoveryDownstreamITCase { + private static MiniClusterWithClientResource cluster; + private static final int BUFFER_SIZE = 4096; + + @Before + public void setup() throws Exception { + Configuration config = new Configuration(); + config.setString(JobManagerOptions.SCHEDULING_STRATEGY, "legacy"); + config.setString(HighAvailabilityOptions.HA_MODE, RegionFailoverITCase.TestingHAFactory.class.getName()); + + config.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse(Integer.toString(BUFFER_SIZE))); + config.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 1000000L); + config.setString(AkkaOptions.ASK_TIMEOUT, "1 h"); Review comment: This could be factored out into a `createConfig` method; then we could use a `@Rule` for the `cluster` field and would not have to start and stop it manually. ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ApproximateLocalRecoveryDownstreamITCase.java ########## @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HeartbeatManagerOptions; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.test.util.SuccessException; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; + +import java.util.BitSet; + +import static org.apache.flink.test.util.TestUtils.tryExecute; + +/** Use RemoteInputChannel. Only for Upstream Reconnection. **/ Review comment: Not sure whether this should go into the JavaDocs of the testing class. Usually this JavaDoc says what you are testing here. ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ApproximateLocalRecoveryDownstreamITCase.java ########## @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HeartbeatManagerOptions; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.test.util.SuccessException; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; + +import java.util.BitSet; + +import static org.apache.flink.test.util.TestUtils.tryExecute; + +/** Use RemoteInputChannel. Only for Upstream Reconnection. 
**/ + +public class ApproximateLocalRecoveryDownstreamITCase { + private static MiniClusterWithClientResource cluster; + private static final int BUFFER_SIZE = 4096; + + @Before + public void setup() throws Exception { + Configuration config = new Configuration(); + config.setString(JobManagerOptions.SCHEDULING_STRATEGY, "legacy"); + config.setString(HighAvailabilityOptions.HA_MODE, RegionFailoverITCase.TestingHAFactory.class.getName()); + + config.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse(Integer.toString(BUFFER_SIZE))); + config.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 1000000L); + config.setString(AkkaOptions.ASK_TIMEOUT, "1 h"); + + cluster = new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(config) + .setNumberTaskManagers(4) + .setNumberSlotsPerTaskManager(1) + .build()); + cluster.before(); + } + + @AfterClass Review comment: I think this is problematic because we start a new `cluster` for every test case but only shut the last one down after all tests have completed. ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ApproximateLocalRecoveryDownstreamITCase.java ########## @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HeartbeatManagerOptions; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.test.util.SuccessException; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; + +import java.util.BitSet; + +import static org.apache.flink.test.util.TestUtils.tryExecute; + +/** Use RemoteInputChannel. Only for Upstream Reconnection. **/ + +public class ApproximateLocalRecoveryDownstreamITCase { + private static MiniClusterWithClientResource cluster; + private static final int BUFFER_SIZE = 4096; + + @Before + public void setup() throws Exception { + Configuration config = new Configuration(); + config.setString(JobManagerOptions.SCHEDULING_STRATEGY, "legacy"); + config.setString(HighAvailabilityOptions.HA_MODE, RegionFailoverITCase.TestingHAFactory.class.getName()); Review comment: Why is this needed? 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java ########## @@ -70,7 +70,7 @@ vertexToRegion.put(vertex, currentRegion); for (R consumedResult : vertex.getConsumedResults()) { - if (consumedResult.getResultType().isPipelined()) { + if (!consumedResult.getResultType().isReconnectable()) { Review comment: Maybe add a comment explaining why it is `isReconnectable` now. ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ApproximateLocalRecoveryDownstreamITCase.java ########## @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HeartbeatManagerOptions; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.test.util.SuccessException; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; + +import java.util.BitSet; + +import static org.apache.flink.test.util.TestUtils.tryExecute; + +/** Use RemoteInputChannel. Only for Upstream Reconnection. 
**/ + +public class ApproximateLocalRecoveryDownstreamITCase { + private static MiniClusterWithClientResource cluster; + private static final int BUFFER_SIZE = 4096; + + @Before + public void setup() throws Exception { + Configuration config = new Configuration(); + config.setString(JobManagerOptions.SCHEDULING_STRATEGY, "legacy"); + config.setString(HighAvailabilityOptions.HA_MODE, RegionFailoverITCase.TestingHAFactory.class.getName()); + + config.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse(Integer.toString(BUFFER_SIZE))); + config.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 1000000L); + config.setString(AkkaOptions.ASK_TIMEOUT, "1 h"); + + cluster = new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(config) + .setNumberTaskManagers(4) + .setNumberSlotsPerTaskManager(1) + .build()); + cluster.before(); + } + + @AfterClass + public static void shutDownExistingCluster() { + if (cluster != null) { + cluster.after(); + cluster = null; + } + } + + /** + * Test the following topology. + * <pre> + * (source1/1) -----> (map1/1) -----> (sink1/1) + * </pre> + * (map1/1) fails, (map1/1) and (sink1/1) restart + */ + @Test + public void localTaskFailureRecoveryThreeTasks() throws Exception { + final int failAfterElements = 150; + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env + .setParallelism(1) + .setBufferTimeout(0) + .setMaxParallelism(128) + .disableOperatorChaining() + .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); + env.getCheckpointConfig().enableApproximateLocalRecovery(true); + + env.addSource(new AppSourceFunction()) + .slotSharingGroup("source") + .map(new FailingMapper<>(failAfterElements)) + .slotSharingGroup("map") + .addSink(new ValidatingAtMostOnceSink(300)) + .slotSharingGroup("sink"); + + FailingMapper.failedBefore = false; + tryExecute(env, "testThreeTasks"); + } + + /** + * Test the following topology. 
+ * <pre> + * (source1/1) -----> (map1/2) -----> (sink1/1) + * | ^ + * -------------> (map2/2) ---------| + * </pre> + * (map1/2) fails, (map1/2) and (sink1/1) restart + */ + @Test + public void localTaskFailureRecoveryTwoMapTasks() throws Exception { + final int failAfterElements = 100; + final int keyByChannelNumber = 2; + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env + .setParallelism(1) + .setBufferTimeout(0) + .disableOperatorChaining() + .setMaxParallelism(128) + .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); + env.getCheckpointConfig().enableApproximateLocalRecovery(true); + + env.addSource(new AppSourceFunction(BUFFER_SIZE, env.getMaxParallelism(), keyByChannelNumber)) + .slotSharingGroup("source") + .keyBy(0) + .map(new FailingMapper<>(failAfterElements)) + .setParallelism(keyByChannelNumber) + .slotSharingGroup("map") + .addSink(new ValidatingAtMostOnceSink(200, keyByChannelNumber)) + .slotSharingGroup("sink"); + + FailingMapper.failedBefore = false; + tryExecute(env, "testTwoMapTasks"); + } + + // Schema: (key, timestamp, index, long string). 
+ private static class AppSourceFunction extends RichParallelSourceFunction<Tuple4<Integer, Long, Integer, String>> { + private final String longOrShortString; + private final int maxParallelism; + private final int numberOfChannels; + + private int index = 0; + private volatile boolean running = true; + + // short-length string + AppSourceFunction() { + this.longOrShortString = "I am a very long string to test partial records hohoho hahaha "; + this.maxParallelism = 128; + this.numberOfChannels = 1; + } + + // long-length string + AppSourceFunction(int bufferSize, int maxParallelism, int numberOfChannels) { + this.maxParallelism = maxParallelism; + this.numberOfChannels = numberOfChannels; + + String shortString = "I am a very long string to test partial records hohoho hahaha "; Review comment: can be deduplicated ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ApproximateLocalRecoveryDownstreamITCase.java ########## @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HeartbeatManagerOptions; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.test.util.SuccessException; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; + +import java.util.BitSet; + +import static org.apache.flink.test.util.TestUtils.tryExecute; + +/** Use RemoteInputChannel. Only for Upstream Reconnection. 
**/ + +public class ApproximateLocalRecoveryDownstreamITCase { + private static MiniClusterWithClientResource cluster; + private static final int BUFFER_SIZE = 4096; + + @Before + public void setup() throws Exception { + Configuration config = new Configuration(); + config.setString(JobManagerOptions.SCHEDULING_STRATEGY, "legacy"); + config.setString(HighAvailabilityOptions.HA_MODE, RegionFailoverITCase.TestingHAFactory.class.getName()); + + config.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse(Integer.toString(BUFFER_SIZE))); + config.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 1000000L); + config.setString(AkkaOptions.ASK_TIMEOUT, "1 h"); + + cluster = new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(config) + .setNumberTaskManagers(4) + .setNumberSlotsPerTaskManager(1) + .build()); + cluster.before(); + } + + @AfterClass + public static void shutDownExistingCluster() { + if (cluster != null) { + cluster.after(); + cluster = null; + } + } + + /** + * Test the following topology. + * <pre> + * (source1/1) -----> (map1/1) -----> (sink1/1) + * </pre> + * (map1/1) fails, (map1/1) and (sink1/1) restart + */ + @Test + public void localTaskFailureRecoveryThreeTasks() throws Exception { + final int failAfterElements = 150; + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env + .setParallelism(1) + .setBufferTimeout(0) + .setMaxParallelism(128) + .disableOperatorChaining() + .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); + env.getCheckpointConfig().enableApproximateLocalRecovery(true); + + env.addSource(new AppSourceFunction()) + .slotSharingGroup("source") + .map(new FailingMapper<>(failAfterElements)) + .slotSharingGroup("map") + .addSink(new ValidatingAtMostOnceSink(300)) + .slotSharingGroup("sink"); + + FailingMapper.failedBefore = false; + tryExecute(env, "testThreeTasks"); + } + + /** + * Test the following topology. 
+ * <pre> + * (source1/1) -----> (map1/2) -----> (sink1/1) + * | ^ + * -------------> (map2/2) ---------| + * </pre> + * (map1/2) fails, (map1/2) and (sink1/1) restart + */ + @Test + public void localTaskFailureRecoveryTwoMapTasks() throws Exception { Review comment: Same here. The test does not seem to depend on `env.getCheckpointConfig().enableApproximateLocalRecovery(true)`. ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ApproximateLocalRecoveryDownstreamITCase.java ########## @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HeartbeatManagerOptions; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.test.util.SuccessException; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; + +import java.util.BitSet; + +import static org.apache.flink.test.util.TestUtils.tryExecute; + +/** Use RemoteInputChannel. Only for Upstream Reconnection. 
**/ + +public class ApproximateLocalRecoveryDownstreamITCase { + private static MiniClusterWithClientResource cluster; + private static final int BUFFER_SIZE = 4096; + + @Before + public void setup() throws Exception { + Configuration config = new Configuration(); + config.setString(JobManagerOptions.SCHEDULING_STRATEGY, "legacy"); + config.setString(HighAvailabilityOptions.HA_MODE, RegionFailoverITCase.TestingHAFactory.class.getName()); + + config.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse(Integer.toString(BUFFER_SIZE))); + config.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 1000000L); + config.setString(AkkaOptions.ASK_TIMEOUT, "1 h"); + + cluster = new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(config) + .setNumberTaskManagers(4) + .setNumberSlotsPerTaskManager(1) + .build()); + cluster.before(); + } + + @AfterClass + public static void shutDownExistingCluster() { + if (cluster != null) { + cluster.after(); + cluster = null; + } + } + + /** + * Test the following topology. + * <pre> + * (source1/1) -----> (map1/1) -----> (sink1/1) + * </pre> + * (map1/1) fails, (map1/1) and (sink1/1) restart + */ + @Test + public void localTaskFailureRecoveryThreeTasks() throws Exception { + final int failAfterElements = 150; + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env + .setParallelism(1) + .setBufferTimeout(0) + .setMaxParallelism(128) + .disableOperatorChaining() + .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); + env.getCheckpointConfig().enableApproximateLocalRecovery(true); + + env.addSource(new AppSourceFunction()) + .slotSharingGroup("source") + .map(new FailingMapper<>(failAfterElements)) + .slotSharingGroup("map") + .addSink(new ValidatingAtMostOnceSink(300)) + .slotSharingGroup("sink"); + + FailingMapper.failedBefore = false; + tryExecute(env, "testThreeTasks"); + } + + /** + * Test the following topology. 
+ * <pre> + * (source1/1) -----> (map1/2) -----> (sink1/1) + * | ^ + * -------------> (map2/2) ---------| + * </pre> + * (map1/2) fails, (map1/2) and (sink1/1) restart + */ + @Test + public void localTaskFailureRecoveryTwoMapTasks() throws Exception { + final int failAfterElements = 100; + final int keyByChannelNumber = 2; + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env + .setParallelism(1) + .setBufferTimeout(0) + .disableOperatorChaining() + .setMaxParallelism(128) + .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); + env.getCheckpointConfig().enableApproximateLocalRecovery(true); + + env.addSource(new AppSourceFunction(BUFFER_SIZE, env.getMaxParallelism(), keyByChannelNumber)) + .slotSharingGroup("source") + .keyBy(0) + .map(new FailingMapper<>(failAfterElements)) + .setParallelism(keyByChannelNumber) + .slotSharingGroup("map") + .addSink(new ValidatingAtMostOnceSink(200, keyByChannelNumber)) + .slotSharingGroup("sink"); + + FailingMapper.failedBefore = false; + tryExecute(env, "testTwoMapTasks"); + } + + // Schema: (key, timestamp, index, long string). 
+ private static class AppSourceFunction extends RichParallelSourceFunction<Tuple4<Integer, Long, Integer, String>> { + private final String longOrShortString; + private final int maxParallelism; + private final int numberOfChannels; + + private int index = 0; + private volatile boolean running = true; + + // short-length string + AppSourceFunction() { + this.longOrShortString = "I am a very long string to test partial records hohoho hahaha "; + this.maxParallelism = 128; + this.numberOfChannels = 1; + } + + // long-length string + AppSourceFunction(int bufferSize, int maxParallelism, int numberOfChannels) { + this.maxParallelism = maxParallelism; + this.numberOfChannels = numberOfChannels; + + String shortString = "I am a very long string to test partial records hohoho hahaha "; + StringBuilder builder = new StringBuilder(shortString); + + for (int i = 0; i <= 2 * bufferSize / shortString.length() + 1; i++) { + builder.append(shortString); + } + this.longOrShortString = builder.toString(); + } + + @Override + public void run(SourceContext<Tuple4<Integer, Long, Integer, String>> ctx) throws Exception{ + long timestamp = 1593575900000L; + while (running) { + synchronized (ctx.getCheckpointLock()) { + if (index % 100 == 0) { + Thread.sleep(500); + } + ctx.collect(new Tuple4<>(index, timestamp++, channelIndex(index), longOrShortString)); + } + index++; + } + } + + @Override + public void cancel() { + running = false; + } + + private int channelIndex(int key) { + return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels); + } + } + + private static class FailingMapper<T> extends RichMapFunction<T, T> { + private static final long serialVersionUID = 6334389850158703L; + + public static volatile boolean failedBefore; + + private final int failCount; + private int numElementsTotal; + + private boolean failer; + + public FailingMapper(int failCount) { + this.failCount = failCount; + } + + @Override + public void open(Configuration 
parameters) { + failer = getRuntimeContext().getIndexOfThisSubtask() == 0; + } + + @Override + public T map(T value) throws Exception { + numElementsTotal++; + + if (!failedBefore) { + Thread.sleep(10); + + if (failer && numElementsTotal >= failCount) { + failedBefore = true; + throw new Exception("Artificial Test Failure"); + } + } + + return value; + } + } + + private static class ValidatingAtMostOnceSink extends RichSinkFunction<Tuple4<Integer, Long, Integer, String>> { + + private static final long serialVersionUID = 1748426382527469932L; + private final int numElementsTotal; + private final BitSet duplicateChecker = new BitSet(); + private final int[] numElements; + private final int numberOfInputChannels; + + public ValidatingAtMostOnceSink(int numElementsTotal, int numberOfInputChannels) { + this.numElementsTotal = numElementsTotal; + this.numberOfInputChannels = numberOfInputChannels; + this.numElements = new int[numberOfInputChannels]; + } + + public ValidatingAtMostOnceSink(int numElementsTotal) { + this.numElementsTotal = numElementsTotal; + this.numberOfInputChannels = 1; + this.numElements = new int[numberOfInputChannels]; + } + + @Override + public void invoke(Tuple4<Integer, Long, Integer, String> value) throws Exception { + assert value.f2 < numberOfInputChannels; + numElements[value.f2]++; + + if (duplicateChecker.get(value.f0)) { + throw new Exception("Received a duplicate: " + value); + } Review comment: How will the `duplicateChecker` work if the task gets restarted? Won't this effectively mean that we are checking that we don't send duplicate messages during normal operation? ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ApproximateLocalRecoveryDownstreamITCase.java ########## @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HeartbeatManagerOptions; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.test.util.SuccessException; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; + +import java.util.BitSet; + +import static 
org.apache.flink.test.util.TestUtils.tryExecute; + +/** Use RemoteInputChannel. Only for Upstream Reconnection. **/ + +public class ApproximateLocalRecoveryDownstreamITCase { + private static MiniClusterWithClientResource cluster; + private static final int BUFFER_SIZE = 4096; + + @Before + public void setup() throws Exception { + Configuration config = new Configuration(); + config.setString(JobManagerOptions.SCHEDULING_STRATEGY, "legacy"); + config.setString(HighAvailabilityOptions.HA_MODE, RegionFailoverITCase.TestingHAFactory.class.getName()); + + config.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse(Integer.toString(BUFFER_SIZE))); + config.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 1000000L); + config.setString(AkkaOptions.ASK_TIMEOUT, "1 h"); Review comment: Why is an ask timeout of 1h needed? ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ApproximateLocalRecoveryDownstreamITCase.java ########## @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HeartbeatManagerOptions; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.test.util.SuccessException; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; + +import java.util.BitSet; + +import static org.apache.flink.test.util.TestUtils.tryExecute; + +/** Use RemoteInputChannel. Only for Upstream Reconnection. 
**/ + +public class ApproximateLocalRecoveryDownstreamITCase { + private static MiniClusterWithClientResource cluster; + private static final int BUFFER_SIZE = 4096; + + @Before + public void setup() throws Exception { + Configuration config = new Configuration(); + config.setString(JobManagerOptions.SCHEDULING_STRATEGY, "legacy"); + config.setString(HighAvailabilityOptions.HA_MODE, RegionFailoverITCase.TestingHAFactory.class.getName()); + + config.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse(Integer.toString(BUFFER_SIZE))); + config.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 1000000L); + config.setString(AkkaOptions.ASK_TIMEOUT, "1 h"); + + cluster = new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(config) + .setNumberTaskManagers(4) + .setNumberSlotsPerTaskManager(1) + .build()); + cluster.before(); + } + + @AfterClass + public static void shutDownExistingCluster() { + if (cluster != null) { + cluster.after(); + cluster = null; + } + } + + /** + * Test the following topology. + * <pre> + * (source1/1) -----> (map1/1) -----> (sink1/1) + * </pre> + * (map1/1) fails, (map1/1) and (sink1/1) restart + */ + @Test + public void localTaskFailureRecoveryThreeTasks() throws Exception { + final int failAfterElements = 150; + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env + .setParallelism(1) + .setBufferTimeout(0) + .setMaxParallelism(128) + .disableOperatorChaining() + .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); + env.getCheckpointConfig().enableApproximateLocalRecovery(true); + + env.addSource(new AppSourceFunction()) + .slotSharingGroup("source") + .map(new FailingMapper<>(failAfterElements)) + .slotSharingGroup("map") + .addSink(new ValidatingAtMostOnceSink(300)) + .slotSharingGroup("sink"); + + FailingMapper.failedBefore = false; + tryExecute(env, "testThreeTasks"); + } + + /** + * Test the following topology. 
+ * <pre> + * (source1/1) -----> (map1/2) -----> (sink1/1) + * | ^ + * -------------> (map2/2) ---------| + * </pre> + * (map1/2) fails, (map1/2) and (sink1/1) restart + */ + @Test + public void localTaskFailureRecoveryTwoMapTasks() throws Exception { + final int failAfterElements = 100; + final int keyByChannelNumber = 2; + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env + .setParallelism(1) + .setBufferTimeout(0) + .disableOperatorChaining() + .setMaxParallelism(128) + .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); + env.getCheckpointConfig().enableApproximateLocalRecovery(true); + + env.addSource(new AppSourceFunction(BUFFER_SIZE, env.getMaxParallelism(), keyByChannelNumber)) + .slotSharingGroup("source") + .keyBy(0) + .map(new FailingMapper<>(failAfterElements)) + .setParallelism(keyByChannelNumber) + .slotSharingGroup("map") + .addSink(new ValidatingAtMostOnceSink(200, keyByChannelNumber)) + .slotSharingGroup("sink"); + + FailingMapper.failedBefore = false; + tryExecute(env, "testTwoMapTasks"); + } + + // Schema: (key, timestamp, index, long string). 
+ private static class AppSourceFunction extends RichParallelSourceFunction<Tuple4<Integer, Long, Integer, String>> { + private final String longOrShortString; + private final int maxParallelism; + private final int numberOfChannels; + + private int index = 0; + private volatile boolean running = true; + + // short-length string + AppSourceFunction() { + this.longOrShortString = "I am a very long string to test partial records hohoho hahaha "; + this.maxParallelism = 128; + this.numberOfChannels = 1; + } + + // long-length string + AppSourceFunction(int bufferSize, int maxParallelism, int numberOfChannels) { + this.maxParallelism = maxParallelism; + this.numberOfChannels = numberOfChannels; + + String shortString = "I am a very long string to test partial records hohoho hahaha "; + StringBuilder builder = new StringBuilder(shortString); + + for (int i = 0; i <= 2 * bufferSize / shortString.length() + 1; i++) { + builder.append(shortString); + } + this.longOrShortString = builder.toString(); + } + + @Override + public void run(SourceContext<Tuple4<Integer, Long, Integer, String>> ctx) throws Exception{ + long timestamp = 1593575900000L; + while (running) { + synchronized (ctx.getCheckpointLock()) { + if (index % 100 == 0) { + Thread.sleep(500); Review comment: Do we need to sleep that long here? ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ApproximateLocalRecoveryDownstreamITCase.java ########## @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HeartbeatManagerOptions; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.test.util.SuccessException; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; + +import java.util.BitSet; + +import static org.apache.flink.test.util.TestUtils.tryExecute; + +/** Use RemoteInputChannel. Only for Upstream Reconnection. 
**/ + +public class ApproximateLocalRecoveryDownstreamITCase { + private static MiniClusterWithClientResource cluster; + private static final int BUFFER_SIZE = 4096; + + @Before + public void setup() throws Exception { + Configuration config = new Configuration(); + config.setString(JobManagerOptions.SCHEDULING_STRATEGY, "legacy"); + config.setString(HighAvailabilityOptions.HA_MODE, RegionFailoverITCase.TestingHAFactory.class.getName()); + + config.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse(Integer.toString(BUFFER_SIZE))); + config.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 1000000L); + config.setString(AkkaOptions.ASK_TIMEOUT, "1 h"); + + cluster = new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(config) + .setNumberTaskManagers(4) + .setNumberSlotsPerTaskManager(1) + .build()); + cluster.before(); + } + + @AfterClass + public static void shutDownExistingCluster() { + if (cluster != null) { + cluster.after(); + cluster = null; + } + } + + /** + * Test the following topology. + * <pre> + * (source1/1) -----> (map1/1) -----> (sink1/1) + * </pre> + * (map1/1) fails, (map1/1) and (sink1/1) restart + */ + @Test + public void localTaskFailureRecoveryThreeTasks() throws Exception { Review comment: What is the purpose of this test case? Is it to check that Flink runs when `env.getCheckpointConfig().enableApproximateLocalRecovery(true)`? The test does not seem to test anything specific about approximate local recovery as it still passes if I set `env.getCheckpointConfig().enableApproximateLocalRecovery(false)`. 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java ########## @@ -63,7 +63,7 @@ public void startTrackingPartition(ResourceID producingTaskExecutorId, ResultPar Preconditions.checkNotNull(resultPartitionDeploymentDescriptor); // only blocking partitions require explicit release call Review comment: Please update the comment so that it is aligned with the `if` condition. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
