Repository: flink
Updated Branches:
  refs/heads/master 2ec2abfae -> 413609d13


[FLINK-5773] Use akka.actor.Status.Failure class to send failures via AskSupport

Akka's AskSupport trait requires that failures are wrapped in a 
akka.actor.Status.Failure
to be recognized. Internally the trait will unwrap the failure and wrap it in a
scala.util.Failure instance. However, it does not recognize the scala Failure 
when given
to the AskSupport trait. As a consequence it would wrap scala.util.Failure in a
scala.util.Success instance.

This closes #3321.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/413609d1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/413609d1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/413609d1

Branch: refs/heads/master
Commit: 413609d13fcf924fa8581450618bccc6abdbbda0
Parents: 2ec2abf
Author: Till Rohrmann <[email protected]>
Authored: Wed Feb 15 14:16:26 2017 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Thu Feb 16 17:09:59 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/taskmanager/TaskManager.scala |  20 +-
 .../runtime/taskmanager/TaskManagerTest.java    | 237 ++++++++++++++++++-
 2 files changed, 240 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/413609d1/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 7cb1902..a70454b 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -69,13 +69,11 @@ import 
org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration
 import org.apache.flink.runtime.taskexecutor.{TaskManagerServices, 
TaskManagerServicesConfiguration, TaskManagerConfiguration}
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, 
LogMessages}
-import org.apache.flink.util.NetUtils
 
 import scala.collection.JavaConverters._
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.language.postfixOps
-import scala.util.{Failure, Success}
 
 /**
  * The TaskManager is responsible for executing the individual tasks of a 
Flink job. It is
@@ -423,7 +421,7 @@ class TaskManager(
               futureResponse.mapTo[Boolean].onComplete {
                 // IMPORTANT: In the future callback, we cannot directly 
modify state
                 //            but only send messages to the TaskManager to do 
those changes
-                case Success(result) =>
+                case scala.util.Success(result) =>
                   if (!result) {
                   self ! decorateMessage(
                     FailTask(
@@ -432,7 +430,7 @@ class TaskManager(
                     )
                   }
 
-                case Failure(t) =>
+                case scala.util.Failure(t) =>
                 self ! decorateMessage(
                   FailTask(
                     executionID,
@@ -470,7 +468,7 @@ class TaskManager(
               sender ! decorateMessage(Acknowledge.get())
             } catch {
               case t: Throwable =>
-                sender ! decorateMessage(Failure(t))
+                sender ! decorateMessage(Status.Failure(t))
             }
           } else {
             log.debug(s"Cannot find task to stop for execution 
${executionID})")
@@ -762,8 +760,6 @@ class TaskManager(
                   // ---- Done ----
                   log.debug(s"Done with stack trace sample $sampleId.")
 
-
-
                   sender ! new StackTraceSampleResponse(sampleId, executionId, 
currentTraces)
                 }
 
@@ -781,7 +777,7 @@ class TaskManager(
           }
         } catch {
           case e: Exception =>
-            sender ! Failure(e)
+            sender ! decorateMessage(Status.Failure(e))
         }
 
       case _ => unhandled(message)
@@ -841,10 +837,10 @@ class TaskManager(
             client.put(fis);
           }(context.dispatcher)
             .onComplete {
-              case Success(value) =>
+              case scala.util.Success(value) =>
                 sender ! value
                 fis.close()
-              case Failure(e) =>
+              case scala.util.Failure(e) =>
                 sender ! akka.actor.Status.Failure(e)
                 fis.close()
             }(context.dispatcher)
@@ -1209,7 +1205,7 @@ class TaskManager(
     catch {
       case t: Throwable =>
         log.error("SubmitTask failed", t)
-        sender ! decorateMessage(Failure(t))
+        sender ! decorateMessage(Status.Failure(t))
     }
   }
 
@@ -1263,7 +1259,7 @@ class TaskManager(
         if (errors.isEmpty) {
           sender ! decorateMessage(Acknowledge.get())
         } else {
-          sender ! decorateMessage(Failure(new 
Exception(errors.mkString("\n"))))
+          sender ! decorateMessage(Status.Failure(new 
Exception(errors.mkString("\n"))))
         }
 
       case None =>

http://git-wip-us.apache.org/repos/asf/flink/blob/413609d1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 770aa35..356d693 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -483,7 +483,7 @@ public class TaskManagerTest extends TestLogger {
                                                        
expectMsgEquals(Acknowledge.get());
 
                                                        tm.tell(new 
StopTask(eid2), testActorGateway);
-                                                       
expectMsgClass(Failure.class);
+                                                       
expectMsgClass(Status.Failure.class);
 
                                                        
assertEquals(ExecutionState.RUNNING, t2.getExecutionState());
 
@@ -1227,13 +1227,13 @@ public class TaskManagerTest extends TestLogger {
 
                                                        // Receive the expected 
message (heartbeat races possible)
                                                        Object[] msg = 
receiveN(1);
-                                                       while (!(msg[0] 
instanceof Failure)) {
+                                                       while (!(msg[0] 
instanceof Status.Failure)) {
                                                                msg = 
receiveN(1);
                                                        }
 
-                                                       Failure response = 
(Failure) msg[0];
+                                                       Status.Failure response 
= (Status.Failure) msg[0];
 
-                                                       
assertEquals(IllegalStateException.class, response.exception().getClass());
+                                                       
assertEquals(IllegalStateException.class, response.cause().getClass());
                                                } catch (Exception e) {
                                                        e.printStackTrace();
                                                        fail(e.getMessage());
@@ -1525,7 +1525,234 @@ public class TaskManagerTest extends TestLogger {
                        }
                }};
        }
-       
+
+       /**
+        * Tests that the TaskManager sends a proper exception back to the 
sender if the submit task
+        * message fails.
+        */
+       @Test
+       public void testSubmitTaskFailure() throws Exception {
+               ActorGateway jobManager = null;
+               ActorGateway taskManager = null;
+
+               try {
+
+                       ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
+                       jobManager = new AkkaActorGateway(jm, leaderSessionID);
+
+                       taskManager = TestingUtils.createTaskManager(
+                               system,
+                               jobManager,
+                               new Configuration(),
+                               true,
+                               true);
+
+                       TaskDeploymentDescriptor tdd = 
createTaskDeploymentDescriptor(
+                               new JobID(),
+                               "test job",
+                               new JobVertexID(),
+                               new ExecutionAttemptID(),
+                               new SerializedValue<>(new ExecutionConfig()),
+                               "test task",
+                               0, // this will make the submission fail 
because the number of key groups must be >= 1
+                               0,
+                               1,
+                               0,
+                               new Configuration(),
+                               new Configuration(),
+                               "Foobar",
+                               
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+                               
Collections.<InputGateDeploymentDescriptor>emptyList(),
+                               Collections.<BlobKey>emptyList(),
+                               Collections.<URL>emptyList(),
+                               0);
+
+                       Future<Object> submitResponse = taskManager.ask(new 
SubmitTask(tdd), timeout);
+
+                       try {
+                               Await.result(submitResponse, timeout);
+
+                               fail("The submit task message should have 
failed.");
+                       } catch (IllegalArgumentException e) {
+                               // expected
+                       }
+               } finally {
+                       TestingUtils.stopActor(jobManager);
+                       TestingUtils.stopActor(taskManager);
+               }
+       }
+
+       /**
+        * Tests that the TaskManager sends a proper exception back to the 
sender if the stop task
+        * message fails.
+        */
+       @Test
+       public void testStopTaskFailure() throws Exception {
+               ActorGateway jobManager = null;
+               ActorGateway taskManager = null;
+
+               try {
+                       final ExecutionAttemptID executionAttemptId = new 
ExecutionAttemptID();
+
+                       ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
+                       jobManager = new AkkaActorGateway(jm, leaderSessionID);
+
+                       taskManager = TestingUtils.createTaskManager(
+                               system,
+                               jobManager,
+                               new Configuration(),
+                               true,
+                               true);
+
+                       TaskDeploymentDescriptor tdd = 
createTaskDeploymentDescriptor(
+                               new JobID(),
+                               "test job",
+                               new JobVertexID(),
+                               executionAttemptId,
+                               new SerializedValue<>(new ExecutionConfig()),
+                               "test task",
+                               1,
+                               0,
+                               1,
+                               0,
+                               new Configuration(),
+                               new Configuration(),
+                               BlockingNoOpInvokable.class.getName(),
+                               
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+                               
Collections.<InputGateDeploymentDescriptor>emptyList(),
+                               Collections.<BlobKey>emptyList(),
+                               Collections.<URL>emptyList(),
+                               0);
+
+                       Future<Object> submitResponse = taskManager.ask(new 
SubmitTask(tdd), timeout);
+
+                       Await.result(submitResponse, timeout);
+
+                       Future<Object> stopResponse = taskManager.ask(new 
StopTask(executionAttemptId), timeout);
+
+                       try {
+                               Await.result(stopResponse, timeout);
+
+                               fail("The stop task message should have 
failed.");
+                       } catch (UnsupportedOperationException e) {
+                               // expected
+                       }
+               } finally {
+                       TestingUtils.stopActor(jobManager);
+                       TestingUtils.stopActor(taskManager);
+               }
+       }
+
+       /**
+        * Tests that the TaskManager sends a proper exception back to the 
sender if the trigger stack
+        * trace message fails.
+        */
+       @Test
+       public void testStackTraceSampleFailure() throws Exception {
+               ActorGateway jobManager = null;
+               ActorGateway taskManager = null;
+
+               try {
+
+                       ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
+                       jobManager = new AkkaActorGateway(jm, leaderSessionID);
+
+                       taskManager = TestingUtils.createTaskManager(
+                               system,
+                               jobManager,
+                               new Configuration(),
+                               true,
+                               true);
+
+                       Future<Object> stackTraceResponse = taskManager.ask(
+                               new TriggerStackTraceSample(
+                                       0,
+                                       new ExecutionAttemptID(),
+                                       0,
+                                       Time.milliseconds(1L),
+                                       0),
+                               timeout);
+
+                       try {
+                               Await.result(stackTraceResponse, timeout);
+
+                               fail("The trigger stack trace message should 
have failed.");
+                       } catch (IllegalStateException e) {
+                               // expected
+                       }
+               } finally {
+                       TestingUtils.stopActor(jobManager);
+                       TestingUtils.stopActor(taskManager);
+               }
+       }
+
+       /**
+        * Tests that the TaskManager sends a proper exception back to the 
sender if the trigger stack
+        * trace message fails.
+        */
+       @Test
+       public void testUpdateTaskInputPartitionsFailure() throws Exception {
+               ActorGateway jobManager = null;
+               ActorGateway taskManager = null;
+
+               try {
+
+                       final ExecutionAttemptID executionAttemptId = new 
ExecutionAttemptID();
+
+                       ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
+                       jobManager = new AkkaActorGateway(jm, leaderSessionID);
+
+                       taskManager = TestingUtils.createTaskManager(
+                               system,
+                               jobManager,
+                               new Configuration(),
+                               true,
+                               true);
+
+                       TaskDeploymentDescriptor tdd = 
createTaskDeploymentDescriptor(
+                               new JobID(),
+                               "test job",
+                               new JobVertexID(),
+                               executionAttemptId,
+                               new SerializedValue<>(new ExecutionConfig()),
+                               "test task",
+                               1,
+                               0,
+                               1,
+                               0,
+                               new Configuration(),
+                               new Configuration(),
+                               BlockingNoOpInvokable.class.getName(),
+                               
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+                               
Collections.<InputGateDeploymentDescriptor>emptyList(),
+                               Collections.<BlobKey>emptyList(),
+                               Collections.<URL>emptyList(),
+                               0);
+
+                       Future<Object> submitResponse = taskManager.ask(new 
SubmitTask(tdd), timeout);
+
+                       Await.result(submitResponse, timeout);
+
+                       Future<Object> partitionUpdateResponse = 
taskManager.ask(
+                               new TaskMessages.UpdateTaskSinglePartitionInfo(
+                                       executionAttemptId,
+                                       new IntermediateDataSetID(),
+                                       new 
InputChannelDeploymentDescriptor(new ResultPartitionID(), 
ResultPartitionLocation.createLocal())),
+                               timeout);
+
+                       try {
+                               Await.result(partitionUpdateResponse, timeout);
+
+                               fail("The update task input partitions message 
should have failed.");
+                       } catch (Exception e) {
+                               // expected
+                       }
+               } finally {
+                       TestingUtils.stopActor(jobManager);
+                       TestingUtils.stopActor(taskManager);
+               }
+       }
+
        // 
--------------------------------------------------------------------------------------------
 
        public static class SimpleJobManager extends FlinkUntypedActor {

Reply via email to