Repository: flink
Updated Branches:
  refs/heads/release-1.2 d3f2fe262 -> a2853ec15


[FLINK-5773] Use akka.actor.Status.Failure class to send failures via AskSupport

Akka's AskSupport trait requires that failures are wrapped in an
akka.actor.Status.Failure to be recognized. Internally, the trait unwraps the
failure and wraps it in a scala.util.Failure instance. However, it does not
recognize a scala.util.Failure when one is sent directly as the response to an
ask. As a consequence, it would wrap the scala.util.Failure in a
scala.util.Success instance, and the asking side would see a successful result.

This closes #3324.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a2853ec1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a2853ec1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a2853ec1

Branch: refs/heads/release-1.2
Commit: a2853ec1527dd848d635d9cb7720b2bd21c7e3aa
Parents: d3f2fe2
Author: Till Rohrmann <[email protected]>
Authored: Wed Feb 15 14:16:26 2017 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Thu Feb 16 17:11:32 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/taskmanager/TaskManager.scala |  21 +-
 .../runtime/taskmanager/TaskManagerTest.java    | 237 ++++++++++++++++++-
 2 files changed, 241 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a2853ec1/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index f1e74cd..37b5e04 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -70,13 +70,12 @@ import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, 
LogMessages}
-import org.apache.flink.util.{MathUtils, NetUtils}
+import org.apache.flink.util.MathUtils
 
 import scala.collection.JavaConverters._
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.language.postfixOps
-import scala.util.{Failure, Success, Try}
 
 /**
  * The TaskManager is responsible for executing the individual tasks of a 
Flink job. It is
@@ -429,7 +428,7 @@ class TaskManager(
               futureResponse.mapTo[Boolean].onComplete {
                 // IMPORTANT: In the future callback, we cannot directly 
modify state
                 //            but only send messages to the TaskManager to do 
those changes
-                case Success(result) =>
+                case scala.util.Success(result) =>
                   if (!result) {
                   self ! decorateMessage(
                     FailTask(
@@ -438,7 +437,7 @@ class TaskManager(
                     )
                   }
 
-                case Failure(t) =>
+                case scala.util.Failure(t) =>
                 self ! decorateMessage(
                   FailTask(
                     executionID,
@@ -476,7 +475,7 @@ class TaskManager(
               sender ! decorateMessage(Acknowledge.get())
             } catch {
               case t: Throwable =>
-                sender ! decorateMessage(Failure(t))
+                sender ! decorateMessage(Status.Failure(t))
             }
           } else {
             log.debug(s"Cannot find task to stop for execution 
${executionID})")
@@ -760,8 +759,6 @@ class TaskManager(
                   // ---- Done ----
                   log.debug(s"Done with stack trace sample $sampleId.")
 
-
-
                   sender ! new StackTraceSampleResponse(sampleId, executionId, 
currentTraces)
                 }
 
@@ -779,7 +776,7 @@ class TaskManager(
           }
         } catch {
           case e: Exception =>
-            sender ! Failure(e)
+            sender ! decorateMessage(Status.Failure(e))
         }
 
       case _ => unhandled(message)
@@ -839,10 +836,10 @@ class TaskManager(
             client.put(fis);
           }(context.dispatcher)
             .onComplete {
-              case Success(value) =>
+              case scala.util.Success(value) =>
                 sender ! value
                 fis.close()
-              case Failure(e) =>
+              case scala.util.Failure(e) =>
                 sender ! akka.actor.Status.Failure(e)
                 fis.close()
             }(context.dispatcher)
@@ -1203,7 +1200,7 @@ class TaskManager(
     catch {
       case t: Throwable =>
         log.error("SubmitTask failed", t)
-        sender ! decorateMessage(Failure(t))
+        sender ! decorateMessage(Status.Failure(t))
     }
   }
 
@@ -1257,7 +1254,7 @@ class TaskManager(
         if (errors.isEmpty) {
           sender ! decorateMessage(Acknowledge.get())
         } else {
-          sender ! decorateMessage(Failure(new 
Exception(errors.mkString("\n"))))
+          sender ! decorateMessage(Status.Failure(new 
Exception(errors.mkString("\n"))))
         }
 
       case None =>

http://git-wip-us.apache.org/repos/asf/flink/blob/a2853ec1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 2fb5fa8..ef9e4bd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -481,7 +481,7 @@ public class TaskManagerTest extends TestLogger {
                                                        
expectMsgEquals(Acknowledge.get());
 
                                                        tm.tell(new 
StopTask(eid2), testActorGateway);
-                                                       
expectMsgClass(Failure.class);
+                                                       
expectMsgClass(Status.Failure.class);
 
                                                        
assertEquals(ExecutionState.RUNNING, t2.getExecutionState());
 
@@ -1225,13 +1225,13 @@ public class TaskManagerTest extends TestLogger {
 
                                                        // Receive the expected 
message (heartbeat races possible)
                                                        Object[] msg = 
receiveN(1);
-                                                       while (!(msg[0] 
instanceof Failure)) {
+                                                       while (!(msg[0] 
instanceof Status.Failure)) {
                                                                msg = 
receiveN(1);
                                                        }
 
-                                                       Failure response = 
(Failure) msg[0];
+                                                       Status.Failure response 
= (Status.Failure) msg[0];
 
-                                                       
assertEquals(IllegalStateException.class, response.exception().getClass());
+                                                       
assertEquals(IllegalStateException.class, response.cause().getClass());
                                                } catch (Exception e) {
                                                        e.printStackTrace();
                                                        fail(e.getMessage());
@@ -1523,7 +1523,234 @@ public class TaskManagerTest extends TestLogger {
                        }
                }};
        }
-       
+
+       /**
+        * Tests that the TaskManager sends a proper exception back to the 
sender if the submit task
+        * message fails.
+        */
+       @Test
+       public void testSubmitTaskFailure() throws Exception {
+               ActorGateway jobManager = null;
+               ActorGateway taskManager = null;
+
+               try {
+
+                       ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
+                       jobManager = new AkkaActorGateway(jm, leaderSessionID);
+
+                       taskManager = TestingUtils.createTaskManager(
+                               system,
+                               jobManager,
+                               new Configuration(),
+                               true,
+                               true);
+
+                       TaskDeploymentDescriptor tdd = 
createTaskDeploymentDescriptor(
+                               new JobID(),
+                               "test job",
+                               new JobVertexID(),
+                               new ExecutionAttemptID(),
+                               new SerializedValue<>(new ExecutionConfig()),
+                               "test task",
+                               0, // this will make the submission fail 
because the number of key groups must be >= 1
+                               0,
+                               1,
+                               0,
+                               new Configuration(),
+                               new Configuration(),
+                               "Foobar",
+                               
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+                               
Collections.<InputGateDeploymentDescriptor>emptyList(),
+                               Collections.<BlobKey>emptyList(),
+                               Collections.<URL>emptyList(),
+                               0);
+
+                       Future<Object> submitResponse = taskManager.ask(new 
SubmitTask(tdd), timeout);
+
+                       try {
+                               Await.result(submitResponse, timeout);
+
+                               fail("The submit task message should have 
failed.");
+                       } catch (IllegalArgumentException e) {
+                               // expected
+                       }
+               } finally {
+                       TestingUtils.stopActor(jobManager);
+                       TestingUtils.stopActor(taskManager);
+               }
+       }
+
+       /**
+        * Tests that the TaskManager sends a proper exception back to the 
sender if the stop task
+        * message fails.
+        */
+       @Test
+       public void testStopTaskFailure() throws Exception {
+               ActorGateway jobManager = null;
+               ActorGateway taskManager = null;
+
+               try {
+                       final ExecutionAttemptID executionAttemptId = new 
ExecutionAttemptID();
+
+                       ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
+                       jobManager = new AkkaActorGateway(jm, leaderSessionID);
+
+                       taskManager = TestingUtils.createTaskManager(
+                               system,
+                               jobManager,
+                               new Configuration(),
+                               true,
+                               true);
+
+                       TaskDeploymentDescriptor tdd = 
createTaskDeploymentDescriptor(
+                               new JobID(),
+                               "test job",
+                               new JobVertexID(),
+                               executionAttemptId,
+                               new SerializedValue<>(new ExecutionConfig()),
+                               "test task",
+                               1,
+                               0,
+                               1,
+                               0,
+                               new Configuration(),
+                               new Configuration(),
+                               Tasks.BlockingNoOpInvokable.class.getName(),
+                               
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+                               
Collections.<InputGateDeploymentDescriptor>emptyList(),
+                               Collections.<BlobKey>emptyList(),
+                               Collections.<URL>emptyList(),
+                               0);
+
+                       Future<Object> submitResponse = taskManager.ask(new 
SubmitTask(tdd), timeout);
+
+                       Await.result(submitResponse, timeout);
+
+                       Future<Object> stopResponse = taskManager.ask(new 
StopTask(executionAttemptId), timeout);
+
+                       try {
+                               Await.result(stopResponse, timeout);
+
+                               fail("The stop task message should have 
failed.");
+                       } catch (UnsupportedOperationException e) {
+                               // expected
+                       }
+               } finally {
+                       TestingUtils.stopActor(jobManager);
+                       TestingUtils.stopActor(taskManager);
+               }
+       }
+
+       /**
+        * Tests that the TaskManager sends a proper exception back to the 
sender if the trigger stack
+        * trace message fails.
+        */
+       @Test
+       public void testStackTraceSampleFailure() throws Exception {
+               ActorGateway jobManager = null;
+               ActorGateway taskManager = null;
+
+               try {
+
+                       ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
+                       jobManager = new AkkaActorGateway(jm, leaderSessionID);
+
+                       taskManager = TestingUtils.createTaskManager(
+                               system,
+                               jobManager,
+                               new Configuration(),
+                               true,
+                               true);
+
+                       Future<Object> stackTraceResponse = taskManager.ask(
+                               new TriggerStackTraceSample(
+                                       0,
+                                       new ExecutionAttemptID(),
+                                       0,
+                                       Time.milliseconds(1L),
+                                       0),
+                               timeout);
+
+                       try {
+                               Await.result(stackTraceResponse, timeout);
+
+                               fail("The trigger stack trace message should 
have failed.");
+                       } catch (IllegalStateException e) {
+                               // expected
+                       }
+               } finally {
+                       TestingUtils.stopActor(jobManager);
+                       TestingUtils.stopActor(taskManager);
+               }
+       }
+
+       /**
+        * Tests that the TaskManager sends a proper exception back to the 
sender if the update task
+        * input partitions message fails.
+        */
+       @Test
+       public void testUpdateTaskInputPartitionsFailure() throws Exception {
+               ActorGateway jobManager = null;
+               ActorGateway taskManager = null;
+
+               try {
+
+                       final ExecutionAttemptID executionAttemptId = new 
ExecutionAttemptID();
+
+                       ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
+                       jobManager = new AkkaActorGateway(jm, leaderSessionID);
+
+                       taskManager = TestingUtils.createTaskManager(
+                               system,
+                               jobManager,
+                               new Configuration(),
+                               true,
+                               true);
+
+                       TaskDeploymentDescriptor tdd = 
createTaskDeploymentDescriptor(
+                               new JobID(),
+                               "test job",
+                               new JobVertexID(),
+                               executionAttemptId,
+                               new SerializedValue<>(new ExecutionConfig()),
+                               "test task",
+                               1,
+                               0,
+                               1,
+                               0,
+                               new Configuration(),
+                               new Configuration(),
+                               Tasks.BlockingNoOpInvokable.class.getName(),
+                               
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+                               
Collections.<InputGateDeploymentDescriptor>emptyList(),
+                               Collections.<BlobKey>emptyList(),
+                               Collections.<URL>emptyList(),
+                               0);
+
+                       Future<Object> submitResponse = taskManager.ask(new 
SubmitTask(tdd), timeout);
+
+                       Await.result(submitResponse, timeout);
+
+                       Future<Object> partitionUpdateResponse = 
taskManager.ask(
+                               new TaskMessages.UpdateTaskSinglePartitionInfo(
+                                       executionAttemptId,
+                                       new IntermediateDataSetID(),
+                                       new 
InputChannelDeploymentDescriptor(new ResultPartitionID(), 
ResultPartitionLocation.createLocal())),
+                               timeout);
+
+                       try {
+                               Await.result(partitionUpdateResponse, timeout);
+
+                               fail("The update task input partitions message 
should have failed.");
+                       } catch (Exception e) {
+                               // expected
+                       }
+               } finally {
+                       TestingUtils.stopActor(jobManager);
+                       TestingUtils.stopActor(taskManager);
+               }
+       }
+
        // 
--------------------------------------------------------------------------------------------
 
        public static class SimpleJobManager extends FlinkUntypedActor {

Reply via email to