This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 065018dde64 [FLINK-28213][runtime] StreamExecutionEnvironment configure method support override pipeline.jars option
065018dde64 is described below

commit 065018dde64a10e8a4cf0ba296e2851a6867871c
Author: fengli <[email protected]>
AuthorDate: Thu Jun 23 14:40:04 2022 +0800

    [FLINK-28213][runtime] StreamExecutionEnvironment configure method support override pipeline.jars option
    
    This closes #20057.
---
 .../apache/flink/configuration/ConfigUtils.java    | 28 ++++++++++++++++++++++
 .../environment/StreamExecutionEnvironment.java    | 15 ++++++++++++
 ...ecutionEnvironmentComplexConfigurationTest.java | 21 ++++++++++++++++
 3 files changed, 64 insertions(+)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigUtils.java
index 1c275bc02e5..f53462dbe0b 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigUtils.java
@@ -155,5 +155,33 @@ public class ConfigUtils {
         return options;
     }
 
+    /**
+     * Merge a {@link Collection} of values of type {@code T} and option {@link ConfigOption} value
+     * of type {@link List} of type {@code T} from current {@link Configuration}, then put it to
+     * {@link Configuration}.
+     *
+     * @param configuration the configuration object to get the value out and write
+     * @param key the {@link ConfigOption option} to serve as the key for the list in the
+     *     configuration
+     * @param values the collection of values to merge as value for the {@code key}
+     * @param decodeMapper the decode transformation function.
+     * @param encodeMapper the encode transformation function.
+     */
+    public static <T, E extends Throwable> void mergeCollectionsToConfig(
+            final Configuration configuration,
+            final ConfigOption<List<T>> key,
+            final Collection<T> values,
+            final FunctionWithException<T, T, E> decodeMapper,
+            final Function<T, T> encodeMapper)
+            throws E {
+        // decode option value from current configuration
+        Set<T> valueInConfig =
+                new HashSet<>(decodeListFromConfig(configuration, key, 
decodeMapper));
+        // merge provided option value
+        valueInConfig.addAll(values);
+        // encode the merged value to current WritableConfig
+        encodeCollectionToConfig(configuration, key, valueInConfig, 
encodeMapper);
+    }
+
     private ConfigUtils() {}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 035bb33eff6..dc4dd7ce839 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -52,6 +52,7 @@ import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.DeploymentOptions;
@@ -116,6 +117,7 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -1023,6 +1025,19 @@ public class StreamExecutionEnvironment {
                                                 
.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH,
                                         flag));
 
+        // merge PipelineOptions.JARS, user maybe set this option in high level such as table
+        // module, so here need to merge the jars from both configuration object
+        configuration
+                .getOptional(PipelineOptions.JARS)
+                .ifPresent(
+                        jars ->
+                                ConfigUtils.mergeCollectionsToConfig(
+                                        this.configuration,
+                                        PipelineOptions.JARS,
+                                        
Collections.unmodifiableCollection(jars),
+                                        String::toString,
+                                        String::toString));
+
         config.configure(configuration, classLoader);
         checkpointCfg.configure(configuration);
     }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java
index d0fd8f13e73..71a60e11d80 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java
@@ -240,6 +240,27 @@ public class StreamExecutionEnvironmentComplexConfigurationTest {
         assertThat(env.getStateBackend(), 
instanceOf(MemoryStateBackend.class));
     }
 
+    @Test
+    public void testMergePipelineJarsWithConfiguration() {
+        Configuration configuration = new Configuration();
+        configuration.set(PipelineOptions.JARS, 
Arrays.asList("/tmp/test1.jar", "/tmp/test2.jar"));
+        StreamExecutionEnvironment envFromConfiguration =
+                
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+
+        // user configuration with different jars
+        Configuration userConfiguration = new Configuration();
+        userConfiguration.set(
+                PipelineOptions.JARS, Arrays.asList("/tmp/test2.jar", 
"/tmp/test3.jar"));
+
+        // test pipeline.jars merge
+        envFromConfiguration.configure(
+                userConfiguration, 
Thread.currentThread().getContextClassLoader());
+
+        assertEquals(
+                
envFromConfiguration.getConfiguration().get(PipelineOptions.JARS),
+                Arrays.asList("/tmp/test1.jar", "/tmp/test2.jar", 
"/tmp/test3.jar"));
+    }
+
     /** JobSubmitted counter listener for unit test. */
     public static class BasicJobSubmittedCounter implements JobListener {
         private int count = 0;

Reply via email to