zentol commented on code in PR #21012:
URL: https://github.com/apache/flink/pull/21012#discussion_r1009284356


##########
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java:
##########
@@ -69,6 +71,11 @@ enum ProgramArgsParType {
     static final String[] PROG_ARGS = new String[] {"--host", "localhost", "--port", "1234"};
     static final int PARALLELISM = 4;
 
+    static final Map<String, String> FLINK_CONFIGURATION =
+            ImmutableMap.of(
+                    CoreOptions.DEFAULT_PARALLELISM.key(), "2",

Review Comment:
   This needs a test for the case where this option isn't set in the config,
   since we have special logic for that case.



##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java:
##########
@@ -244,4 +245,15 @@ static List<String> tokenizeArguments(@Nullable final String args) {
         }
         return tokens;
     }
+
+    private static Integer getParallelism(JarRequestBody requestBody) {
+        Optional<Map<String, String>> optionalFlinkConfig =
+                Optional.ofNullable(requestBody.getFlinkConfiguration());
+        Optional<Integer> optionalParallelism = Optional.ofNullable(requestBody.getParallelism());
+        if (optionalParallelism.isPresent() || !optionalFlinkConfig.isPresent()) {
+            return requestBody.getParallelism();
+        }
+        return Integer.valueOf(
+                optionalFlinkConfig.get().get(CoreOptions.DEFAULT_PARALLELISM.key()));
+    }

Review Comment:
   This additionally needs a guard for the case where `optionalFlinkConfig`
   does not contain `DEFAULT_PARALLELISM`.
   This is probably easier to do if you construct a `Configuration`.
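
A minimal sketch of such a guard, written against plain `java.util` types so that it runs without Flink on the classpath. `ParallelismResolver` and `resolve` are hypothetical names, and the literal `"parallelism.default"` is assumed to match `CoreOptions.DEFAULT_PARALLELISM.key()`. In the PR itself, constructing a `Configuration` as suggested above would likely replace the raw map lookup.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper illustrating the guard; not part of the PR. */
final class ParallelismResolver {
    // Assumed to match CoreOptions.DEFAULT_PARALLELISM.key().
    static final String DEFAULT_PARALLELISM_KEY = "parallelism.default";

    private ParallelismResolver() {}

    /**
     * Returns the explicit request parallelism if present, otherwise the value
     * stored under DEFAULT_PARALLELISM_KEY, otherwise null.
     */
    static Integer resolve(Integer requestParallelism, Map<String, String> flinkConfig) {
        if (requestParallelism != null) {
            return requestParallelism;
        }
        return Optional.ofNullable(flinkConfig)
                // Optional#map yields an empty Optional when the key is absent,
                // so Integer.valueOf is never called with null.
                .map(config -> config.get(DEFAULT_PARALLELISM_KEY))
                .map(Integer::valueOf)
                .orElse(null);
    }
}
```

With this shape, a request that carries a configuration map without the parallelism key falls through to `null` instead of throwing a `NumberFormatException`.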



##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRequestBody.java:
##########
@@ -114,4 +123,10 @@ public Integer getParallelism() {
     public JobID getJobId() {
         return jobId;
     }
+
+    @Nullable
+    @JsonIgnore
+    public Map<String, String> getFlinkConfiguration() {
+        return flinkConfiguration;
+    }

Review Comment:
   Consider returning a non-nullable `Configuration`.
   This should simplify other operations.
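
One way to read this suggestion, sketched with a plain `Map` standing in for Flink's `Configuration` so the example stays self-contained. `RequestBodySketch` is a hypothetical class, not the PR's `JarRequestBody`. The idea is to normalize a missing field once at construction so that the getter never returns null.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the request body; not the PR's class. */
final class RequestBodySketch {
    private final Map<String, String> flinkConfiguration;

    RequestBodySketch(Map<String, String> flinkConfiguration) {
        // Normalize a missing field to an empty map once, at construction time.
        this.flinkConfiguration =
                flinkConfiguration == null
                        ? Collections.emptyMap()
                        : Collections.unmodifiableMap(new HashMap<>(flinkConfiguration));
    }

    /** Never returns null, so callers can iterate or look up keys directly. */
    Map<String, String> getFlinkConfiguration() {
        return flinkConfiguration;
    }
}
```

Callers such as the handlers can then drop their `!= null` checks and the `Optional.ofNullable` wrapping around the getter.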



##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java:
##########
@@ -38,6 +39,8 @@ public class JarRunRequestBody extends JarRequestBody {
     private static final String FIELD_NAME_SAVEPOINT_PATH = "savepointPath";
     private static final String FIELD_NAME_SAVEPOINT_RESTORE_MODE = "restoreMode";
 
+    private static final String FIELD_NAME_FLINK_CONFIGURATION = "flinkConfiguration";

Review Comment:
   Re-use the field name constant from `JarRequestBody` instead.



##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java:
##########
@@ -92,7 +92,13 @@ public CompletableFuture<JarRunResponseBody> handleRequest(
             @Nonnull final DispatcherGateway gateway)
             throws RestHandlerException {
 
-        final Configuration effectiveConfiguration = new Configuration(configuration);
+        Map<String, String> requestJobFlinkConfig =
+                request.getRequestBody().getFlinkConfiguration();
+        Configuration effectiveConfiguration = new Configuration(this.configuration);
+        if (requestJobFlinkConfig != null) {
+            requestJobFlinkConfig.forEach(effectiveConfiguration::setString);
+        }

Review Comment:
   Consider integrating this into `JarHandlerContext#applyToConfiguration`.



##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java:
##########
@@ -244,4 +245,15 @@ static List<String> tokenizeArguments(@Nullable final String args) {
         }
         return tokens;
     }
+
+    private static Integer getParallelism(JarRequestBody requestBody) {
+        Optional<Map<String, String>> optionalFlinkConfig =
+                Optional.ofNullable(requestBody.getFlinkConfiguration());
+        Optional<Integer> optionalParallelism = Optional.ofNullable(requestBody.getParallelism());
+        if (optionalParallelism.isPresent() || !optionalFlinkConfig.isPresent()) {
+            return requestBody.getParallelism();
+        }
+        return Integer.valueOf(
+                optionalFlinkConfig.get().get(CoreOptions.DEFAULT_PARALLELISM.key()));
+    }

Review Comment:
   ```suggestion
           return optionalParallelism.orElse(
                   optionalFlinkConfig
                           .map(map -> Integer.valueOf(map.get(CoreOptions.DEFAULT_PARALLELISM.key())))
                           .orElse(null));
   ```



##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java:
##########
@@ -98,13 +98,18 @@ protected CompletableFuture<JobPlanInfo> handleRequest(
             @Nonnull final RestfulGateway gateway)
             throws RestHandlerException {
         final JarHandlerContext context = JarHandlerContext.fromRequest(request, jarDir, log);
-
+        Map<String, String> requestJobFlinkConfig =

Review Comment:
   ```suggestion
           final Map<String, String> requestJobFlinkConfig =
   ```



##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java:
##########
@@ -98,13 +98,18 @@ protected CompletableFuture<JobPlanInfo> handleRequest(
             @Nonnull final RestfulGateway gateway)
             throws RestHandlerException {
         final JarHandlerContext context = JarHandlerContext.fromRequest(request, jarDir, log);
-
+        Map<String, String> requestJobFlinkConfig =
+                request.getRequestBody().getFlinkConfiguration();
+        Configuration effectiveConfiguration = new Configuration(this.configuration);

Review Comment:
   ```suggestion
           final Configuration effectiveConfiguration = new Configuration(this.configuration);
   ```



##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java:
##########
@@ -98,13 +98,18 @@ protected CompletableFuture<JobPlanInfo> handleRequest(
             @Nonnull final RestfulGateway gateway)
             throws RestHandlerException {
         final JarHandlerContext context = JarHandlerContext.fromRequest(request, jarDir, log);
-
+        Map<String, String> requestJobFlinkConfig =
+                request.getRequestBody().getFlinkConfiguration();
+        Configuration effectiveConfiguration = new Configuration(this.configuration);
+        if (requestJobFlinkConfig != null) {
+            requestJobFlinkConfig.forEach(effectiveConfiguration::setString);

Review Comment:
   ```suggestion
               effectiveConfiguration.addAll(Configuration.fromMap(requestJobFlinkConfig));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.