This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ac84e0c84dee39da7339722aad5ed2dcf26e2d27
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Wed Jun 5 15:03:01 2019 +0200

    [FLINK-12101] Deduplicate code by introducing ExecutionEnvironment#resolveFactory
    
    ExecutionEnvironment#resolveFactory selects between the thread local and the global factory.
    This method is used by the ExecutionEnvironment as well as the StreamExecutionEnvironment.
    
    This closes #8543.
---
 .../apache/flink/api/java/ExecutionEnvironment.java | 18 ++++++++----------
 .../main/java/org/apache/flink/api/java/Utils.java  | 21 +++++++++++++++++++++
 .../api/environment/StreamExecutionEnvironment.java | 20 ++++++++++----------
 3 files changed, 39 insertions(+), 20 deletions(-)

diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index f9fc1ef..be927c5 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -102,10 +102,10 @@ public abstract class ExecutionEnvironment {
        protected static final Logger LOG = 
LoggerFactory.getLogger(ExecutionEnvironment.class);
 
        /** The environment of the context (local by default, cluster if 
invoked through command line). */
-       private static ExecutionEnvironmentFactory contextEnvironmentFactory;
+       private static ExecutionEnvironmentFactory contextEnvironmentFactory = 
null;
 
        /** The ThreadLocal used to store {@link ExecutionEnvironmentFactory}. 
*/
-       private static ThreadLocal<ExecutionEnvironmentFactory> 
contextEnvironmentFactoryThreadLocal = new ThreadLocal<>();
+       private static final ThreadLocal<ExecutionEnvironmentFactory> 
threadLocalContextEnvironmentFactory = new ThreadLocal<>();
 
        /** The default parallelism used by local environments. */
        private static int defaultLocalDop = 
Runtime.getRuntime().availableProcessors();
@@ -1064,11 +1064,9 @@ public abstract class ExecutionEnvironment {
         * @return The execution environment of the context in which the 
program is executed.
         */
        public static ExecutionEnvironment getExecutionEnvironment() {
-
-               return contextEnvironmentFactoryThreadLocal.get() == null ?
-                               (contextEnvironmentFactory == null ?
-                                       createLocalEnvironment() : 
contextEnvironmentFactory.createExecutionEnvironment()) :
-                               
contextEnvironmentFactoryThreadLocal.get().createExecutionEnvironment();
+               return 
Utils.resolveFactory(threadLocalContextEnvironmentFactory, 
contextEnvironmentFactory)
+                       
.map(ExecutionEnvironmentFactory::createExecutionEnvironment)
+                       
.orElseGet(ExecutionEnvironment::createLocalEnvironment);
        }
 
        /**
@@ -1259,7 +1257,7 @@ public abstract class ExecutionEnvironment {
         */
        protected static void 
initializeContextEnvironment(ExecutionEnvironmentFactory ctx) {
                contextEnvironmentFactory = Preconditions.checkNotNull(ctx);
-               
contextEnvironmentFactoryThreadLocal.set(contextEnvironmentFactory);
+               
threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory);
        }
 
        /**
@@ -1269,7 +1267,7 @@ public abstract class ExecutionEnvironment {
         */
        protected static void resetContextEnvironment() {
                contextEnvironmentFactory = null;
-               contextEnvironmentFactoryThreadLocal.remove();
+               threadLocalContextEnvironmentFactory.remove();
        }
 
        /**
@@ -1281,6 +1279,6 @@ public abstract class ExecutionEnvironment {
         */
        @Internal
        public static boolean areExplicitEnvironmentsAllowed() {
-               return contextEnvironmentFactory == null && 
contextEnvironmentFactoryThreadLocal.get() == null;
+               return contextEnvironmentFactory == null && 
threadLocalContextEnvironmentFactory.get() == null;
        }
 }
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java 
b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
index ed86f7d..c514b33 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
@@ -31,9 +31,12 @@ import org.apache.flink.configuration.Configuration;
 
 import org.apache.commons.lang3.StringUtils;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
+import java.util.Optional;
 import java.util.Random;
 
 /**
@@ -296,6 +299,24 @@ public final class Utils {
                return ret;
        }
 
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Resolves the given factories. The thread local factory has 
preference over the static factory.
+        * If none is set, the method returns {@link Optional#empty()}.
+        *
+        * @param threadLocalFactory containing the thread local factory
+        * @param staticFactory containing the global factory
+        * @param <T> type of factory
+        * @return Optional containing the resolved factory if it exists, 
otherwise it's empty
+        */
+       public static <T> Optional<T> resolveFactory(ThreadLocal<T> 
threadLocalFactory, @Nullable T staticFactory) {
+               final T localFactory = threadLocalFactory.get();
+               final T factory = localFactory == null ? staticFactory : 
localFactory;
+
+               return Optional.ofNullable(factory);
+       }
+
        /**
         * Private constructor to prevent instantiation.
         */
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index f93fd4c..ffa2d47 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.io.TextInputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -115,10 +116,10 @@ public abstract class StreamExecutionEnvironment {
        /**
         * The environment of the context (local by default, cluster if invoked 
through command line).
         */
-       private static StreamExecutionEnvironmentFactory 
contextEnvironmentFactory;
+       private static StreamExecutionEnvironmentFactory 
contextEnvironmentFactory = null;
 
        /** The ThreadLocal used to store {@link 
StreamExecutionEnvironmentFactory}. */
-       private static ThreadLocal<StreamExecutionEnvironmentFactory> 
contextEnvironmentFactoryThreadLocal = new ThreadLocal<>();
+       private static final ThreadLocal<StreamExecutionEnvironmentFactory> 
threadLocalContextEnvironmentFactory = new ThreadLocal<>();
 
        /** The default parallelism used when creating a local environment. */
        private static int defaultLocalParallelism = 
Runtime.getRuntime().availableProcessors();
@@ -1571,13 +1572,12 @@ public abstract class StreamExecutionEnvironment {
         * executed.
         */
        public static StreamExecutionEnvironment getExecutionEnvironment() {
-               if (contextEnvironmentFactoryThreadLocal.get() != null) {
-                       return 
contextEnvironmentFactoryThreadLocal.get().createExecutionEnvironment();
-               }
-               if (contextEnvironmentFactory != null) {
-                       return 
contextEnvironmentFactory.createExecutionEnvironment();
-               }
+               return 
Utils.resolveFactory(threadLocalContextEnvironmentFactory, 
contextEnvironmentFactory)
+                       
.map(StreamExecutionEnvironmentFactory::createExecutionEnvironment)
+                       
.orElseGet(StreamExecutionEnvironment::createStreamExecutionEnvironment);
+       }
 
+       private static StreamExecutionEnvironment 
createStreamExecutionEnvironment() {
                // because the streaming project depends on "flink-clients" 
(and not the other way around)
                // we currently need to intercept the data set environment and 
create a dependent stream env.
                // this should be fixed once we rework the project dependencies
@@ -1772,12 +1772,12 @@ public abstract class StreamExecutionEnvironment {
 
        protected static void 
initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) {
                contextEnvironmentFactory = ctx;
-               
contextEnvironmentFactoryThreadLocal.set(contextEnvironmentFactory);
+               
threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory);
        }
 
        protected static void resetContextEnvironment() {
                contextEnvironmentFactory = null;
-               contextEnvironmentFactoryThreadLocal.remove();
+               threadLocalContextEnvironmentFactory.remove();
        }
 
        /**

Reply via email to