Repository: flink
Updated Branches:
  refs/heads/master 59a5551ef -> 157424918


[FLINK-4933] [exec graph] Don't let the EG fail in case of a failing 
scheduleOrUpdateConsumers call

Instead of failing the complete ExecutionGraph, a failing 
scheduleOrUpdateConsumers call
will be reported back to the caller. The caller can then decide what to do. By
default, it will fail the calling task.

This closes #2700


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/15742491
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/15742491
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/15742491

Branch: refs/heads/master
Commit: 1574249181304e124f8ac32b7d2a8fafdb0f9da9
Parents: 649f957
Author: Till Rohrmann <[email protected]>
Authored: Thu Oct 27 11:41:29 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Mon Oct 31 19:16:41 2016 +0100

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  |  16 +-
 .../executiongraph/ExecutionGraphException.java |  39 +++++
 .../flink/runtime/jobmanager/JobManager.scala   |  17 ++-
 .../flink/runtime/taskmanager/TaskManager.scala |   2 +-
 .../ExecutionGraphSignalsTest.java              |  27 ++++
 .../runtime/taskmanager/TaskManagerTest.java    | 147 ++++++++++++++++++-
 6 files changed, 230 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/15742491/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 36dba63..e2701da 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -1135,17 +1135,23 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                }
        }
 
