Repository: flink
Updated Branches:
  refs/heads/master dfaec3370 -> f59de67d9


[FLINK-7240] [tests] Stabilize ExternalizedCheckpointITCase

The problem was that, after canceling a job, the TestingCluster did not
properly wait until the job was completely removed from the cluster before
submitting the next job. This could lead to a NoResourceAvailableException
which ultimately made the job fail.

Add a CountDownLatch to the source of ExternalizedCheckpointITCase in order
to wait until all sources are running before triggering a checkpoint.

This closes #4497.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f9db6fe1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f9db6fe1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f9db6fe1

Branch: refs/heads/master
Commit: f9db6fe1dd0c82209db9c064c8f6f1aa99b9590c
Parents: dfaec33
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Tue Aug 8 10:04:34 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Thu Aug 10 11:17:03 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/JobManager.scala   |  4 +--
 .../runtime/testingUtils/TestingCluster.scala   | 36 +++++++++++++-------
 .../ExternalizedCheckpointITCase.java           | 36 +++++++++++++++++---
 3 files changed, 58 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f9db6fe1/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index fc668d6..7adf456 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -575,11 +575,11 @@ class JobManager(
       currentJobs.get(jobID) match {
         case Some((executionGraph, _)) =>
           // execute the cancellation asynchronously
+          val origSender = sender
           Future {
             executionGraph.cancel()
+            origSender ! decorateMessage(CancellationSuccess(jobID))
           }(context.dispatcher)
-
-          sender ! decorateMessage(CancellationSuccess(jobID))
         case None =>
           log.info(s"No job found with ID $jobID.")
           sender ! decorateMessage(

http://git-wip-us.apache.org/repos/asf/flink/blob/f9db6fe1/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 833cb61..e5655bb 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -44,7 +44,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.metrics.MetricRegistry
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 import org.apache.flink.runtime.taskmanager.TaskManager
-import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{CheckpointRequest,
 CheckpointRequestFailure, CheckpointRequestSuccess, ResponseSavepoint}
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.TestingMessages.Alive
 import 
org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager
 import org.apache.flink.runtime.testutils.TestingResourceManager
@@ -378,34 +378,46 @@ class TestingCluster(
   def requestCheckpoint(jobId: JobID, options : CheckpointOptions): String = {
     val jobManagerGateway = getLeaderGateway(timeout)
 
+    // wait until the cluster is ready to take a checkpoint.
+    val allRunning = jobManagerGateway.ask(
+      TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobId), timeout)
+
+    Await.ready(allRunning, timeout)
+
     // trigger checkpoint
     val result = Await.result(
       jobManagerGateway.ask(CheckpointRequest(jobId, options), timeout), 
timeout)
 
     result match {
       case success: CheckpointRequestSuccess => success.path
-      case fail: CheckpointRequestFailure => {
-        // TODO right now, this is a dirty way to detect whether the checkpoint
-        // failed because tasks were not ready.This would not be required if
-        // TestingJobManagerMessages.WaitForAllVerticesToBeRunning(...) works
-        // properly.
-        LOG.info("Test checkpoint attempt failed. Retry ...", fail.cause)
-        Thread.sleep(50)
-        requestCheckpoint(jobId, options)
-      }
+      case fail: CheckpointRequestFailure => throw fail.cause
       case _ => throw new IllegalStateException("Trigger checkpoint failed")
     }
   }
 
+  /**
+    * This cancels the given job and waits until it has been completely 
removed from
+    * the cluster.
+    *
+    * @param jobId identifying the job to cancel
+    * @throws Exception if something goes wrong
+    */
   @throws[Exception]
   def cancelJob(jobId: JobID): Unit = {
     if (getCurrentlyRunningJobsJava.contains(jobId)) {
       val jobManagerGateway = getLeaderGateway(timeout)
+      val jobRemoved = jobManagerGateway.ask(NotifyWhenJobRemoved(jobId), 
timeout)
       val cancelFuture = jobManagerGateway.ask(new 
JobManagerMessages.CancelJob(jobId), timeout)
       val result = Await.result(cancelFuture, timeout)
-      if (!result.isInstanceOf[JobManagerMessages.CancellationSuccess]) {
-        throw new Exception("Cancellation failed")
+
+      result match {
+        case CancellationFailure(_, cause) =>
+          throw new Exception("Cancellation failed", cause)
+        case _ => // noop
       }
+
+      // wait until the job has been removed
+      Await.result(jobRemoved, timeout)
     }
     else throw new IllegalStateException("Job is not running")
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/f9db6fe1/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ExternalizedCheckpointITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ExternalizedCheckpointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ExternalizedCheckpointITCase.java
index e341741..f196c50 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ExternalizedCheckpointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ExternalizedCheckpointITCase.java
@@ -40,6 +40,7 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.test.state.ManualWindowSpeedITCase;
+import org.apache.flink.util.TestLogger;
 
 import org.apache.curator.test.TestingServer;
 import org.junit.ClassRule;
@@ -47,6 +48,7 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
+import java.util.concurrent.CountDownLatch;
 
 /**
  * IT case for externalized checkpoints with {@link 
org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore}
@@ -54,7 +56,7 @@ import java.io.File;
  *
  * <p>This tests considers full and incremental checkpoints and was introduced 
to guard against problems like FLINK-6964.
  */
-public class ExternalizedCheckpointITCase {
+public class ExternalizedCheckpointITCase extends TestLogger {
 
        private static final int PARALLELISM = 2;
        private static final int NUM_TASK_MANAGERS = 2;
@@ -177,7 +179,10 @@ public class ExternalizedCheckpointITCase {
                                
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
                                env.setParallelism(PARALLELISM);
 
-                               env.addSource(new 
ManualWindowSpeedITCase.InfiniteTupleSource(10_000))
+                               // initialize count down latch
+                               NotifyingInfiniteTupleSource.countDownLatch = 
new CountDownLatch(PARALLELISM);
+
+                               env.addSource(new 
NotifyingInfiniteTupleSource(10_000))
                                        .keyBy(0)
                                        .timeWindow(Time.seconds(3))
                                        .reduce(new 
ReduceFunction<Tuple2<String, Integer>>() {
@@ -212,8 +217,8 @@ public class ExternalizedCheckpointITCase {
                                config.addAll(jobGraph.getJobConfiguration());
                                JobSubmissionResult submissionResult = 
cluster.submitJobDetached(jobGraph);
 
-                               // let the job do some progress
-                               Thread.sleep(200);
+                               // wait until all sources have been started
+                               
NotifyingInfiniteTupleSource.countDownLatch.await();
 
                                externalCheckpoint =
                                        
cluster.requestCheckpoint(submissionResult.getJobID(), 
CheckpointOptions.forFullCheckpoint());
@@ -225,4 +230,27 @@ public class ExternalizedCheckpointITCase {
                        cluster.awaitTermination();
                }
        }
+
+       /**
+        * Infinite source which notifies when all of its sub tasks have been 
started via the count down latch.
+        */
+       public static class NotifyingInfiniteTupleSource extends 
ManualWindowSpeedITCase.InfiniteTupleSource {
+
+               private static final long serialVersionUID = 
8120981235081181746L;
+
+               private static CountDownLatch countDownLatch;
+
+               public NotifyingInfiniteTupleSource(int numKeys) {
+                       super(numKeys);
+               }
+
+               @Override
+               public void run(SourceContext<Tuple2<String, Integer>> out) 
throws Exception {
+                       if (countDownLatch != null) {
+                               countDownLatch.countDown();
+                       }
+
+                       super.run(out);
+               }
+       }
 }

Reply via email to