This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 13d35365f67 [FLINK-32592] Fix thread-safety of context (Stream)ExEnv initializer
13d35365f67 is described below
commit 13d35365f677813d5f0090f121e14e8bdec646d1
Author: Chesnay Schepler <[email protected]>
AuthorDate: Fri Jul 14 19:51:20 2023 +0200
[FLINK-32592] Fix thread-safety of context (Stream)ExEnv initializer
---
.../flink/api/java/ExecutionEnvironment.java | 2 +-
.../flink/api/java/ExecutionEnvironmentTest.java | 71 ++++++++++++++++++++++
.../environment/StreamExecutionEnvironment.java | 2 +-
.../StreamExecutionEnvironmentTest.java | 45 +++++++++++++-
4 files changed, 116 insertions(+), 4 deletions(-)
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index a7b0cabedfc..9ed24215d35 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -1422,7 +1422,7 @@ public class ExecutionEnvironment {
*/
protected static void initializeContextEnvironment(ExecutionEnvironmentFactory ctx) {
contextEnvironmentFactory = Preconditions.checkNotNull(ctx);
- threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory);
+ threadLocalContextEnvironmentFactory.set(ctx);
}
/**
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/ExecutionEnvironmentTest.java b/flink-java/src/test/java/org/apache/flink/api/java/ExecutionEnvironmentTest.java
new file mode 100644
index 00000000000..e659b31a949
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/ExecutionEnvironmentTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java;
+
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.core.testutils.OneShotLatch;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class ExecutionEnvironmentTest {
+
+ @Test
+ void testConcurrentSetContext() throws Exception {
+ int numThreads = 20;
+ final CountDownLatch waitingThreadCount = new CountDownLatch(numThreads);
+ final OneShotLatch latch = new OneShotLatch();
+ final List<CheckedThread> threads = new ArrayList<>();
+ for (int x = 0; x < numThreads; x++) {
+ final CheckedThread thread =
+ new CheckedThread() {
+ @Override
+ public void go() {
+ final ExecutionEnvironment preparedEnvironment =
+ new ExecutionEnvironment();
+ ExecutionEnvironment.initializeContextEnvironment(
+ () -> preparedEnvironment);
+ try {
+ waitingThreadCount.countDown();
+ latch.awaitQuietly();
+ assertThat(ExecutionEnvironment.getExecutionEnvironment())
+ .isSameAs(preparedEnvironment);
+ } finally {
+ ExecutionEnvironment.resetContextEnvironment();
+ }
+ }
+ };
+ thread.start();
+ threads.add(thread);
+ }
+
+ // wait for all threads to be ready and trigger the job submissions at the same time
+ waitingThreadCount.await();
+ latch.trigger();
+
+ for (CheckedThread thread : threads) {
+ thread.sync();
+ }
+ }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index b9cf3eb150c..76be79688c0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -2592,7 +2592,7 @@ public class StreamExecutionEnvironment implements AutoCloseable {
protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) {
contextEnvironmentFactory = ctx;
- threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory);
+ threadLocalContextEnvironmentFactory.set(ctx);
}
protected static void resetContextEnvironment() {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
similarity index 92%
rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
index 87c944dd295..60dc8e6175b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.api;
+package org.apache.flink.streaming.api.environment;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -28,11 +28,12 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -47,10 +48,12 @@ import org.apache.flink.util.SplittableIterator;
import org.junit.jupiter.api.Test;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.concurrent.CountDownLatch;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -432,6 +435,44 @@ class StreamExecutionEnvironmentTest {
.isInstanceOf(IllegalArgumentException.class);
}
+ @Test
+ void testConcurrentSetContext() throws Exception {
+ int numThreads = 20;
+ final CountDownLatch waitingThreadCount = new CountDownLatch(numThreads);
+ final OneShotLatch latch = new OneShotLatch();
+ final List<CheckedThread> threads = new ArrayList<>();
+ for (int x = 0; x < numThreads; x++) {
+ final CheckedThread thread =
+ new CheckedThread() {
+ @Override
+ public void go() {
+ final StreamExecutionEnvironment preparedEnvironment =
+ new StreamExecutionEnvironment();
+ StreamExecutionEnvironment.initializeContextEnvironment(
+ configuration -> preparedEnvironment);
+ try {
+ waitingThreadCount.countDown();
+ latch.awaitQuietly();
+ assertThat(StreamExecutionEnvironment.getExecutionEnvironment())
+ .isSameAs(preparedEnvironment);
+ } finally {
+ StreamExecutionEnvironment.resetContextEnvironment();
+ }
+ }
+ };
+ thread.start();
+ threads.add(thread);
+ }
+
+ // wait for all threads to be ready and trigger the job submissions at the same time
+ waitingThreadCount.await();
+ latch.trigger();
+
+ for (CheckedThread thread : threads) {
+ thread.sync();
+ }
+ }
+
/////////////////////////////////////////////////////////////
// Utilities
/////////////////////////////////////////////////////////////