-       public void scheduleOrUpdateConsumers(ResultPartitionID partitionId) {
+       /**
+        * Schedule or updates consumers of the given result partition.
+        *
+        * @param partitionId specifying the result partition whose consumer 
shall be scheduled or updated
+        * @throws ExecutionGraphException if the schedule or update consumers 
operation could not be executed
+        */
+       public void scheduleOrUpdateConsumers(ResultPartitionID partitionId) 
throws ExecutionGraphException {
 
                final Execution execution = 
currentExecutions.get(partitionId.getProducerId());
 
                if (execution == null) {
-                       fail(new IllegalStateException("Cannot find execution 
for execution ID " +
-                                       partitionId.getPartitionId()));
+                       throw new ExecutionGraphException("Cannot find 
execution for execution Id " +
+                               partitionId.getPartitionId() + '.');
                }
                else if (execution.getVertex() == null){
-                       fail(new IllegalStateException("Execution with 
execution ID " +
-                                       partitionId.getPartitionId() + " has no 
vertex assigned."));
+                       throw new ExecutionGraphException("Execution with 
execution Id " +
+                               partitionId.getPartitionId() + " has no vertex 
assigned.");
                } else {
                        
execution.getVertex().scheduleOrUpdateConsumers(partitionId);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/15742491/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphException.java
new file mode 100644
index 0000000..2de249b
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+/**
+ * Base class for exceptions occurring in the {@link ExecutionGraph}.
+ */
+public class ExecutionGraphException extends Exception {
+
+       private static final long serialVersionUID = -8253451032797220657L;
+
+       public ExecutionGraphException(String message) {
+               super(message);
+       }
+
+       public ExecutionGraphException(String message, Throwable cause) {
+               super(message, cause);
+       }
+
+       public ExecutionGraphException(Throwable cause) {
+               super(cause);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/15742491/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index bcfdd23..2bae7fe 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.jobmanager
 
 import java.io.{File, IOException}
-import java.net.{BindException, InetAddress, InetSocketAddress, ServerSocket, 
UnknownHostException}
+import java.net._
 import java.util.UUID
 import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException}
 
@@ -48,7 +48,7 @@ import org.apache.flink.runtime.concurrent.BiFunction
 import org.apache.flink.runtime.execution.SuppressRestartsException
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
-import org.apache.flink.runtime.executiongraph.{ExecutionGraph, 
ExecutionGraphBuilder, ExecutionJobVertex, StatusListenerMessenger}
+import org.apache.flink.runtime.executiongraph._
 import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceID, 
InstanceManager}
 import org.apache.flink.runtime.io.network.PartitionState
 import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus}
@@ -62,7 +62,7 @@ import 
org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
 import org.apache.flink.runtime.messages.RegistrationMessages._
 import org.apache.flink.runtime.messages.TaskManagerMessages.{Heartbeat, 
SendStackTrace}
 import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState
-import org.apache.flink.runtime.messages.accumulators.{AccumulatorMessage, 
AccumulatorResultStringsFound, AccumulatorResultsErroneous, 
AccumulatorResultsFound, RequestAccumulatorResults, 
RequestAccumulatorResultsStringified}
+import org.apache.flink.runtime.messages.accumulators._
 import 
org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, 
AcknowledgeCheckpoint, DeclineCheckpoint}
 import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _}
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
@@ -922,8 +922,15 @@ class JobManager(
     case ScheduleOrUpdateConsumers(jobId, partitionId) =>
       currentJobs.get(jobId) match {
         case Some((executionGraph, _)) =>
-          sender ! decorateMessage(Acknowledge)
-          executionGraph.scheduleOrUpdateConsumers(partitionId)
+          try {
+            executionGraph.scheduleOrUpdateConsumers(partitionId)
+            sender ! decorateMessage(Acknowledge)
+          } catch {
+            case e: Exception =>
+              sender ! decorateMessage(
+                Failure(new Exception("Could not schedule or update 
consumers.", e))
+              )
+          }
         case None =>
           log.error(s"Cannot find execution graph for job ID $jobId to 
schedule or update " +
             s"consumers.")

http://git-wip-us.apache.org/repos/asf/flink/blob/15742491/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index ce763e7..74ed560 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1176,7 +1176,7 @@ class TaskManager(
         runningTasks.put(execId, prevTask)
         throw new IllegalStateException("TaskManager already contains a task 
for id " + execId)
       }
-
+      
       // all good, we kick off the task, which performs its own initialization
       task.startTaskThread()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/15742491/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
index 72784fb..de4a026 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
@@ -30,6 +30,8 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import 
org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -46,6 +48,7 @@ import org.powermock.api.mockito.PowerMockito;
 import scala.concurrent.duration.FiniteDuration;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.same;
@@ -364,5 +367,29 @@ public class ExecutionGraphSignalsTest {
                eg.stop();
        }
 
+       /**
+        * Tests that a failing scheduleOrUpdateConsumers call with a 
non-existing execution attempt
+        * id, will not fail the execution graph.
+        */
+       @Test
+       public void testFailingScheduleOrUpdateConsumers() throws 
IllegalAccessException {
+               IntermediateResultPartitionID intermediateResultPartitionId = 
new IntermediateResultPartitionID();
+               // The execution attempt id does not exist and thus the 
scheduleOrUpdateConsumers call
+               // should fail
+               ExecutionAttemptID producerId = new ExecutionAttemptID();
+               ResultPartitionID resultPartitionId = new 
ResultPartitionID(intermediateResultPartitionId, producerId);
+
+               f.set(eg, JobStatus.RUNNING);
 
+               assertEquals(JobStatus.RUNNING, eg.getState());
+
+               try {
+                       eg.scheduleOrUpdateConsumers(resultPartitionId);
+                       fail("Expected ExecutionGraphException.");
+               } catch (ExecutionGraphException e) {
+                       // we've expected this exception to occur
+               }
+
+               assertEquals(JobStatus.RUNNING, eg.getState());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/15742491/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index d4efd24..14ab569 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -22,6 +22,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Kill;
 import akka.actor.Props;
+import akka.actor.Status;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -31,6 +32,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -43,6 +46,7 @@ import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.PartitionState;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -67,6 +71,7 @@ import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.StoppableInvokable;
+import org.apache.flink.types.IntValue;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
@@ -80,6 +85,7 @@ import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.ArrayList;
@@ -123,7 +129,7 @@ public class TaskManagerTest extends TestLogger {
        }
 
        @Test
-       public void testSubmitAndExecuteTask() {
+       public void testSubmitAndExecuteTask() throws IOException {
                new JavaTestKit(system){{
 
                        ActorGateway taskManager = null;
@@ -221,10 +227,6 @@ public class TaskManagerTest extends TestLogger {
                                        }
                                };
                        }
-                       catch (Exception e) {
-                               e.printStackTrace();
-                               fail(e.getMessage());
-                       }
                        finally {
                                // shut down the actors
                                TestingUtils.stopActor(taskManager);
@@ -1390,6 +1392,75 @@ public class TaskManagerTest extends TestLogger {
                        }
                }};
        }
+
+       /**
+        * Test that a failing schedule or update consumers call leads to the 
failing of the respective
+        * task.
+        *
+        * IMPORTANT: We have to make sure that the invokable's cancel method 
is called, because only
+        * then the future is completed. We do this by not eagerly deploy 
consumer tasks and requiring
+        * the invokable to fill one memory segment. The completed memory 
segment will trigger the
+        * scheduling of the downstream operator since it is in pipeline mode. 
After we've filled the
+        * memory segment, we'll block the invokable and wait for the task 
failure due to the failed
+        * schedule or update consumers call.
+        */
+       @Test(timeout = 10000L)
+       public void testFailingScheduleOrUpdateConsumersMessage() throws 
Exception {
+               new JavaTestKit(system) {{
+                       final Configuration configuration = new Configuration();
+
+                       // set the memory segment to the smallest size 
possible, because we have to fill one
+                       // memory buffer to trigger the schedule or update 
consumers message to the downstream
+                       // operators
+                       
configuration.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, 
4096);
+
+                       final JobID jid = new JobID();
+                       final JobVertexID vid = new JobVertexID();
+                       final ExecutionAttemptID eid = new ExecutionAttemptID();
+                       final SerializedValue<ExecutionConfig> executionConfig 
= new SerializedValue<>(new ExecutionConfig());
+
+                       final ResultPartitionDeploymentDescriptor 
resultPartitionDeploymentDescriptor = new ResultPartitionDeploymentDescriptor(
+                               new IntermediateDataSetID(),
+                               new IntermediateResultPartitionID(),
+                               ResultPartitionType.PIPELINED,
+                               1,
+                               false // don't deploy eagerly but with the 
first completed memory buffer
+                       );
+
+                       final TaskDeploymentDescriptor tdd = new 
TaskDeploymentDescriptor(jid, "TestJob", vid, eid, executionConfig,
+                               "TestTask", 1, 0, 1, 0, new Configuration(), 
new Configuration(),
+                               TestInvokableRecordCancel.class.getName(),
+                               
Collections.singletonList(resultPartitionDeploymentDescriptor),
+                               
Collections.<InputGateDeploymentDescriptor>emptyList(),
+                               new ArrayList<BlobKey>(), 
Collections.<URL>emptyList(), 0);
+
+
+                       ActorRef jmActorRef = 
system.actorOf(Props.create(FailingScheduleOrUpdateConsumersJobManager.class, 
leaderSessionID), "jobmanager");
+                       ActorGateway jobManager = new 
AkkaActorGateway(jmActorRef, leaderSessionID);
+
+                       final ActorGateway taskManager = 
TestingUtils.createTaskManager(
+                               system,
+                               jobManager,
+                               configuration,
+                               true,
+                               true);
+
+                       try {
+                               
TestInvokableRecordCancel.resetGotCanceledFuture();
+
+                               Future<Object> result = taskManager.ask(new 
SubmitTask(tdd), timeout);
+
+                               Await.result(result, timeout);
+
+                               
org.apache.flink.runtime.concurrent.Future<Boolean> cancelFuture = 
TestInvokableRecordCancel.gotCanceled();
+
+                               assertEquals(true, cancelFuture.get());
+                       } finally {
+                               TestingUtils.stopActor(taskManager);
+                               TestingUtils.stopActor(jobManager);
+                       }
+               }};
+       }
        
        // 
--------------------------------------------------------------------------------------------
 
@@ -1425,6 +1496,25 @@ public class TaskManagerTest extends TestLogger {
                }
        }
 
+       public static class FailingScheduleOrUpdateConsumersJobManager extends 
SimpleJobManager {
+
+               public FailingScheduleOrUpdateConsumersJobManager(UUID 
leaderSessionId) {
+                       super(leaderSessionId);
+               }
+
+               @Override
+               public void handleMessage(Object message) throws Exception {
+                       if (message instanceof ScheduleOrUpdateConsumers) {
+                               getSender().tell(
+                                       decorateMessage(
+                                               new Status.Failure(new 
Exception("Could not schedule or update consumers."))),
+                                       getSelf());
+                       } else {
+                               super.handleMessage(message);
+                       }
+               }
+       }
+
        public static class SimpleLookupJobManager extends SimpleJobManager {
 
                public SimpleLookupJobManager(UUID leaderSessionID) {
@@ -1450,7 +1540,7 @@ public class TaskManagerTest extends TestLogger {
 
                public SimpleLookupFailingUpdateJobManager(UUID 
leaderSessionID, Set<ExecutionAttemptID> ids) {
                        super(leaderSessionID);
-                       this.validIDs = new HashSet<ExecutionAttemptID>(ids);
+                       this.validIDs = new HashSet<>(ids);
                }
 
                @Override
@@ -1566,7 +1656,7 @@ public class TaskManagerTest extends TestLogger {
                public void invoke() {}
        }
        
-       public static final class TestInvokableBlockingCancelable extends 
AbstractInvokable {
+       public static class TestInvokableBlockingCancelable extends 
AbstractInvokable {
 
                @Override
                public void invoke() throws Exception {
@@ -1580,4 +1670,47 @@ public class TaskManagerTest extends TestLogger {
                        }
                }
        }
+
+       public static final class TestInvokableRecordCancel extends 
AbstractInvokable {
+
+               private static final Object lock = new Object();
+               private static CompletableFuture<Boolean> gotCanceledFuture = 
new FlinkCompletableFuture<>();
+
+               @Override
+               public void invoke() throws Exception {
+                       final Object o = new Object();
+                       RecordWriter<IntValue> recordWriter = new 
RecordWriter<>(getEnvironment().getWriter(0));
+
+                       for (int i = 0; i < 1024; i++) {
+                               recordWriter.emit(new IntValue(42));
+                       }
+
+                       synchronized (o) {
+                               //noinspection InfiniteLoopStatement
+                               while (true) {
+                                       o.wait();
+                               }
+                       }
+
+               }
+
+               @Override
+               public void cancel() {
+                       synchronized (lock) {
+                               gotCanceledFuture.complete(true);
+                       }
+               }
+
+               public static void resetGotCanceledFuture() {
+                       synchronized (lock) {
+                               gotCanceledFuture = new 
FlinkCompletableFuture<>();
+                       }
+               }
+
+               public static 
org.apache.flink.runtime.concurrent.Future<Boolean> gotCanceled() {
+                       synchronized (lock) {
+                               return gotCanceledFuture;
+                       }
+               }
+       }
 }

Reply via email to