Repository: flink
Updated Branches:
  refs/heads/master f6709b4a4 -> ff3786663


[hotfix] [core] Improve FlinkFuture synchronous actions by avoiding creation of ExecutionContext


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff378666
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff378666
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff378666

Branch: refs/heads/master
Commit: ff3786663b7f1f8a09b5ad0666f55fb171d7f64c
Parents: 61d7f15
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Feb 10 13:16:13 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 10 18:50:11 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/concurrent/impl/FlinkFuture.java | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ff378666/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
index dd7e8de..ab23fc5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
@@ -24,6 +24,7 @@ import akka.dispatch.Mapper;
 import akka.dispatch.OnComplete;
 import akka.dispatch.Recover;
 import akka.japi.Procedure;
+
 import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.CompletableFuture;
@@ -31,8 +32,10 @@ import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.Option;
 import scala.Tuple2;
 import scala.concurrent.Await;
@@ -59,6 +62,12 @@ public class FlinkFuture<T> implements Future<T> {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(FlinkFuture.class);
 
+       private static final Executor DIRECT_EXECUTOR = 
Executors.directExecutor();
+
+       private static final ExecutionContext DIRECT_EXECUTION_CONTEXT = 
executionContextFromExecutor(DIRECT_EXECUTOR);
+
+       // 
------------------------------------------------------------------------
+
        protected scala.concurrent.Future<T> scalaFuture;
 
        FlinkFuture() {
@@ -346,6 +355,14 @@ public class FlinkFuture<T> implements Future<T> {
        
//-----------------------------------------------------------------------------------
 
        private static ExecutionContext createExecutionContext(final Executor 
executor) {
+               if (executor == DIRECT_EXECUTOR) {
+                       return DIRECT_EXECUTION_CONTEXT;
+               } else {
+                       return executionContextFromExecutor(executor);
+               }
+       }
+
+       private static ExecutionContext executionContextFromExecutor(final 
Executor executor) {
                return ExecutionContexts$.MODULE$.fromExecutor(executor, new 
Procedure<Throwable>() {
                        @Override
                        public void apply(Throwable throwable) throws Exception 
{

Reply via email to