Repository: flink Updated Branches: refs/heads/master 59a5551ef -> 157424918
[FLINK-4933] [exec graph] Don't let the EG fail in case of a failing scheduleOrUpdateConsumers call. Instead of failing the complete ExecutionGraph, a failing scheduleOrUpdateConsumers call will be reported back to the caller. The caller can then decide what to do. By default, it will fail the calling task. This closes #2700 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/15742491 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/15742491 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/15742491 Branch: refs/heads/master Commit: 1574249181304e124f8ac32b7d2a8fafdb0f9da9 Parents: 649f957 Author: Till Rohrmann <[email protected]> Authored: Thu Oct 27 11:41:29 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Mon Oct 31 19:16:41 2016 +0100 ---------------------------------------------------------------------- .../runtime/executiongraph/ExecutionGraph.java | 16 +- .../executiongraph/ExecutionGraphException.java | 39 +++++ .../flink/runtime/jobmanager/JobManager.scala | 17 ++- .../flink/runtime/taskmanager/TaskManager.scala | 2 +- .../ExecutionGraphSignalsTest.java | 27 ++++ .../runtime/taskmanager/TaskManagerTest.java | 147 ++++++++++++++++++- 6 files changed, 230 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/15742491/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 36dba63..e2701da 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -1135,17 +1135,23 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive } } - public void scheduleOrUpdateConsumers(ResultPartitionID partitionId) { + /** + * Schedule or updates consumers of the given result partition. + * + * @param partitionId specifying the result partition whose consumer shall be scheduled or updated + * @throws ExecutionGraphException if the schedule or update consumers operation could not be executed + */ + public void scheduleOrUpdateConsumers(ResultPartitionID partitionId) throws ExecutionGraphException { final Execution execution = currentExecutions.get(partitionId.getProducerId()); if (execution == null) { - fail(new IllegalStateException("Cannot find execution for execution ID " + - partitionId.getPartitionId())); + throw new ExecutionGraphException("Cannot find execution for execution Id " + + partitionId.getPartitionId() + '.'); } else if (execution.getVertex() == null){ - fail(new IllegalStateException("Execution with execution ID " + - partitionId.getPartitionId() + " has no vertex assigned.")); + throw new ExecutionGraphException("Execution with execution Id " + + partitionId.getPartitionId() + " has no vertex assigned."); } else { execution.getVertex().scheduleOrUpdateConsumers(partitionId); } http://git-wip-us.apache.org/repos/asf/flink/blob/15742491/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphException.java new file mode 100644 index 0000000..2de249b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphException.java @@ -0,0 +1,39 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +/** + * Base class for exceptions occurring in the {@link ExecutionGraph}. + */ +public class ExecutionGraphException extends Exception { + + private static final long serialVersionUID = -8253451032797220657L; + + public ExecutionGraphException(String message) { + super(message); + } + + public ExecutionGraphException(String message, Throwable cause) { + super(message, cause); + } + + public ExecutionGraphException(Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/15742491/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index bcfdd23..2bae7fe 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -19,7 +19,7 @@ package org.apache.flink.runtime.jobmanager import 
java.io.{File, IOException} -import java.net.{BindException, InetAddress, InetSocketAddress, ServerSocket, UnknownHostException} +import java.net._ import java.util.UUID import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException} @@ -48,7 +48,7 @@ import org.apache.flink.runtime.concurrent.BiFunction import org.apache.flink.runtime.execution.SuppressRestartsException import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory -import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionGraphBuilder, ExecutionJobVertex, StatusListenerMessenger} +import org.apache.flink.runtime.executiongraph._ import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceID, InstanceManager} import org.apache.flink.runtime.io.network.PartitionState import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus} @@ -62,7 +62,7 @@ import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect} import org.apache.flink.runtime.messages.RegistrationMessages._ import org.apache.flink.runtime.messages.TaskManagerMessages.{Heartbeat, SendStackTrace} import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState -import org.apache.flink.runtime.messages.accumulators.{AccumulatorMessage, AccumulatorResultStringsFound, AccumulatorResultsErroneous, AccumulatorResultsFound, RequestAccumulatorResults, RequestAccumulatorResultsStringified} +import org.apache.flink.runtime.messages.accumulators._ import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint, DeclineCheckpoint} import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _} import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup @@ -922,8 +922,15 @@ class JobManager( case ScheduleOrUpdateConsumers(jobId, partitionId) => currentJobs.get(jobId) match { case Some((executionGraph, _)) => - sender ! 
decorateMessage(Acknowledge) - executionGraph.scheduleOrUpdateConsumers(partitionId) + try { + executionGraph.scheduleOrUpdateConsumers(partitionId) + sender ! decorateMessage(Acknowledge) + } catch { + case e: Exception => + sender ! decorateMessage( + Failure(new Exception("Could not schedule or update consumers.", e)) + ) + } case None => log.error(s"Cannot find execution graph for job ID $jobId to schedule or update " + s"consumers.") http://git-wip-us.apache.org/repos/asf/flink/blob/15742491/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index ce763e7..74ed560 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -1176,7 +1176,7 @@ class TaskManager( runningTasks.put(execId, prevTask) throw new IllegalStateException("TaskManager already contains a task for id " + execId) } - + // all good, we kick off the task, which performs its own initialization task.startTaskThread() http://git-wip-us.apache.org/repos/asf/flink/blob/15742491/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java index 72784fb..de4a026 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java 
@@ -30,6 +30,8 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.execution.SuppressRestartsException; import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobStatus; @@ -46,6 +48,7 @@ import org.powermock.api.mockito.PowerMockito; import scala.concurrent.duration.FiniteDuration; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.same; @@ -364,5 +367,29 @@ public class ExecutionGraphSignalsTest { eg.stop(); } + /** + * Tests that a failing scheduleOrUpdateConsumers call with a non-existing execution attempt + * id, will not fail the execution graph. 
+ */ + @Test + public void testFailingScheduleOrUpdateConsumers() throws IllegalAccessException { + IntermediateResultPartitionID intermediateResultPartitionId = new IntermediateResultPartitionID(); + // The execution attempt id does not exist and thus the scheduleOrUpdateConsumers call + // should fail + ExecutionAttemptID producerId = new ExecutionAttemptID(); + ResultPartitionID resultPartitionId = new ResultPartitionID(intermediateResultPartitionId, producerId); + + f.set(eg, JobStatus.RUNNING); + assertEquals(JobStatus.RUNNING, eg.getState()); + + try { + eg.scheduleOrUpdateConsumers(resultPartitionId); + fail("Expected ExecutionGraphException."); + } catch (ExecutionGraphException e) { + // we've expected this exception to occur + } + + assertEquals(JobStatus.RUNNING, eg.getState()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/15742491/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index d4efd24..14ab569 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -22,6 +22,7 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Kill; import akka.actor.Props; +import akka.actor.Status; import akka.japi.Creator; import akka.testkit.JavaTestKit; import org.apache.flink.api.common.ExecutionConfig; @@ -31,6 +32,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.FlinkUntypedActor; import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.concurrent.CompletableFuture; +import 
org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; @@ -43,6 +46,7 @@ import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.PartitionState; +import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; @@ -67,6 +71,7 @@ import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testutils.StoppableInvokable; +import org.apache.flink.types.IntValue; import org.apache.flink.util.NetUtils; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; @@ -80,6 +85,7 @@ import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; +import java.io.IOException; import java.net.InetSocketAddress; import java.net.URL; import java.util.ArrayList; @@ -123,7 +129,7 @@ public class TaskManagerTest extends TestLogger { } @Test - public void testSubmitAndExecuteTask() { + public void testSubmitAndExecuteTask() throws IOException { new JavaTestKit(system){{ ActorGateway taskManager = null; @@ -221,10 +227,6 @@ public class TaskManagerTest extends TestLogger { } }; } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } finally { // shut down the actors TestingUtils.stopActor(taskManager); 
@@ -1390,6 +1392,75 @@ public class TaskManagerTest extends TestLogger { } }}; } + + /** + * Test that a failing schedule or update consumers call leads to the failing of the respective + * task. + * + * IMPORTANT: We have to make sure that the invokable's cancel method is called, because only + * then the future is completed. We do this by not eagerly deploy consumer tasks and requiring + * the invokable to fill one memory segment. The completed memory segment will trigger the + * scheduling of the downstream operator since it is in pipeline mode. After we've filled the + * memory segment, we'll block the invokable and wait for the task failure due to the failed + * schedule or update consumers call. + */ + @Test(timeout = 10000L) + public void testFailingScheduleOrUpdateConsumersMessage() throws Exception { + new JavaTestKit(system) {{ + final Configuration configuration = new Configuration(); + + // set the memory segment to the smallest size possible, because we have to fill one + // memory buffer to trigger the schedule or update consumers message to the downstream + // operators + configuration.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, 4096); + + final JobID jid = new JobID(); + final JobVertexID vid = new JobVertexID(); + final ExecutionAttemptID eid = new ExecutionAttemptID(); + final SerializedValue<ExecutionConfig> executionConfig = new SerializedValue<>(new ExecutionConfig()); + + final ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor = new ResultPartitionDeploymentDescriptor( + new IntermediateDataSetID(), + new IntermediateResultPartitionID(), + ResultPartitionType.PIPELINED, + 1, + false // don't deploy eagerly but with the first completed memory buffer + ); + + final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, "TestJob", vid, eid, executionConfig, + "TestTask", 1, 0, 1, 0, new Configuration(), new Configuration(), + TestInvokableRecordCancel.class.getName(), + 
Collections.singletonList(resultPartitionDeploymentDescriptor), + Collections.<InputGateDeploymentDescriptor>emptyList(), + new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0); + + + ActorRef jmActorRef = system.actorOf(Props.create(FailingScheduleOrUpdateConsumersJobManager.class, leaderSessionID), "jobmanager"); + ActorGateway jobManager = new AkkaActorGateway(jmActorRef, leaderSessionID); + + final ActorGateway taskManager = TestingUtils.createTaskManager( + system, + jobManager, + configuration, + true, + true); + + try { + TestInvokableRecordCancel.resetGotCanceledFuture(); + + Future<Object> result = taskManager.ask(new SubmitTask(tdd), timeout); + + Await.result(result, timeout); + + org.apache.flink.runtime.concurrent.Future<Boolean> cancelFuture = TestInvokableRecordCancel.gotCanceled(); + + assertEquals(true, cancelFuture.get()); + } finally { + TestingUtils.stopActor(taskManager); + TestingUtils.stopActor(jobManager); + } + }}; + } // -------------------------------------------------------------------------------------------- @@ -1425,6 +1496,25 @@ public class TaskManagerTest extends TestLogger { } } + public static class FailingScheduleOrUpdateConsumersJobManager extends SimpleJobManager { + + public FailingScheduleOrUpdateConsumersJobManager(UUID leaderSessionId) { + super(leaderSessionId); + } + + @Override + public void handleMessage(Object message) throws Exception { + if (message instanceof ScheduleOrUpdateConsumers) { + getSender().tell( + decorateMessage( + new Status.Failure(new Exception("Could not schedule or update consumers."))), + getSelf()); + } else { + super.handleMessage(message); + } + } + } + public static class SimpleLookupJobManager extends SimpleJobManager { public SimpleLookupJobManager(UUID leaderSessionID) { @@ -1450,7 +1540,7 @@ public class TaskManagerTest extends TestLogger { public SimpleLookupFailingUpdateJobManager(UUID leaderSessionID, Set<ExecutionAttemptID> ids) { super(leaderSessionID); - this.validIDs = new 
HashSet<ExecutionAttemptID>(ids); + this.validIDs = new HashSet<>(ids); } @Override @@ -1566,7 +1656,7 @@ public class TaskManagerTest extends TestLogger { public void invoke() {} } - public static final class TestInvokableBlockingCancelable extends AbstractInvokable { + public static class TestInvokableBlockingCancelable extends AbstractInvokable { @Override public void invoke() throws Exception { @@ -1580,4 +1670,47 @@ public class TaskManagerTest extends TestLogger { } } } + + public static final class TestInvokableRecordCancel extends AbstractInvokable { + + private static final Object lock = new Object(); + private static CompletableFuture<Boolean> gotCanceledFuture = new FlinkCompletableFuture<>(); + + @Override + public void invoke() throws Exception { + final Object o = new Object(); + RecordWriter<IntValue> recordWriter = new RecordWriter<>(getEnvironment().getWriter(0)); + + for (int i = 0; i < 1024; i++) { + recordWriter.emit(new IntValue(42)); + } + + synchronized (o) { + //noinspection InfiniteLoopStatement + while (true) { + o.wait(); + } + } + + } + + @Override + public void cancel() { + synchronized (lock) { + gotCanceledFuture.complete(true); + } + } + + public static void resetGotCanceledFuture() { + synchronized (lock) { + gotCanceledFuture = new FlinkCompletableFuture<>(); + } + } + + public static org.apache.flink.runtime.concurrent.Future<Boolean> gotCanceled() { + synchronized (lock) { + return gotCanceledFuture; + } + } + } }
