This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 065018dde64 [FLINK-28213][runtime] StreamExecutionEnvironment
configure method support override pipeline.jars option
065018dde64 is described below
commit 065018dde64a10e8a4cf0ba296e2851a6867871c
Author: fengli <[email protected]>
AuthorDate: Thu Jun 23 14:40:04 2022 +0800
[FLINK-28213][runtime] StreamExecutionEnvironment configure method support
override pipeline.jars option
This closes #20057.
---
.../apache/flink/configuration/ConfigUtils.java | 28 ++++++++++++++++++++++
.../environment/StreamExecutionEnvironment.java | 15 ++++++++++++
...ecutionEnvironmentComplexConfigurationTest.java | 21 ++++++++++++++++
3 files changed, 64 insertions(+)
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigUtils.java
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigUtils.java
index 1c275bc02e5..f53462dbe0b 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigUtils.java
@@ -155,5 +155,33 @@ public class ConfigUtils {
return options;
}
+    /**
+     * Merges {@code values} into the list currently stored under {@code key} in the given {@link
+     * Configuration} and writes the de-duplicated result back, keeping insertion order.
+     *
+     * @param configuration the configuration object to read the current list from and write to
+     * @param key the {@link ConfigOption option} to serve as the key for the list in the
+     *     configuration
+     * @param values the collection of values to merge into the value for the {@code key}
+     * @param decodeMapper the transformation applied when decoding the current list
+     * @param encodeMapper the transformation applied when encoding the merged list
+     * @throws E propagated from decoding the current list with {@code decodeMapper}
+     */
+    public static <T, E extends Throwable> void mergeCollectionsToConfig(
+            final Configuration configuration,
+            final ConfigOption<List<T>> key,
+            final Collection<T> values,
+            final FunctionWithException<T, T, E> decodeMapper,
+            final Function<T, T> encodeMapper)
+            throws E {
+        // An insertion-ordered set drops duplicates while keeping the already configured
+        // entries first and the newly provided ones after them, so the result is deterministic.
+        final Set<T> merged =
+                new java.util.LinkedHashSet<>(
+                        decodeListFromConfig(configuration, key, decodeMapper));
+        merged.addAll(values);
+        // encode the merged value back into the configuration
+        encodeCollectionToConfig(configuration, key, merged, encodeMapper);
+    }
+
private ConfigUtils() {}
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 035bb33eff6..dc4dd7ce839 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -52,6 +52,7 @@ import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
@@ -116,6 +117,7 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -1023,6 +1025,19 @@ public class StreamExecutionEnvironment {
.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH,
flag));
+        // merge PipelineOptions.JARS: the user may set this option at a higher level, such as
+        // the table module, so the jars from both configuration objects need to be merged here
+ configuration
+ .getOptional(PipelineOptions.JARS)
+ .ifPresent(
+ jars ->
+ ConfigUtils.mergeCollectionsToConfig(
+ this.configuration,
+ PipelineOptions.JARS,
+
Collections.unmodifiableCollection(jars),
+ String::toString,
+ String::toString));
+
config.configure(configuration, classLoader);
checkpointCfg.configure(configuration);
}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java
index d0fd8f13e73..71a60e11d80 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java
@@ -240,6 +240,27 @@ public class
StreamExecutionEnvironmentComplexConfigurationTest {
assertThat(env.getStateBackend(),
instanceOf(MemoryStateBackend.class));
}
+    // Verifies that configure() merges the pipeline.jars of the passed configuration into the
+    // jars the environment was created with, without duplicating the shared entry.
+    @Test
+    public void testMergePipelineJarsWithConfiguration() {
+        Configuration configuration = new Configuration();
+        configuration.set(PipelineOptions.JARS, Arrays.asList("/tmp/test1.jar", "/tmp/test2.jar"));
+        StreamExecutionEnvironment envFromConfiguration =
+                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+
+        // user configuration with different jars; test2.jar overlaps with the environment's
+        Configuration userConfiguration = new Configuration();
+        userConfiguration.set(
+                PipelineOptions.JARS, Arrays.asList("/tmp/test2.jar", "/tmp/test3.jar"));
+
+        // test pipeline.jars merge
+        envFromConfiguration.configure(
+                userConfiguration, Thread.currentThread().getContextClassLoader());
+
+        // expected value first, actual second, so a failure message reads correctly
+        assertEquals(
+                Arrays.asList("/tmp/test1.jar", "/tmp/test2.jar", "/tmp/test3.jar"),
+                envFromConfiguration.getConfiguration().get(PipelineOptions.JARS));
+    }
+
/** JobSubmitted counter listener for unit test. */
public static class BasicJobSubmittedCounter implements JobListener {
private int count = 0;