Repository: flink Updated Branches: refs/heads/master f6709b4a4 -> ff3786663
[hotfix] [core] Improve FlinkFuture synchronous actions by avoiding creation of ExecutionContext Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff378666 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff378666 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff378666 Branch: refs/heads/master Commit: ff3786663b7f1f8a09b5ad0666f55fb171d7f64c Parents: 61d7f15 Author: Stephan Ewen <se...@apache.org> Authored: Fri Feb 10 13:16:13 2017 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Feb 10 18:50:11 2017 +0100 ---------------------------------------------------------------------- .../flink/runtime/concurrent/impl/FlinkFuture.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ff378666/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java index dd7e8de..ab23fc5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java @@ -24,6 +24,7 @@ import akka.dispatch.Mapper; import akka.dispatch.OnComplete; import akka.dispatch.Recover; import akka.japi.Procedure; + import org.apache.flink.runtime.concurrent.AcceptFunction; import org.apache.flink.runtime.concurrent.ApplyFunction; import org.apache.flink.runtime.concurrent.CompletableFuture; @@ -31,8 +32,10 @@ import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.BiFunction; import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import scala.Option; import scala.Tuple2; import scala.concurrent.Await; @@ -59,6 +62,12 @@ public class FlinkFuture<T> implements Future<T> { private static final Logger LOG = LoggerFactory.getLogger(FlinkFuture.class); + private static final Executor DIRECT_EXECUTOR = Executors.directExecutor(); + + private static final ExecutionContext DIRECT_EXECUTION_CONTEXT = executionContextFromExecutor(DIRECT_EXECUTOR); + + // ------------------------------------------------------------------------ + protected scala.concurrent.Future<T> scalaFuture; FlinkFuture() { @@ -346,6 +355,14 @@ public class FlinkFuture<T> implements Future<T> { //----------------------------------------------------------------------------------- private static ExecutionContext createExecutionContext(final Executor executor) { + if (executor == DIRECT_EXECUTOR) { + return DIRECT_EXECUTION_CONTEXT; + } else { + return executionContextFromExecutor(executor); + } + } + + private static ExecutionContext executionContextFromExecutor(final Executor executor) { return ExecutionContexts$.MODULE$.fromExecutor(executor, new Procedure<Throwable>() { @Override public void apply(Throwable throwable) throws Exception {