yuxiqian commented on code in PR #3301:
URL: https://github.com/apache/flink-cdc/pull/3301#discussion_r1593492462


##########
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkEnvironmentUtils.java:
##########
@@ -22,29 +22,48 @@
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
+import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.lang.reflect.Field;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /** Utilities for {@link 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment}. */
 @Internal
 public class FlinkEnvironmentUtils {
+    private static final Logger LOG = 
LoggerFactory.getLogger(FlinkEnvironmentUtils.class);
 
     /**
      * Add the specified JAR to {@link StreamExecutionEnvironment} so that the 
JAR will be uploaded
      * together with the job graph.
      */
     public static void addJar(StreamExecutionEnvironment env, URL jarUrl) {
+        addJar(env, Lists.newArrayList(jarUrl));
+    }
+
+    /**
+     * Add the specified JARs to {@link StreamExecutionEnvironment} so that 
the JAR will be uploaded
+     * together with the job graph.
+     */
+    public static void addJar(StreamExecutionEnvironment env, List<URL> 
jarUrls) {

Review Comment:
   ```suggestion
       public static void addJar(StreamExecutionEnvironment env, 
Collection<URL> jarUrls) {
   ```
   



##########
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkEnvironmentUtils.java:
##########
@@ -22,29 +22,48 @@
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
+import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.lang.reflect.Field;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /** Utilities for {@link 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment}. */
 @Internal
 public class FlinkEnvironmentUtils {
+    private static final Logger LOG = 
LoggerFactory.getLogger(FlinkEnvironmentUtils.class);
 
     /**
      * Add the specified JAR to {@link StreamExecutionEnvironment} so that the 
JAR will be uploaded
      * together with the job graph.
      */
     public static void addJar(StreamExecutionEnvironment env, URL jarUrl) {
+        addJar(env, Lists.newArrayList(jarUrl));
+    }
+
+    /**
+     * Add the specified JARs to {@link StreamExecutionEnvironment} so that 
the JAR will be uploaded
+     * together with the job graph.
+     */
+    public static void addJar(StreamExecutionEnvironment env, List<URL> 
jarUrls) {
         try {
             Class<StreamExecutionEnvironment> envClass = 
StreamExecutionEnvironment.class;
             Field field = envClass.getDeclaredField("configuration");
             field.setAccessible(true);
             Configuration configuration = ((Configuration) field.get(env));
-            List<String> jars =
-                    configuration.getOptional(PipelineOptions.JARS).orElse(new 
ArrayList<>());
-            jars.add(jarUrl.toString());
-            configuration.set(PipelineOptions.JARS, jars);
+            Set<String> jars =
+                    configuration.getOptional(PipelineOptions.JARS).orElse(new 
ArrayList<>())
+                            .stream()
+                            .collect(Collectors.toSet());
+            
jars.addAll(jarUrls.stream().map(URL::toString).collect(Collectors.toList()));

Review Comment:
   Converting it to a `Set` might lose the ordering of the input. What about using 
`stream().distinct()` instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to