Repository: flink Updated Branches: refs/heads/master c9e0761de -> 3bbe59c0e
[FLINK-2671] [tests] Fix test instability in StreamCheckpointNotifierITCase. Also add more logging to help future test debugging. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3bbe59c0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3bbe59c0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3bbe59c0 Branch: refs/heads/master Commit: 3bbe59c0ecf71c2c3c4b2c7135c84a666e876f24 Parents: c9e0761 Author: Stephan Ewen <[email protected]> Authored: Fri Mar 11 15:30:32 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Fri Mar 11 15:36:52 2016 +0100 ---------------------------------------------------------------------- .../StreamCheckpointNotifierITCase.java | 27 ++++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3bbe59c0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java index 9b14a96..cf15052 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java @@ -41,13 +41,14 @@ import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.List; -import java.util.Random; import java.util.concurrent.atomic.AtomicLong; import 
static org.junit.Assert.assertFalse; @@ -72,6 +73,8 @@ import static org.junit.Assert.fail; */ @SuppressWarnings("serial") public class StreamCheckpointNotifierITCase extends TestLogger { + + private static final Logger LOG = LoggerFactory.getLogger(StreamCheckpointNotifierITCase.class); private static final int NUM_TASK_MANAGERS = 2; private static final int NUM_TASK_SLOTS = 3; @@ -394,28 +397,23 @@ public class StreamCheckpointNotifierITCase extends TestLogger { static final List<Long>[] completedCheckpoints = createCheckpointLists(PARALLELISM); - private final long numElements; - - private long failurePos; - private long count; + private final long failurePos; + + private volatile long count; private volatile boolean notificationAlready; OnceFailingReducer(long numElements) { - this.numElements = numElements; - } - - @Override - public void open(Configuration parameters) { - long failurePosMin = (long) (0.4 * numElements / getRuntimeContext().getNumberOfParallelSubtasks()); - long failurePosMax = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks()); - - failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin; + this.failurePos = (long) (0.5 * numElements / PARALLELISM); } @Override public Tuple1<Long> reduce(Tuple1<Long> value1, Tuple1<Long> value2) { count++; + if (count >= failurePos && getRuntimeContext().getIndexOfThisSubtask() == 0) { + LOG.info(">>>>>>>>>>>>>>>>> Reached failing position <<<<<<<<<<<<<<<<<<<<<"); + } + value1.f0 += value2.f0; return value1; } @@ -423,6 +421,7 @@ public class StreamCheckpointNotifierITCase extends TestLogger { @Override public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { if (!hasFailed && count >= failurePos && getRuntimeContext().getIndexOfThisSubtask() == 0) { + LOG.info(">>>>>>>>>>>>>>>>> Throwing Exception <<<<<<<<<<<<<<<<<<<<<"); hasFailed = true; failureCheckpointID = checkpointId; throw new Exception("Test Failure");
