[FLINK-2671] [tests] Fix test instability in StreamCheckpointNotifierITCase

Also add more logging, to help future test debugging


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/43e95f2c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/43e95f2c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/43e95f2c

Branch: refs/heads/release-1.0
Commit: 43e95f2c2b710a7c6a0af5df0c9eeb57ca52a4cb
Parents: f905503
Author: Stephan Ewen <[email protected]>
Authored: Fri Mar 11 15:30:32 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Fri Mar 11 15:55:50 2016 +0100

----------------------------------------------------------------------
 .../StreamCheckpointNotifierITCase.java         | 29 ++++++++++----------
 1 file changed, 15 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/43e95f2c/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
index 8927f43..fc9bb7e 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
@@ -37,9 +37,12 @@ import 
org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.Collector;
 
+import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -70,7 +73,9 @@ import static org.junit.Assert.fail;
  * successfully completed checkpoint.
  */
 @SuppressWarnings("serial")
-public class StreamCheckpointNotifierITCase {
+public class StreamCheckpointNotifierITCase extends TestLogger {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(StreamCheckpointNotifierITCase.class);
        
        private static final int NUM_TASK_MANAGERS = 2;
        private static final int NUM_TASK_SLOTS = 3;
@@ -393,28 +398,23 @@ public class StreamCheckpointNotifierITCase {
 
                static final List<Long>[] completedCheckpoints = 
createCheckpointLists(PARALLELISM);
                
-               private final long numElements;
-
-               private long failurePos;
-               private long count;
+               private final long failurePos;
+               
+               private volatile long count;
 
                private volatile boolean notificationAlready;
                
                OnceFailingReducer(long numElements) {
-                       this.numElements = numElements;
-               }
-
-               @Override
-               public void open(Configuration parameters) {
-                       long failurePosMin = (long) (0.4 * numElements / 
getRuntimeContext().getNumberOfParallelSubtasks());
-                       long failurePosMax = (long) (0.7 * numElements / 
getRuntimeContext().getNumberOfParallelSubtasks());
-
-                       failurePos = (new Random().nextLong() % (failurePosMax 
- failurePosMin)) + failurePosMin;
+                       this.failurePos = (long) (0.5 * numElements / 
PARALLELISM);
                }
 
                @Override
                public Tuple1<Long> reduce(Tuple1<Long> value1, Tuple1<Long> 
value2) {
                        count++;
+                       if (count >= failurePos && 
getRuntimeContext().getIndexOfThisSubtask() == 0) {
+                               LOG.info(">>>>>>>>>>>>>>>>> Reached failing 
position <<<<<<<<<<<<<<<<<<<<<");
+                       }
+                       
                        value1.f0 += value2.f0;
                        return value1;
                }
@@ -422,6 +422,7 @@ public class StreamCheckpointNotifierITCase {
                @Override
                public Long snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
                        if (!hasFailed && count >= failurePos && 
getRuntimeContext().getIndexOfThisSubtask() == 0) {
+                               LOG.info(">>>>>>>>>>>>>>>>> Throwing Exception 
<<<<<<<<<<<<<<<<<<<<<");
                                hasFailed = true;
                                failureCheckpointID = checkpointId;
                                throw new Exception("Test Failure");

Reply via email to