yuxiqian commented on code in PR #3301:
URL: https://github.com/apache/flink-cdc/pull/3301#discussion_r1593492462
##########
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkEnvironmentUtils.java:
##########
@@ -22,29 +22,48 @@
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.lang.reflect.Field;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
/** Utilities for {@link
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment}. */
@Internal
public class FlinkEnvironmentUtils {
+ private static final Logger LOG =
LoggerFactory.getLogger(FlinkEnvironmentUtils.class);
/**
* Add the specified JAR to {@link StreamExecutionEnvironment} so that the
JAR will be uploaded
* together with the job graph.
*/
public static void addJar(StreamExecutionEnvironment env, URL jarUrl) {
+ addJar(env, Lists.newArrayList(jarUrl));
+ }
+
+ /**
+ * Add the specified JARs to {@link StreamExecutionEnvironment} so that
the JAR will be uploaded
+ * together with the job graph.
+ */
+ public static void addJar(StreamExecutionEnvironment env, List<URL>
jarUrls) {
Review Comment:
```suggestion
public static void addJar(StreamExecutionEnvironment env, Collection<URL> jarUrls) {
```
##########
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkEnvironmentUtils.java:
##########
@@ -22,29 +22,48 @@
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.lang.reflect.Field;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
/** Utilities for {@link
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment}. */
@Internal
public class FlinkEnvironmentUtils {
+ private static final Logger LOG =
LoggerFactory.getLogger(FlinkEnvironmentUtils.class);
/**
* Add the specified JAR to {@link StreamExecutionEnvironment} so that the
JAR will be uploaded
* together with the job graph.
*/
public static void addJar(StreamExecutionEnvironment env, URL jarUrl) {
+ addJar(env, Lists.newArrayList(jarUrl));
+ }
+
+ /**
+ * Add the specified JARs to {@link StreamExecutionEnvironment} so that
the JAR will be uploaded
+ * together with the job graph.
+ */
+ public static void addJar(StreamExecutionEnvironment env, List<URL>
jarUrls) {
try {
Class<StreamExecutionEnvironment> envClass =
StreamExecutionEnvironment.class;
Field field = envClass.getDeclaredField("configuration");
field.setAccessible(true);
Configuration configuration = ((Configuration) field.get(env));
- List<String> jars =
- configuration.getOptional(PipelineOptions.JARS).orElse(new
ArrayList<>());
- jars.add(jarUrl.toString());
- configuration.set(PipelineOptions.JARS, jars);
+ Set<String> jars =
+ configuration.getOptional(PipelineOptions.JARS).orElse(new
ArrayList<>())
+ .stream()
+ .collect(Collectors.toSet());
+
jars.addAll(jarUrls.stream().map(URL::toString).collect(Collectors.toList()));
Review Comment:
Converting it to a `Set` might lose the order of the input JARs, since
`Collectors.toSet()` makes no ordering guarantee. What about using
`stream().distinct()` instead?
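A minimal sketch of the idea, assuming plain string lists stand in for the `PipelineOptions.JARS` lookup. The class `DistinctJarsSketch` and the helper `mergeJars` are hypothetical names for illustration only, not part of this PR:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DistinctJarsSketch {

    /**
     * Hypothetical helper: merges the already-registered JARs with the new ones,
     * dropping duplicates while keeping the first-seen order of each entry.
     */
    public static List<String> mergeJars(List<String> existing, List<String> added) {
        // distinct() on an ordered stream keeps the first occurrence of each element,
        // so the encounter order of the concatenated input is preserved.
        return Stream.concat(existing.stream(), added.stream())
                .distinct()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> existing = Arrays.asList("file:/a.jar", "file:/b.jar");
        List<String> added = Arrays.asList("file:/b.jar", "file:/c.jar");
        // prints [file:/a.jar, file:/b.jar, file:/c.jar]
        System.out.println(mergeJars(existing, added));
    }
}
```

In `addJar`, the same pattern would apply to the existing `PipelineOptions.JARS` value concatenated with `jarUrls.stream().map(URL::toString)`, so the result can stay a `List<String>`.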