[FLINK-2671] [tests] Fix test instability in StreamCheckpointNotifierITCase
Also add more logging, to help future test debugging

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/43e95f2c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/43e95f2c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/43e95f2c

Branch: refs/heads/release-1.0
Commit: 43e95f2c2b710a7c6a0af5df0c9eeb57ca52a4cb
Parents: f905503
Author: Stephan Ewen <[email protected]>
Authored: Fri Mar 11 15:30:32 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Fri Mar 11 15:55:50 2016 +0100

----------------------------------------------------------------------
 .../StreamCheckpointNotifierITCase.java | 29 ++++++++++----------
 1 file changed, 15 insertions(+), 14 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/43e95f2c/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
index 8927f43..fc9bb7e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
@@ -37,9 +37,12 @@
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -70,7 +73,9 @@ import static org.junit.Assert.fail;
  * 
successfully completed checkpoint.
  */
 @SuppressWarnings("serial")
-public class StreamCheckpointNotifierITCase {
+public class StreamCheckpointNotifierITCase extends TestLogger {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StreamCheckpointNotifierITCase.class);
     private static final int NUM_TASK_MANAGERS = 2;
     private static final int NUM_TASK_SLOTS = 3;
@@ -393,28 +398,23 @@ public class StreamCheckpointNotifierITCase {
         static final List<Long>[] completedCheckpoints = createCheckpointLists(PARALLELISM);
-        private final long numElements;
-
-        private long failurePos;
-        private long count;
+        private final long failurePos;
+
+        private volatile long count;
         private volatile boolean notificationAlready;
         OnceFailingReducer(long numElements) {
-            this.numElements = numElements;
-        }
-
-        @Override
-        public void open(Configuration parameters) {
-            long failurePosMin = (long) (0.4 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
-            long failurePosMax = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
-
-            failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
+            this.failurePos = (long) (0.5 * numElements / PARALLELISM);
         }
         @Override
         public Tuple1<Long> reduce(Tuple1<Long> value1, Tuple1<Long> value2) {
             count++;
+            if (count >= failurePos && getRuntimeContext().getIndexOfThisSubtask() == 0) {
+                LOG.info(">>>>>>>>>>>>>>>>> Reached failing position <<<<<<<<<<<<<<<<<<<<<");
+            }
+
             value1.f0 += value2.f0;
             return value1;
         }
@@ -422,6 +422,7 @@ public class StreamCheckpointNotifierITCase {
         @Override
         public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
             if (!hasFailed && count >= failurePos && getRuntimeContext().getIndexOfThisSubtask() == 0) {
+                LOG.info(">>>>>>>>>>>>>>>>> Throwing Exception <<<<<<<<<<<<<<<<<<<<<");
                 hasFailed = true;
                 failureCheckpointID = checkpointId;
                 throw new Exception("Test Failure");
