[FLINK-5022] Suppress RejectedExecutionExceptions if the ExecutorService has 
been shut down

This PR suppresses RejectedExecutionExceptions that occur after an
ExecutorService has been shut down. The suppression currently only applies
when the executor is an ExecutorService; exceptions reported while it is
still running, or for any other kind of Executor, are logged.

This closes #2757


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d3908241
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d3908241
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d3908241

Branch: refs/heads/master
Commit: d3908241323f5de3d454c7daf560faed49b3c779
Parents: c21ecae
Author: Till Rohrmann <[email protected]>
Authored: Fri Nov 4 11:16:47 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Tue Nov 8 15:26:42 2016 +0100

----------------------------------------------------------------------
 .../runtime/concurrent/impl/FlinkFuture.java    | 27 ++++++++++++++++++--
 .../flink/runtime/jobmanager/JobManager.scala   |  5 ++--
 2 files changed, 27 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d3908241/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
index b678c5e..9783d4b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
@@ -23,6 +23,7 @@ import akka.dispatch.Futures;
 import akka.dispatch.Mapper;
 import akka.dispatch.OnComplete;
 import akka.dispatch.Recover;
+import akka.japi.Procedure;
 import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.CompletableFuture;
@@ -30,6 +31,8 @@ import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Option;
 import scala.Tuple2;
 import scala.concurrent.Await;
@@ -43,6 +46,7 @@ import scala.util.Try;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -53,6 +57,8 @@ import java.util.concurrent.TimeoutException;
  */
 public class FlinkFuture<T> implements Future<T> {
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(FlinkFuture.class);
+
        protected scala.concurrent.Future<T> scalaFuture;
 
        FlinkFuture() {
@@ -335,8 +341,25 @@ public class FlinkFuture<T> implements Future<T> {
        // Helper functions and types
        
//-----------------------------------------------------------------------------------
 
-       private static ExecutionContext createExecutionContext(Executor 
executor) {
-               return ExecutionContexts$.MODULE$.fromExecutor(executor);
+       private static ExecutionContext createExecutionContext(final Executor 
executor) {
+               return ExecutionContexts$.MODULE$.fromExecutor(executor, new 
Procedure<Throwable>() {
+                       @Override
+                       public void apply(Throwable throwable) throws Exception 
{
+                               if (executor instanceof ExecutorService) {
+                                       ExecutorService executorService = 
(ExecutorService) executor;
+                                       // only log the exception if the 
executor service is still running
+                                       if (!executorService.isShutdown()) {
+                                               logThrowable(throwable);
+                                       }
+                               } else {
+                                       logThrowable(throwable);
+                               }
+                       }
+
+                       private void logThrowable(Throwable throwable) {
+                               LOG.warn("Uncaught exception in execution 
context.", throwable);
+                       }
+               });
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/d3908241/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 3f0689f..68e71ef 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmanager
 import java.io.{File, IOException}
 import java.net._
 import java.util.UUID
-import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException}
+import java.util.concurrent.{ExecutorService, ForkJoinPool, TimeUnit, 
TimeoutException}
 
 import akka.actor.Status.{Failure, Success}
 import akka.actor._
@@ -62,7 +62,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.messages.Messages.Disconnect
 import org.apache.flink.runtime.messages.RegistrationMessages._
 import org.apache.flink.runtime.messages.{Acknowledge, StackTrace}
-import org.apache.flink.runtime.messages.TaskManagerMessages.{Heartbeat, 
SendStackTrace}
+import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
 import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState
 import org.apache.flink.runtime.messages.accumulators._
 import 
org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, 
AcknowledgeCheckpoint, DeclineCheckpoint}
@@ -87,7 +87,6 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.concurrent._
 import scala.concurrent.duration._
-import scala.concurrent.forkjoin.ForkJoinPool
 import scala.language.postfixOps
 
 /**

Reply via email to