kl0u commented on a change in pull request #12222:
URL: https://github.com/apache/flink/pull/12222#discussion_r426565650



##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationClusterEntryPoint.java
##########
@@ -69,4 +78,21 @@ protected ArchivedExecutionGraphStore 
createSerializableExecutionGraphStore(
                        final ScheduledExecutor scheduledExecutor) {
                return new MemoryArchivedExecutionGraphStore();
        }
+
+       protected static List<URL> getClasspath(final Configuration 
configuration, final PackagedProgram program) {
+               final List<URL> classpath = new ArrayList<>();
+               classpath.addAll(
+                       ConfigUtils.decodeListFromConfig(
+                               configuration,
+                               PipelineOptions.CLASSPATHS,
+                               url -> {

Review comment:
       Instead of having this code in different places (see 
`ExecutionConfigAccessor`), wouldn't it make sense to change the 
`ConfigUtils.decodeListFromConfig()` to sth like:
   
   ```
   public static <IN, OUT, E extends Throwable> List<OUT> decodeListFromConfig(
                        final ReadableConfig configuration,
                        final ConfigOption<List<IN>> key,
                        final FunctionWithException<IN, OUT, E> mapper) throws 
E {
   
                checkNotNull(configuration);
                checkNotNull(key);
                checkNotNull(mapper);
   
                final List<IN> encodedString = configuration.get(key);
                if (encodedString == null) {
                        return Collections.emptyList();
                }
   
                final List<OUT> result = new ArrayList<>(encodedString.size());
                for (IN input : encodedString) {
                        result.add(mapper.apply(input));
                }
                return result;
        }
   ```
   
   And have this method become simply:
   
   ```
   protected static List<URL> getClasspath(final Configuration configuration, 
final PackagedProgram program) throws MalformedURLException {
                final List<URL> classpath = ConfigUtils.decodeListFromConfig(
                                configuration,
                                PipelineOptions.CLASSPATHS,
                                URL::new);
                classpath.addAll(program.getClasspaths());
                return 
Collections.unmodifiableList(classpath.stream().distinct().collect(Collectors.toList()));
        }
   ```
   
   Of course this will require some more changes in the 
`ExecutionConfigAccessor` and all the classes that are affected by the thrown 
exception, e.g. the executors who do not throw exception, they will now have to 
bubble up the exception.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to