Repository: flink Updated Branches: refs/heads/master 98fad04ec -> c0fd36bac
[FLINK-2671] [tests] Fix unstable StreamCheckpointNotifierITCase Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c0fd36ba Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c0fd36ba Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c0fd36ba Branch: refs/heads/master Commit: c0fd36bac8955cff58a88e72d5bdb378f3cfdf93 Parents: a8be52a Author: Stephan Ewen <se...@apache.org> Authored: Fri Jan 15 17:29:46 2016 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Jan 15 18:58:57 2016 +0100 ---------------------------------------------------------------------- .../api/functions/sink/DiscardingSink.java | 32 ++ .../StreamCheckpointNotifierITCase.java | 402 +++++++++++-------- 2 files changed, 276 insertions(+), 158 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c0fd36ba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/DiscardingSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/DiscardingSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/DiscardingSink.java new file mode 100644 index 0000000..3bbb14b --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/DiscardingSink.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.sink; + +/** + * A stream sink that ignores all elements. + * + * @param <T> The type of elements received by the sink. + */ +public class DiscardingSink<T> implements SinkFunction<T> { + + private static final long serialVersionUID = 1L; + + @Override + public void invoke(T value) {} +} http://git-wip-us.apache.org/repos/asf/flink/blob/c0fd36ba/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java index 08af93a..9de5794 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java @@ -22,31 +22,41 @@ import org.apache.flink.api.common.functions.RichFilterFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.functions.RichReduceFunction; import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.apache.flink.util.Collector; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Random; +import java.util.concurrent.atomic.AtomicLong; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Integration test for the {@link CheckpointNotifier} interface. The test ensures that - * {@link CheckpointNotifier#notifyCheckpointComplete(long)} is called for some completed + * {@link CheckpointNotifier#notifyCheckpointComplete(long)} is called for completed * checkpoints, that it is called at most once for any checkpoint id and that it is not * called for a deliberately failed checkpoint. * @@ -60,9 +70,43 @@ import static org.junit.Assert.assertTrue; * successfully completed checkpoint. */ @SuppressWarnings("serial") -public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase { +public class StreamCheckpointNotifierITCase { + + private static final int NUM_TASK_MANAGERS = 2; + private static final int NUM_TASK_SLOTS = 3; + private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS; + + private static ForkableFlinkMiniCluster cluster; + + @BeforeClass + public static void startCluster() { + try { + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS); + config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms"); + config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12); + + cluster = new ForkableFlinkMiniCluster(config, false); + cluster.start(); + } + catch (Exception e) { + e.printStackTrace(); + fail("Failed to start test cluster: " + e.getMessage()); + } + } - final long NUM_LONGS = 10_000_000L; + @AfterClass + public static void stopCluster() { + try { + cluster.stop(); + cluster = null; + } + catch (Exception e) { + e.printStackTrace(); + fail("Failed to stop test cluster: " + e.getMessage()); + } + } /** * Runs the following program: @@ -71,61 +115,83 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase * [ (source)->(filter) ] -> [ (co-map) ] -> [ (map) ] -> [ (groupBy/reduce)->(sink) ] * </pre> */ - @Override - public void testProgram(StreamExecutionEnvironment env) { - - DataStream<Long> stream = env.addSource(new GeneratingSourceFunction(NUM_LONGS)); - - stream - // -------------- first vertex, chained to the src ---------------- - .filter(new LongRichFilterFunction()) - - // -------------- second vertex, applying the co-map ---------------- - .connect(stream).flatMap(new LeftIdentityCoRichFlatMapFunction()) - - // -------------- third vertex - the stateful one that also fails ---------------- - .map(new IdentityMapFunction()) - .startNewChain() - - // -------------- fourth vertex - reducer and the sink ---------------- - .keyBy(0) - .reduce(new OnceFailingReducer(NUM_LONGS)) - .addSink(new SinkFunction<Tuple1<Long>>() { - @Override - public void invoke(Tuple1<Long> value) { - // do nothing - } - }); - } + @Test + public void testProgram() { + try { + StreamExecutionEnvironment env = + StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort()); + + env.setParallelism(PARALLELISM); + env.enableCheckpointing(500); + env.getConfig().disableSysoutLogging(); + + final int numElements = 10000; + final int numTaskTotal = PARALLELISM * 5; + + DataStream<Long> stream = env.addSource(new GeneratingSourceFunction(numElements, numTaskTotal)); + + stream + // -------------- first vertex, chained to the src ---------------- + .filter(new LongRichFilterFunction()) + + // -------------- second vertex, applying the co-map ---------------- + .connect(stream).flatMap(new LeftIdentityCoRichFlatMapFunction()) + + // -------------- third vertex - the stateful one that also fails ---------------- + .map(new IdentityMapFunction()) + .startNewChain() + + // -------------- fourth vertex - reducer and the sink ---------------- + .keyBy(0) + .reduce(new OnceFailingReducer(numElements)) + + .addSink(new DiscardingSink<Tuple1<Long>>()); + + env.execute(); - @Override - public void postSubmit() { - @SuppressWarnings({"unchecked", "rawtypes"}) - List<Long>[][] checkList = new List[][] { + final long failureCheckpointID = OnceFailingReducer.failureCheckpointID; + assertNotEquals(0L, failureCheckpointID); + + List<List<Long>[]> allLists = Arrays.asList( GeneratingSourceFunction.completedCheckpoints, - IdentityMapFunction.completedCheckpoints, LongRichFilterFunction.completedCheckpoints, - LeftIdentityCoRichFlatMapFunction.completedCheckpoints}; - - long failureCheckpointID = OnceFailingReducer.failureCheckpointID; + LeftIdentityCoRichFlatMapFunction.completedCheckpoints, + IdentityMapFunction.completedCheckpoints, + OnceFailingReducer.completedCheckpoints + ); - for(List<Long>[] parallelNotifications : checkList) { - for (int i = 0; i < PARALLELISM; i++){ - List<Long> notifications = parallelNotifications[i]; - assertTrue("No checkpoint notification was received.", + for (List<Long>[] parallelNotifications : allLists) { + for (List<Long> notifications : parallelNotifications) { + + assertTrue("No checkpoint notification was received.", notifications.size() > 0); - assertFalse("Failure checkpoint was marked as completed.", + + assertFalse("Failure checkpoint was marked as completed.", notifications.contains(failureCheckpointID)); - assertFalse("No checkpoint received before failure.", - notifications.get(0) == failureCheckpointID); - assertFalse("No checkpoint received after failure.", + + assertFalse("No checkpoint received after failure.", notifications.get(notifications.size() - 1) == failureCheckpointID); - assertTrue("Checkpoint notification was received multiple times", + + assertTrue("Checkpoint notification was received multiple times", notifications.size() == new HashSet<Long>(notifications).size()); + } } } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } } + static List<Long>[] createCheckpointLists(int parallelism) { + @SuppressWarnings({"unchecked", "rawtypes"}) + List<Long>[] lists = new List[parallelism]; + for (int i = 0; i < parallelism; i++) { + lists[i] = new ArrayList<>(); + } + return lists; + } + // -------------------------------------------------------------------------------------------- // Custom Functions // -------------------------------------------------------------------------------------------- @@ -136,40 +202,35 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase */ private static class GeneratingSourceFunction extends RichSourceFunction<Long> implements ParallelSourceFunction<Long>, CheckpointNotifier, Checkpointed<Integer> { - - @SuppressWarnings({"unchecked", "rawtypes"}) - static List<Long>[] completedCheckpoints = new List[PARALLELISM]; + static final List<Long>[] completedCheckpoints = createCheckpointLists(PARALLELISM); + + static AtomicLong numPostFailureNotifications = new AtomicLong(); // operator behaviour private final long numElements; - private long result; + + private final int notificationsToWaitFor; private int index; private int step; - // test behaviour - private int subtaskId; - + private volatile boolean notificationAlready; + private volatile boolean isRunning = true; - GeneratingSourceFunction(long numElements) { + GeneratingSourceFunction(long numElements, int notificationsToWaitFor) { this.numElements = numElements; + this.notificationsToWaitFor = notificationsToWaitFor; } @Override public void open(Configuration parameters) throws IOException { step = getRuntimeContext().getNumberOfParallelSubtasks(); - subtaskId = getRuntimeContext().getIndexOfThisSubtask(); - - if (index == 0) { - index = subtaskId; - } - // Create a collection on the first open - if (completedCheckpoints[subtaskId] == null) { - completedCheckpoints[subtaskId] = new ArrayList<>(); - } + // if index has been restored, it is not 0 any more + if (index == 0) + index = getRuntimeContext().getIndexOfThisSubtask(); } @Override @@ -177,14 +238,19 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase final Object lockingObject = ctx.getCheckpointLock(); while (isRunning && index < numElements) { - - result = index % 10; + long result = index % 10; synchronized (lockingObject) { index += step; ctx.collect(result); } } + + // if the program goes fast and no notifications come through, we + // wait until all tasks had a chance to see a notification + while (isRunning && numPostFailureNotifications.get() < notificationsToWaitFor) { + Thread.sleep(50); + } } @Override @@ -193,11 +259,6 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase } @Override - public void notifyCheckpointComplete(long checkpointId) throws Exception { - completedCheckpoints[subtaskId].add(checkpointId); - } - - @Override public Integer snapshotState(long checkpointId, long checkpointTimestamp) { return index; } @@ -206,6 +267,20 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase public void restoreState(Integer state) { index = state; } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + // record the ID of the completed checkpoint + int partition = getRuntimeContext().getIndexOfThisSubtask(); + completedCheckpoints[partition].add(checkpointId); + + // if this is the first time we get a notification since the failure, + // tell the source function + if (OnceFailingReducer.hasFailed && !notificationAlready) { + notificationAlready = true; + GeneratingSourceFunction.numPostFailureNotifications.incrementAndGet(); + } + } } /** @@ -215,10 +290,9 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase private static class IdentityMapFunction extends RichMapFunction<Long, Tuple1<Long>> implements CheckpointNotifier { - @SuppressWarnings({"unchecked", "rawtypes"}) - public static List<Long>[] completedCheckpoints = new List[PARALLELISM]; - - private int subtaskId; + static final List<Long>[] completedCheckpoints = createCheckpointLists(PARALLELISM); + + private volatile boolean notificationAlready; @Override public Tuple1<Long> map(Long value) throws Exception { @@ -226,35 +300,106 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase } @Override - public void open(Configuration conf) throws IOException { - subtaskId = getRuntimeContext().getIndexOfThisSubtask(); + public void notifyCheckpointComplete(long checkpointId) { + // record the ID of the completed checkpoint + int partition = getRuntimeContext().getIndexOfThisSubtask(); + completedCheckpoints[partition].add(checkpointId); + + // if this is the first time we get a notification since the failure, + // tell the source function + if (OnceFailingReducer.hasFailed && !notificationAlready) { + notificationAlready = true; + GeneratingSourceFunction.numPostFailureNotifications.incrementAndGet(); + } + } + } - // Create a collection on the first open - if (completedCheckpoints[subtaskId] == null) { - completedCheckpoints[subtaskId] = new ArrayList<>(); + /** + * Filter on Long values supposedly letting all values through. As an implementation + * for the {@link CheckpointNotifier} interface it stores all the checkpoint ids + * it has seen in a static list. + */ + private static class LongRichFilterFunction extends RichFilterFunction<Long> implements CheckpointNotifier { + + static final List<Long>[] completedCheckpoints = createCheckpointLists(PARALLELISM); + + private volatile boolean notificationAlready; + + @Override + public boolean filter(Long value) { + return value < 100; + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + // record the ID of the completed checkpoint + int partition = getRuntimeContext().getIndexOfThisSubtask(); + completedCheckpoints[partition].add(checkpointId); + + // if this is the first time we get a notification since the failure, + // tell the source function + if (OnceFailingReducer.hasFailed && !notificationAlready) { + notificationAlready = true; + GeneratingSourceFunction.numPostFailureNotifications.incrementAndGet(); } } + } + /** + * CoFlatMap on Long values as identity transform on the left input, while ignoring the right. + * As an implementation for the {@link CheckpointNotifier} interface it stores all the checkpoint + * ids it has seen in a static list. + */ + private static class LeftIdentityCoRichFlatMapFunction extends RichCoFlatMapFunction<Long, Long, Long> + implements CheckpointNotifier { + + static final List<Long>[] completedCheckpoints = createCheckpointLists(PARALLELISM); + + private volatile boolean notificationAlready; + @Override - public void notifyCheckpointComplete(long checkpointId) throws Exception { - completedCheckpoints[subtaskId].add(checkpointId); + public void flatMap1(Long value, Collector<Long> out) { + out.collect(value); + } + + @Override + public void flatMap2(Long value, Collector<Long> out) { + // we ignore the values from the second input + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + // record the ID of the completed checkpoint + int partition = getRuntimeContext().getIndexOfThisSubtask(); + completedCheckpoints[partition].add(checkpointId); + + // if this is the first time we get a notification since the failure, + // tell the source function + if (OnceFailingReducer.hasFailed && !notificationAlready) { + notificationAlready = true; + GeneratingSourceFunction.numPostFailureNotifications.incrementAndGet(); + } } } /** * Reducer that causes one failure between seeing 40% to 70% of the records. */ - private static class OnceFailingReducer extends RichReduceFunction<Tuple1<Long>> implements Checkpointed<Long> { - - private static volatile boolean hasFailed = false; - public static volatile long failureCheckpointID; + private static class OnceFailingReducer extends RichReduceFunction<Tuple1<Long>> + implements Checkpointed<Long>, CheckpointNotifier + { + static volatile boolean hasFailed = false; + static volatile long failureCheckpointID; + static final List<Long>[] completedCheckpoints = createCheckpointLists(PARALLELISM); + private final long numElements; private long failurePos; private long count; - + private volatile boolean notificationAlready; + OnceFailingReducer(long numElements) { this.numElements = numElements; } @@ -265,11 +410,10 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase long failurePosMax = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks()); failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin; - count = 0; } @Override - public Tuple1<Long> reduce(Tuple1<Long> value1, Tuple1<Long> value2) throws Exception { + public Tuple1<Long> reduce(Tuple1<Long> value1, Tuple1<Long> value2) { count++; value1.f0 += value2.f0; return value1; @@ -277,7 +421,7 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase @Override public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - if (!hasFailed && count >= failurePos) { + if (!hasFailed && count >= failurePos && getRuntimeContext().getIndexOfThisSubtask() == 0) { hasFailed = true; failureCheckpointID = checkpointId; throw new Exception("Test Failure"); @@ -289,77 +433,19 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase public void restoreState(Long state) { count = state; } - } - - /** - * Filter on Long values supposedly letting all values through. As an implementation - * for the {@link CheckpointNotifier} interface it stores all the checkpoint ids - * it has seen in a static list. - */ - private static class LongRichFilterFunction extends RichFilterFunction<Long> - implements CheckpointNotifier { - - @SuppressWarnings({"unchecked", "rawtypes"}) - static List<Long>[] completedCheckpoints = new List[PARALLELISM]; - - private int subtaskId; - - @Override - public boolean filter(Long value) { - return value < 100; - } @Override - public void open(Configuration conf) throws IOException { - subtaskId = getRuntimeContext().getIndexOfThisSubtask(); - - // Create a collection on the first open - if (completedCheckpoints[subtaskId] == null) { - completedCheckpoints[subtaskId] = new ArrayList<>(); + public void notifyCheckpointComplete(long checkpointId) { + // record the ID of the completed checkpoint + int partition = getRuntimeContext().getIndexOfThisSubtask(); + completedCheckpoints[partition].add(checkpointId); + + // if this is the first time we get a notification since the failure, + // tell the source function + if (OnceFailingReducer.hasFailed && !notificationAlready) { + notificationAlready = true; + GeneratingSourceFunction.numPostFailureNotifications.incrementAndGet(); } } - - @Override - public void notifyCheckpointComplete(long checkpointId) throws Exception { - completedCheckpoints[subtaskId].add(checkpointId); - } - } - - /** - * CoFlatMap on Long values as identity transform on the left input, while ignoring the right. - * As an implementation for the {@link CheckpointNotifier} interface it stores all the checkpoint - * ids it has seen in a static list. - */ - private static class LeftIdentityCoRichFlatMapFunction extends RichCoFlatMapFunction<Long, Long, Long> - implements CheckpointNotifier { - - @SuppressWarnings({"unchecked", "rawtypes"}) - public static List<Long>[] completedCheckpoints = new List[PARALLELISM]; - private int subtaskId; - - @Override - public void open(Configuration conf) throws IOException { - subtaskId = getRuntimeContext().getIndexOfThisSubtask(); - - // Create a collection on the first open - if (completedCheckpoints[subtaskId] == null) { - completedCheckpoints[subtaskId] = new ArrayList<>(); - } - } - - @Override - public void flatMap1(Long value, Collector<Long> out) throws IOException { - out.collect(value); - } - - @Override - public void flatMap2(Long value, Collector<Long> out) throws IOException { - // we ignore the values from the second input - } - - @Override - public void notifyCheckpointComplete(long checkpointId) throws Exception { - completedCheckpoints[subtaskId].add(checkpointId); - } } }