Repository: flink Updated Branches: refs/heads/master 2ec2abfae -> 413609d13
[FLINK-5773] Use akka.actor.Status.Failure class to send failures via AskSupport Akka's AskSupport trait requires that failures are wrapped in an akka.actor.Status.Failure to be recognized. Internally the trait will unwrap the failure and wrap it in a scala.util.Failure instance. However, it does not recognize a scala.util.Failure that is sent directly as the reply to an ask. As a consequence, it would wrap the scala.util.Failure in a scala.util.Success instance. This closes #3321. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/413609d1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/413609d1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/413609d1 Branch: refs/heads/master Commit: 413609d13fcf924fa8581450618bccc6abdbbda0 Parents: 2ec2abf Author: Till Rohrmann <[email protected]> Authored: Wed Feb 15 14:16:26 2017 +0100 Committer: Till Rohrmann <[email protected]> Committed: Thu Feb 16 17:09:59 2017 +0100 ---------------------------------------------------------------------- .../flink/runtime/taskmanager/TaskManager.scala | 20 +- .../runtime/taskmanager/TaskManagerTest.java | 237 ++++++++++++++++++- 2 files changed, 240 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/413609d1/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 7cb1902..a70454b 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -69,13 +69,11 @@ import 
org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration import org.apache.flink.runtime.taskexecutor.{TaskManagerServices, TaskManagerServicesConfiguration, TaskManagerConfiguration} import org.apache.flink.runtime.util._ import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages} -import org.apache.flink.util.NetUtils import scala.collection.JavaConverters._ import scala.concurrent._ import scala.concurrent.duration._ import scala.language.postfixOps -import scala.util.{Failure, Success} /** * The TaskManager is responsible for executing the individual tasks of a Flink job. It is @@ -423,7 +421,7 @@ class TaskManager( futureResponse.mapTo[Boolean].onComplete { // IMPORTANT: In the future callback, we cannot directly modify state // but only send messages to the TaskManager to do those changes - case Success(result) => + case scala.util.Success(result) => if (!result) { self ! decorateMessage( FailTask( @@ -432,7 +430,7 @@ class TaskManager( ) } - case Failure(t) => + case scala.util.Failure(t) => self ! decorateMessage( FailTask( executionID, @@ -470,7 +468,7 @@ class TaskManager( sender ! decorateMessage(Acknowledge.get()) } catch { case t: Throwable => - sender ! decorateMessage(Failure(t)) + sender ! decorateMessage(Status.Failure(t)) } } else { log.debug(s"Cannot find task to stop for execution ${executionID})") @@ -762,8 +760,6 @@ class TaskManager( // ---- Done ---- log.debug(s"Done with stack trace sample $sampleId.") - - sender ! new StackTraceSampleResponse(sampleId, executionId, currentTraces) } @@ -781,7 +777,7 @@ class TaskManager( } } catch { case e: Exception => - sender ! Failure(e) + sender ! decorateMessage(Status.Failure(e)) } case _ => unhandled(message) @@ -841,10 +837,10 @@ class TaskManager( client.put(fis); }(context.dispatcher) .onComplete { - case Success(value) => + case scala.util.Success(value) => sender ! value fis.close() - case Failure(e) => + case scala.util.Failure(e) => sender ! 
akka.actor.Status.Failure(e) fis.close() }(context.dispatcher) @@ -1209,7 +1205,7 @@ class TaskManager( catch { case t: Throwable => log.error("SubmitTask failed", t) - sender ! decorateMessage(Failure(t)) + sender ! decorateMessage(Status.Failure(t)) } } @@ -1263,7 +1259,7 @@ class TaskManager( if (errors.isEmpty) { sender ! decorateMessage(Acknowledge.get()) } else { - sender ! decorateMessage(Failure(new Exception(errors.mkString("\n")))) + sender ! decorateMessage(Status.Failure(new Exception(errors.mkString("\n")))) } case None => http://git-wip-us.apache.org/repos/asf/flink/blob/413609d1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index 770aa35..356d693 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -483,7 +483,7 @@ public class TaskManagerTest extends TestLogger { expectMsgEquals(Acknowledge.get()); tm.tell(new StopTask(eid2), testActorGateway); - expectMsgClass(Failure.class); + expectMsgClass(Status.Failure.class); assertEquals(ExecutionState.RUNNING, t2.getExecutionState()); @@ -1227,13 +1227,13 @@ public class TaskManagerTest extends TestLogger { // Receive the expected message (heartbeat races possible) Object[] msg = receiveN(1); - while (!(msg[0] instanceof Failure)) { + while (!(msg[0] instanceof Status.Failure)) { msg = receiveN(1); } - Failure response = (Failure) msg[0]; + Status.Failure response = (Status.Failure) msg[0]; - assertEquals(IllegalStateException.class, response.exception().getClass()); + assertEquals(IllegalStateException.class, response.cause().getClass()); } catch (Exception e) { 
e.printStackTrace(); fail(e.getMessage()); @@ -1525,7 +1525,234 @@ public class TaskManagerTest extends TestLogger { } }}; } - + + /** + * Tests that the TaskManager sends a proper exception back to the sender if the submit task + * message fails. + */ + @Test + public void testSubmitTaskFailure() throws Exception { + ActorGateway jobManager = null; + ActorGateway taskManager = null; + + try { + + ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID)); + jobManager = new AkkaActorGateway(jm, leaderSessionID); + + taskManager = TestingUtils.createTaskManager( + system, + jobManager, + new Configuration(), + true, + true); + + TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor( + new JobID(), + "test job", + new JobVertexID(), + new ExecutionAttemptID(), + new SerializedValue<>(new ExecutionConfig()), + "test task", + 0, // this will make the submission fail because the number of key groups must be >= 1 + 0, + 1, + 0, + new Configuration(), + new Configuration(), + "Foobar", + Collections.<ResultPartitionDeploymentDescriptor>emptyList(), + Collections.<InputGateDeploymentDescriptor>emptyList(), + Collections.<BlobKey>emptyList(), + Collections.<URL>emptyList(), + 0); + + Future<Object> submitResponse = taskManager.ask(new SubmitTask(tdd), timeout); + + try { + Await.result(submitResponse, timeout); + + fail("The submit task message should have failed."); + } catch (IllegalArgumentException e) { + // expected + } + } finally { + TestingUtils.stopActor(jobManager); + TestingUtils.stopActor(taskManager); + } + } + + /** + * Tests that the TaskManager sends a proper exception back to the sender if the stop task + * message fails. 
+ */ + @Test + public void testStopTaskFailure() throws Exception { + ActorGateway jobManager = null; + ActorGateway taskManager = null; + + try { + final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID(); + + ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID)); + jobManager = new AkkaActorGateway(jm, leaderSessionID); + + taskManager = TestingUtils.createTaskManager( + system, + jobManager, + new Configuration(), + true, + true); + + TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor( + new JobID(), + "test job", + new JobVertexID(), + executionAttemptId, + new SerializedValue<>(new ExecutionConfig()), + "test task", + 1, + 0, + 1, + 0, + new Configuration(), + new Configuration(), + BlockingNoOpInvokable.class.getName(), + Collections.<ResultPartitionDeploymentDescriptor>emptyList(), + Collections.<InputGateDeploymentDescriptor>emptyList(), + Collections.<BlobKey>emptyList(), + Collections.<URL>emptyList(), + 0); + + Future<Object> submitResponse = taskManager.ask(new SubmitTask(tdd), timeout); + + Await.result(submitResponse, timeout); + + Future<Object> stopResponse = taskManager.ask(new StopTask(executionAttemptId), timeout); + + try { + Await.result(stopResponse, timeout); + + fail("The stop task message should have failed."); + } catch (UnsupportedOperationException e) { + // expected + } + } finally { + TestingUtils.stopActor(jobManager); + TestingUtils.stopActor(taskManager); + } + } + + /** + * Tests that the TaskManager sends a proper exception back to the sender if the trigger stack + * trace message fails. 
+ */ + @Test + public void testStackTraceSampleFailure() throws Exception { + ActorGateway jobManager = null; + ActorGateway taskManager = null; + + try { + + ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID)); + jobManager = new AkkaActorGateway(jm, leaderSessionID); + + taskManager = TestingUtils.createTaskManager( + system, + jobManager, + new Configuration(), + true, + true); + + Future<Object> stackTraceResponse = taskManager.ask( + new TriggerStackTraceSample( + 0, + new ExecutionAttemptID(), + 0, + Time.milliseconds(1L), + 0), + timeout); + + try { + Await.result(stackTraceResponse, timeout); + + fail("The trigger stack trace message should have failed."); + } catch (IllegalStateException e) { + // expected + } + } finally { + TestingUtils.stopActor(jobManager); + TestingUtils.stopActor(taskManager); + } + } + + /** + * Tests that the TaskManager sends a proper exception back to the sender if the trigger stack + * trace message fails. + */ + @Test + public void testUpdateTaskInputPartitionsFailure() throws Exception { + ActorGateway jobManager = null; + ActorGateway taskManager = null; + + try { + + final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID(); + + ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID)); + jobManager = new AkkaActorGateway(jm, leaderSessionID); + + taskManager = TestingUtils.createTaskManager( + system, + jobManager, + new Configuration(), + true, + true); + + TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor( + new JobID(), + "test job", + new JobVertexID(), + executionAttemptId, + new SerializedValue<>(new ExecutionConfig()), + "test task", + 1, + 0, + 1, + 0, + new Configuration(), + new Configuration(), + BlockingNoOpInvokable.class.getName(), + Collections.<ResultPartitionDeploymentDescriptor>emptyList(), + Collections.<InputGateDeploymentDescriptor>emptyList(), + Collections.<BlobKey>emptyList(), + Collections.<URL>emptyList(), + 0); + + 
Future<Object> submitResponse = taskManager.ask(new SubmitTask(tdd), timeout); + + Await.result(submitResponse, timeout); + + Future<Object> partitionUpdateResponse = taskManager.ask( + new TaskMessages.UpdateTaskSinglePartitionInfo( + executionAttemptId, + new IntermediateDataSetID(), + new InputChannelDeploymentDescriptor(new ResultPartitionID(), ResultPartitionLocation.createLocal())), + timeout); + + try { + Await.result(partitionUpdateResponse, timeout); + + fail("The update task input partitions message should have failed."); + } catch (Exception e) { + // expected + } + } finally { + TestingUtils.stopActor(jobManager); + TestingUtils.stopActor(taskManager); + } + } + // -------------------------------------------------------------------------------------------- public static class SimpleJobManager extends FlinkUntypedActor {
