[FLINK-5022] Suppress RejectedExecutionExceptions if the ExecutorService has been shut down
This PR suppresses occurring RejectedExecutionExceptions if an ExecutorService has been shut down. This only works for ExecutorServices at the moment. All other exceptions are logged. This closes #2757 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d3908241 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d3908241 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d3908241 Branch: refs/heads/master Commit: d3908241323f5de3d454c7daf560faed49b3c779 Parents: c21ecae Author: Till Rohrmann <[email protected]> Authored: Fri Nov 4 11:16:47 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Tue Nov 8 15:26:42 2016 +0100 ---------------------------------------------------------------------- .../runtime/concurrent/impl/FlinkFuture.java | 27 ++++++++++++++++++-- .../flink/runtime/jobmanager/JobManager.scala | 5 ++-- 2 files changed, 27 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d3908241/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java index b678c5e..9783d4b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java @@ -23,6 +23,7 @@ import akka.dispatch.Futures; import akka.dispatch.Mapper; import akka.dispatch.OnComplete; import akka.dispatch.Recover; +import akka.japi.Procedure; import org.apache.flink.runtime.concurrent.AcceptFunction; import org.apache.flink.runtime.concurrent.ApplyFunction; import org.apache.flink.runtime.concurrent.CompletableFuture; @@ -30,6 +31,8 @@ import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.BiFunction; import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Option; import scala.Tuple2; import scala.concurrent.Await; @@ -43,6 +46,7 @@ import scala.util.Try; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -53,6 +57,8 @@ import java.util.concurrent.TimeoutException; */ public class FlinkFuture<T> implements Future<T> { + private static final Logger LOG = LoggerFactory.getLogger(FlinkFuture.class); + protected scala.concurrent.Future<T> scalaFuture; FlinkFuture() { @@ -335,8 +341,25 @@ public class FlinkFuture<T> implements Future<T> { // Helper functions and types //----------------------------------------------------------------------------------- - private static ExecutionContext createExecutionContext(Executor executor) { - return ExecutionContexts$.MODULE$.fromExecutor(executor); + private static ExecutionContext createExecutionContext(final Executor executor) { + return ExecutionContexts$.MODULE$.fromExecutor(executor, new Procedure<Throwable>() { + @Override + public void apply(Throwable throwable) throws Exception { + if (executor instanceof ExecutorService) { + ExecutorService executorService = (ExecutorService) executor; + // only log the exception if the executor service is still running + if (!executorService.isShutdown()) { + logThrowable(throwable); + } + } else { + logThrowable(throwable); + } + } + + private void logThrowable(Throwable throwable) { + LOG.warn("Uncaught exception in execution context.", throwable); + } + }); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/d3908241/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 3f0689f..68e71ef 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmanager import java.io.{File, IOException} import java.net._ import java.util.UUID -import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException} +import java.util.concurrent.{ExecutorService, ForkJoinPool, TimeUnit, TimeoutException} import akka.actor.Status.{Failure, Success} import akka.actor._ @@ -62,7 +62,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages._ import org.apache.flink.runtime.messages.Messages.Disconnect import org.apache.flink.runtime.messages.RegistrationMessages._ import org.apache.flink.runtime.messages.{Acknowledge, StackTrace} -import org.apache.flink.runtime.messages.TaskManagerMessages.{Heartbeat, SendStackTrace} +import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState import org.apache.flink.runtime.messages.accumulators._ import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint, DeclineCheckpoint} @@ -87,7 +87,6 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.concurrent._ import scala.concurrent.duration._ -import scala.concurrent.forkjoin.ForkJoinPool import scala.language.postfixOps /**
