twalthr commented on code in PR #20840:
URL: https://github.com/apache/flink/pull/20840#discussion_r974022475


##########
flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java:
##########
@@ -269,20 +274,86 @@ private List<String> collectNotAllowedConfigurations() {
                                         ConfigurationNotAllowedMessage.ofConfigurationChange(
                                                 k, v)));
 
-        if (!Arrays.equals(originalCheckpointConfigSerialized, serializeConfig(checkpointCfg))) {
+        final Configuration enrichedClusterConfig = new Configuration(originalClusterConfig);
+        enrichProgramConfigWildcards(enrichedClusterConfig);
+
+        // Check CheckpointConfig
+        final CheckpointConfig clusterCheckpointConfig = new CheckpointConfig();
+        clusterCheckpointConfig.configure(enrichedClusterConfig);
+        if (!Arrays.equals(
+                serializeConfig(clusterCheckpointConfig), serializeConfig(checkpointCfg))) {
             errors.add(
                     ConfigurationNotAllowedMessage.ofConfigurationObject(
                             checkpointCfg.getClass().getSimpleName()));
         }
 
-        if (!Arrays.equals(originalExecutionConfigSerialized, serializeConfig(config))) {
+        // Check ExecutionConfig
+        final ExecutionConfig clusterExecutionConfig = new ExecutionConfig();
+        clusterExecutionConfig.configure(enrichedClusterConfig, this.getUserClassloader());
+        if (!Arrays.equals(serializeConfig(clusterExecutionConfig), serializeConfig(config))) {
             errors.add(
                     ConfigurationNotAllowedMessage.ofConfigurationObject(
                             config.getClass().getSimpleName()));
         }
+
         return errors;
     }
 
+    private static final class WildcardOption<T> {
+        private final ConfigOption<T> option;
+        private final Function<StreamContextEnvironment, T> getter;
+
+        WildcardOption(ConfigOption<T> option, Function<StreamContextEnvironment, T> getter) {
+            this.option = option;
+            this.getter = getter;
+        }
+
+        void enrich(Configuration mutableConfig, StreamContextEnvironment fromEnv) {
+            mutableConfig.set(option, getter.apply(fromEnv));
+        }
+
+        void remove(Configuration mutableConfig) {
+            mutableConfig.removeConfig(option);
+        }
+    }
+
+    private static final Map<String, WildcardOption<?>> supportedProgramConfigWildcards =
+            new HashMap<>();
+
+    static {
+        supportedProgramConfigWildcards.put(
+                PipelineOptions.GLOBAL_JOB_PARAMETERS.key(),
+                new WildcardOption<>(
+                        PipelineOptions.GLOBAL_JOB_PARAMETERS,
+                        env -> env.getConfig().getGlobalJobParameters().toMap()));
+    }
+
+    private void enrichProgramConfigWildcards(Configuration mutableConfig) {
+        for (String key : programConfigWildcards) {
+            final WildcardOption<?> option = supportedProgramConfigWildcards.get(key);

Review Comment:
   Your comment makes perfect sense, but the implementation is a result of
the terrible configuration story that we have to support right now. This is
also the reason why I added a comment to `ExecutionConfig` and
`CheckpointConfig`. If everything were backed by `Configuration` instead of
the POJOs that partially materialize the config options, the implementation
could be much simpler. Currently, we have to manually provide the backward
path from POJO to `ConfigOption` value.
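
To make the "backward path" concrete, here is a minimal, self-contained sketch. It does not use Flink's classes: `Option`, `Config`, `JobSettings`, `Wildcard`, and `matchesCluster` are hypothetical stand-ins invented for illustration, and the real `ConfigOption`, `Configuration`, and `ExecutionConfig` differ in detail. The sketch only shows the general shape of the idea: a POJO materializes an option into a field, so a hand-written getter is needed to read the value back and enrich the cluster config before comparing.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

final class WildcardSketch {

    /** Hypothetical stand-in for a typed config option (not Flink's ConfigOption). */
    static final class Option<T> {
        final String key;

        Option(String key) {
            this.key = key;
        }
    }

    /** Hypothetical stand-in for a mutable key/value configuration. */
    static final class Config {
        private final Map<String, Object> values = new HashMap<>();

        Config() {}

        Config(Config other) {
            values.putAll(other.values);
        }

        <T> void set(Option<T> option, T value) {
            values.put(option.key, value);
        }

        @SuppressWarnings("unchecked")
        <T> T get(Option<T> option) {
            return (T) values.get(option.key);
        }
    }

    static final Option<Map<String, String>> GLOBAL_PARAMETERS =
            new Option<>("global-parameters");

    /** Hypothetical POJO that materializes one option into a plain field. */
    static final class JobSettings {
        Map<String, String> globalParameters = new HashMap<>();

        void configure(Config config) {
            Map<String, String> params = config.get(GLOBAL_PARAMETERS);
            if (params != null) {
                globalParameters = new HashMap<>(params);
            }
        }
    }

    /** Pairs an option with the hand-written backward path from POJO to option value. */
    static final class Wildcard<T> {
        final Option<T> option;
        final Function<JobSettings, T> getter;

        Wildcard(Option<T> option, Function<JobSettings, T> getter) {
            this.option = option;
            this.getter = getter;
        }

        void enrich(Config mutableConfig, JobSettings from) {
            mutableConfig.set(option, getter.apply(from));
        }
    }

    static final Wildcard<Map<String, String>> GLOBAL_PARAMETERS_WILDCARD =
            new Wildcard<>(GLOBAL_PARAMETERS, settings -> settings.globalParameters);

    /** Returns true if the program's settings equal what the (enriched) cluster config yields. */
    static boolean matchesCluster(
            Config clusterConfig, JobSettings programSettings, boolean allowGlobalParameters) {
        Config enriched = new Config(clusterConfig);
        if (allowGlobalParameters) {
            // Copy the program's value into the cluster config so it no longer counts as a change.
            GLOBAL_PARAMETERS_WILDCARD.enrich(enriched, programSettings);
        }
        JobSettings expected = new JobSettings();
        expected.configure(enriched);
        return Objects.equals(expected.globalParameters, programSettings.globalParameters);
    }

    public static void main(String[] args) {
        Config cluster = new Config();
        JobSettings program = new JobSettings();
        program.globalParameters.put("a", "1");

        System.out.println(matchesCluster(cluster, program, false)); // change is rejected
        System.out.println(matchesCluster(cluster, program, true)); // change is tolerated
    }
}
```

If the settings object were itself just a view over the configuration, the `getter` would be unnecessary: the wildcard could copy the key from one configuration to the other, or skip it during comparison.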


