This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new daff5df  [FLINK-25906][streaming] check explicit env allowed on 
StreamExecutionEnvironment own.
daff5df is described below

commit daff5dfa8d9218b33f811611b3209508f2ae3bd7
Author: Jing Ge <[email protected]>
AuthorDate: Wed Feb 9 09:57:43 2022 +0100

    [FLINK-25906][streaming] check explicit env allowed on 
StreamExecutionEnvironment own.
---
 .../tests/test_stream_execution_environment_completeness.py |  2 +-
 .../streaming/api/environment/LocalStreamEnvironment.java   |  3 +--
 .../api/environment/StreamExecutionEnvironment.java         | 13 +++++++++++++
 .../api/scala/StreamingScalaAPICompletenessTest.scala       | 12 +++++++-----
 4 files changed, 22 insertions(+), 8 deletions(-)

diff --git 
a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
 
b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
index cca1a6c..5df28f6 100644
--- 
a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
+++ 
b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
@@ -49,7 +49,7 @@ class 
StreamExecutionEnvironmentCompletenessTests(PythonAPICompletenessTestCase,
                 'socketTextStream', 'initializeContextEnvironment', 
'readTextFile',
                 'setNumberOfExecutionRetries', 'executeAsync', 
'registerJobListener',
                 'clearJobListeners', 'getJobListeners', 'fromSequence', 
'getConfiguration',
-                'generateStreamGraph', 'getTransformations'}
+                'generateStreamGraph', 'getTransformations', 
'areExplicitEnvironmentsAllowed'}
 
 
 if __name__ == '__main__':
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 057a6ee..b2a5646 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.environment;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -53,7 +52,7 @@ public class LocalStreamEnvironment extends 
StreamExecutionEnvironment {
     }
 
     private static Configuration validateAndGetConfiguration(final 
Configuration configuration) {
-        if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
+        if (!areExplicitEnvironmentsAllowed()) {
             throw new InvalidProgramException(
                     "The LocalStreamEnvironment cannot be used when submitting 
a program through a client, "
                             + "or running in a TestEnvironment context.");
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index ea52c5b..54dca34 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -2480,6 +2480,19 @@ public class StreamExecutionEnvironment {
                         name, new 
DistributedCache.DistributedCacheEntry(filePath, executable)));
     }
 
+    /**
+     * Checks whether it is currently permitted to explicitly instantiate a 
LocalEnvironment or a
+     * RemoteEnvironment.
+     *
+     * @return True, if it is possible to explicitly instantiate a 
LocalEnvironment or a
+     *     RemoteEnvironment, false otherwise.
+     */
+    @Internal
+    public static boolean areExplicitEnvironmentsAllowed() {
+        return contextEnvironmentFactory == null
+                && threadLocalContextEnvironmentFactory.get() == null;
+    }
+
     // Private helpers.
     @SuppressWarnings("unchecked")
     private <OUT, T extends TypeInformation<OUT>> T getTypeInfo(
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
index ce56cf1..409fe9b 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
@@ -67,6 +67,8 @@ class StreamingScalaAPICompletenessTest extends 
ScalaAPICompletenessTestBase {
       
"org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.isForceCheckpointing",
       
"org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.generateStreamGraph",
       
"org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTransformations",
+      "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment" +
+        ".areExplicitEnvironmentsAllowed",
 
 
       // TypeHints are only needed for Java API, Scala API doesn't need them
@@ -113,8 +115,8 @@ class StreamingScalaAPICompletenessTest extends 
ScalaAPICompletenessTestBase {
 
     checkMethods(
       "ConnectedStreams", "ConnectedStreams",
-      classOf[org.apache.flink.streaming.api.datastream.ConnectedStreams[_,_]],
-      classOf[ConnectedStreams[_,_]])
+      classOf[org.apache.flink.streaming.api.datastream.ConnectedStreams[_, 
_]],
+      classOf[ConnectedStreams[_, _]])
 
     checkMethods(
       "WindowedStream", "WindowedStream",
@@ -133,12 +135,12 @@ class StreamingScalaAPICompletenessTest extends 
ScalaAPICompletenessTestBase {
 
     checkMethods(
       "JoinedStreams.WithWindow", "JoinedStreams.WithWindow",
-      
classOf[org.apache.flink.streaming.api.datastream.JoinedStreams.WithWindow[_,_,_,_]],
-      classOf[JoinedStreams[_,_]#Where[_]#EqualTo#WithWindow[_]])
+      
classOf[org.apache.flink.streaming.api.datastream.JoinedStreams.WithWindow[_, 
_, _, _]],
+      classOf[JoinedStreams[_, _]#Where[_]#EqualTo#WithWindow[_]])
 
     checkMethods(
       "CoGroupedStreams.WithWindow", "CoGroupedStreams.WithWindow",
-      
classOf[org.apache.flink.streaming.api.datastream.CoGroupedStreams.WithWindow[_,_,_,_]],
+      
classOf[org.apache.flink.streaming.api.datastream.CoGroupedStreams.WithWindow[_,
 _, _, _]],
       classOf[CoGroupedStreams[_, _]#Where[_]#EqualTo#WithWindow[_]])
   }
 }

Reply via email to