dmvk commented on code in PR #20840:
URL: https://github.com/apache/flink/pull/20840#discussion_r973938806


##########
flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java:
##########
@@ -105,16 +105,49 @@ public class DeploymentOptions {
                                             TextElement.text(HighAvailabilityOptions.HA_MODE.key()))
                                     .build());
 
-    @Experimental
-    public static final ConfigOption<Boolean> ALLOW_CLIENT_JOB_CONFIGURATIONS =
-            ConfigOptions.key("execution.allow-client-job-configurations")
+    public static final ConfigOption<List<String>> PROGRAM_CONFIG_WILDCARDS =

Review Comment:
   can we again have these options marked as `@Experimental`?



##########
flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java:
##########
@@ -269,20 +274,86 @@ private List<String> collectNotAllowedConfigurations() {
                                         ConfigurationNotAllowedMessage.ofConfigurationChange(
                                                 k, v)));
 
-        if (!Arrays.equals(originalCheckpointConfigSerialized, serializeConfig(checkpointCfg))) {
+        final Configuration enrichedClusterConfig = new Configuration(originalClusterConfig);
+        enrichProgramConfigWildcards(enrichedClusterConfig);
+
+        // Check CheckpointConfig
+        final CheckpointConfig clusterCheckpointConfig = new CheckpointConfig();
+        clusterCheckpointConfig.configure(enrichedClusterConfig);
+        if (!Arrays.equals(
+                serializeConfig(clusterCheckpointConfig), serializeConfig(checkpointCfg))) {
             errors.add(
                     ConfigurationNotAllowedMessage.ofConfigurationObject(
                             checkpointCfg.getClass().getSimpleName()));
         }
 
-        if (!Arrays.equals(originalExecutionConfigSerialized, serializeConfig(config))) {
+        // Check ExecutionConfig
+        final ExecutionConfig clusterExecutionConfig = new ExecutionConfig();
+        clusterExecutionConfig.configure(enrichedClusterConfig, this.getUserClassloader());
+        if (!Arrays.equals(serializeConfig(clusterExecutionConfig), serializeConfig(config))) {
             errors.add(
                     ConfigurationNotAllowedMessage.ofConfigurationObject(
                             config.getClass().getSimpleName()));
         }
+
         return errors;
     }
 
+    private static final class WildcardOption<T> {
+        private final ConfigOption<T> option;
+        private final Function<StreamContextEnvironment, T> getter;
+
+        WildcardOption(ConfigOption<T> option, Function<StreamContextEnvironment, T> getter) {
+            this.option = option;
+            this.getter = getter;
+        }
+
+        void enrich(Configuration mutableConfig, StreamContextEnvironment fromEnv) {
+            mutableConfig.set(option, getter.apply(fromEnv));
+        }
+
+        void remove(Configuration mutableConfig) {
+            mutableConfig.removeConfig(option);
+        }
+    }
+
+    private static final Map<String, WildcardOption<?>> supportedProgramConfigWildcards =

Review Comment:
   - Static variable definitions + static blocks should be on top
   - Should the name be uppercase?
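A minimal sketch of the layout these two points ask for, assuming the registry is kept at all: the static constant is declared first, named in UPPER_SNAKE_CASE, and built by a static factory method instead of a separate `static {}` block, then wrapped as unmodifiable. `WildcardRegistrySketch`, its nested `Env`, and the `"example.global-job-parameters"` key are hypothetical stand-ins, not Flink's `ConfigOption`/`PipelineOptions` API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, heavily reduced stand-in for StreamContextEnvironment: only the
// static registry is shown, and Env replaces the real environment type.
final class WildcardRegistrySketch {

    // Static definitions come first and use UPPER_SNAKE_CASE. The map is built by a
    // static factory method instead of a static {} block and wrapped as unmodifiable,
    // so it cannot be changed after class initialization.
    private static final Map<String, Function<Env, Map<String, String>>>
            SUPPORTED_PROGRAM_CONFIG_WILDCARDS = createSupportedWildcards();

    /** Hypothetical stand-in for the environment a wildcard value is read from. */
    static final class Env {
        final Map<String, String> globalJobParameters = new HashMap<>();
    }

    private static Map<String, Function<Env, Map<String, String>>> createSupportedWildcards() {
        final Map<String, Function<Env, Map<String, String>>> wildcards = new HashMap<>();
        // Hypothetical key, standing in for PipelineOptions.GLOBAL_JOB_PARAMETERS.key().
        wildcards.put("example.global-job-parameters", env -> env.globalJobParameters);
        return Collections.unmodifiableMap(wildcards);
    }

    private WildcardRegistrySketch() {}

    static boolean isSupported(String key) {
        return SUPPORTED_PROGRAM_CONFIG_WILDCARDS.containsKey(key);
    }

    static Map<String, String> read(String key, Env env) {
        return SUPPORTED_PROGRAM_CONFIG_WILDCARDS.get(key).apply(env);
    }
}
```

Using a factory method keeps the initialization next to the declaration and avoids a mutable `HashMap` being reachable from a static field.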



##########
flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java:
##########
@@ -97,21 +101,20 @@ public StreamContextEnvironment(
     @Internal
     public StreamContextEnvironment(
             final PipelineExecutorServiceLoader executorServiceLoader,
-            final Configuration configuration,
+            final Configuration clusterConfig,
+            final Configuration envConfig,
             final ClassLoader userCodeClassLoader,
             final boolean enforceSingleJobExecution,
             final boolean suppressSysout,
-            final boolean allowConfigurations,
-            final Collection<String> errorMessages) {
-        super(executorServiceLoader, configuration, userCodeClassLoader);
+            final boolean programConfigEnabled,
+            final List<String> programConfigWildcards) {

Review Comment:
   nit: we should use the minimal interface necessary
   
   ```suggestion
               final Collection<String> programConfigWildcards) {
   ```
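A minimal sketch of why the narrower type helps: the constructor only looks keys up, so typing the parameter as `Collection<String>` lets callers pass a `List`, a `Set`, or any other collection without converting it first. `WildcardHolder` is a hypothetical class for illustration, not part of Flink.

```java
import java.util.Collection;
import java.util.Collections;

/** Hypothetical class that only needs to look up the configured wildcard keys. */
final class WildcardHolder {
    private final Collection<String> programConfigWildcards;

    // The parameter is typed as Collection rather than List: nothing here depends on
    // ordering or index access, so callers may pass a List, a Set, or any other
    // collection. The wrapper is a read-only view, not a copy.
    WildcardHolder(Collection<String> programConfigWildcards) {
        this.programConfigWildcards = Collections.unmodifiableCollection(programConfigWildcards);
    }

    boolean isWildcard(String key) {
        return programConfigWildcards.contains(key);
    }
}
```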



##########
flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java:
##########
@@ -244,12 +240,21 @@ public static void unsetAsContext() {
     }
 
     private List<String> collectNotAllowedConfigurations() {
-        final List<String> errors = new ArrayList<>();
-        if (allowConfigurations) {
-            return errors;
+        if (programConfigEnabled) {
+            return Collections.emptyList();
         }
+
+        final List<String> errors = new ArrayList<>();
+
+        final Configuration clusterConfigMap = new Configuration(originalClusterConfig);
+        final Configuration envConfigMap = new Configuration(configuration);
+
+        removeProgramConfigWildcards(clusterConfigMap);
+        removeProgramConfigWildcards(envConfigMap);

Review Comment:
   the whole `removal` / `enrichment` seems overcomplicated;
   
   can we simply check for wildcard keys in the "diff functions" below?
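One way to read this suggestion, sketched over plain `Map<String, String>` (a stand-in for the key/value view of a `Configuration`): instead of copying both configurations, removing the wildcard keys, and comparing the copies, the diff function itself skips any key listed as a wildcard. `WildcardAwareDiff`, `collectNotAllowedChanges`, and the message format are hypothetical, not the PR's code.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

final class WildcardAwareDiff {
    private WildcardAwareDiff() {}

    /**
     * Returns one message per key whose value differs between the cluster and the
     * environment configuration, ignoring keys listed as wildcards. Both maps are
     * only read, so no defensive copies are needed.
     */
    static List<String> collectNotAllowedChanges(
            Map<String, String> clusterConfig,
            Map<String, String> envConfig,
            Collection<String> wildcards) {
        final List<String> errors = new ArrayList<>();
        // TreeSet gives a deterministic (sorted) order for the reported keys.
        final Set<String> allKeys = new TreeSet<>(clusterConfig.keySet());
        allKeys.addAll(envConfig.keySet());
        for (String key : allKeys) {
            if (wildcards.contains(key)) {
                continue; // wildcard keys may be changed freely by the program
            }
            final String clusterValue = clusterConfig.get(key);
            final String envValue = envConfig.get(key);
            if (!Objects.equals(clusterValue, envValue)) {
                errors.add(key + ": " + clusterValue + " -> " + envValue);
            }
        }
        return errors;
    }
}
```

In this sketch, keys added or removed by the program count as changes too, since a missing key compares as `null`.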



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -52,6 +52,15 @@
 @Public
 public class CheckpointConfig implements java.io.Serializable {
 
+    // NOTE TO IMPLEMENTERS:
+    // Please do not add further fields to this class. Use the ConfigOption stack instead!
+    // It is currently very tricky to keep this kind of POJO classes in sync with instances of
+    // org.apache.flink.configuration.Configuration. Instances of Configuration are way easier to
+    // pass, layer, merge, restrict, copy, filter, etc.
+    // See ExecutionOptions.RUNTIME_MODE for a reference implementation. If the option is very
+    // crucial for the API, we can add a dedicated setter to StreamExecutionEnvironment. Otherwise,
+    // introducing a ConfigOption should be enough.

Review Comment:
   Should we rather make an effort to deprecate these? (same applies to ExecutionConfig)



##########
flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java:
##########
@@ -269,20 +274,86 @@ private List<String> collectNotAllowedConfigurations() {
                                         ConfigurationNotAllowedMessage.ofConfigurationChange(
                                                 k, v)));
 
-        if (!Arrays.equals(originalCheckpointConfigSerialized, serializeConfig(checkpointCfg))) {
+        final Configuration enrichedClusterConfig = new Configuration(originalClusterConfig);
+        enrichProgramConfigWildcards(enrichedClusterConfig);
+
+        // Check CheckpointConfig
+        final CheckpointConfig clusterCheckpointConfig = new CheckpointConfig();
+        clusterCheckpointConfig.configure(enrichedClusterConfig);
+        if (!Arrays.equals(
+                serializeConfig(clusterCheckpointConfig), serializeConfig(checkpointCfg))) {
             errors.add(
                     ConfigurationNotAllowedMessage.ofConfigurationObject(
                             checkpointCfg.getClass().getSimpleName()));
         }
 
-        if (!Arrays.equals(originalExecutionConfigSerialized, serializeConfig(config))) {
+        // Check ExecutionConfig
+        final ExecutionConfig clusterExecutionConfig = new ExecutionConfig();
+        clusterExecutionConfig.configure(enrichedClusterConfig, this.getUserClassloader());
+        if (!Arrays.equals(serializeConfig(clusterExecutionConfig), serializeConfig(config))) {
             errors.add(
                     ConfigurationNotAllowedMessage.ofConfigurationObject(
                             config.getClass().getSimpleName()));
         }
+
         return errors;
     }
 
+    private static final class WildcardOption<T> {
+        private final ConfigOption<T> option;
+        private final Function<StreamContextEnvironment, T> getter;
+
+        WildcardOption(ConfigOption<T> option, Function<StreamContextEnvironment, T> getter) {
+            this.option = option;
+            this.getter = getter;
+        }
+
+        void enrich(Configuration mutableConfig, StreamContextEnvironment fromEnv) {
+            mutableConfig.set(option, getter.apply(fromEnv));
+        }
+
+        void remove(Configuration mutableConfig) {
+            mutableConfig.removeConfig(option);
+        }
+    }
+
+    private static final Map<String, WildcardOption<?>> supportedProgramConfigWildcards =
+            new HashMap<>();
+
+    static {
+        supportedProgramConfigWildcards.put(
+                PipelineOptions.GLOBAL_JOB_PARAMETERS.key(),
+                new WildcardOption<>(
+                        PipelineOptions.GLOBAL_JOB_PARAMETERS,
+                        env -> env.getConfig().getGlobalJobParameters().toMap()));
+    }
+
+    private void enrichProgramConfigWildcards(Configuration mutableConfig) {
+        for (String key : programConfigWildcards) {
+            final WildcardOption<?> option = supportedProgramConfigWildcards.get(key);

Review Comment:
   Why do we need to limit the "wildcard" configurations at all? This should be up to the cluster admin to decide.
   
   We should make sure to remove wildcards defined in the `clusterConfiguration` to prevent users from overriding it.
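A minimal sketch of one possible reading of this comment, over plain Java collections: any key the admin lists is treated as a wildcard (no hard-coded registry of supported keys), the list comes from the cluster configuration only, and the key that holds the list is itself never treated as a wildcard, so a program cannot widen it. `AdminWildcards`, `effectiveWildcards`, and the `"example.wildcards"` key are hypothetical names, not Flink API.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

final class AdminWildcards {
    private AdminWildcards() {}

    /**
     * Builds the set of keys a program may change. The input should be read from the
     * cluster configuration only, never from the program's own configuration.
     */
    static Set<String> effectiveWildcards(
            Collection<String> clusterDefinedWildcards, String wildcardListKey) {
        final Set<String> wildcards = new HashSet<>(clusterDefinedWildcards);
        // Never let the program change the wildcard list itself,
        // even if the admin accidentally listed that key.
        wildcards.remove(wildcardListKey);
        return Collections.unmodifiableSet(wildcards);
    }
}
```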



##########
flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java:
##########
@@ -269,20 +274,86 @@ private List<String> collectNotAllowedConfigurations() {
                                         ConfigurationNotAllowedMessage.ofConfigurationChange(
                                                 k, v)));
 
-        if (!Arrays.equals(originalCheckpointConfigSerialized, serializeConfig(checkpointCfg))) {
+        final Configuration enrichedClusterConfig = new Configuration(originalClusterConfig);
+        enrichProgramConfigWildcards(enrichedClusterConfig);
+
+        // Check CheckpointConfig
+        final CheckpointConfig clusterCheckpointConfig = new CheckpointConfig();
+        clusterCheckpointConfig.configure(enrichedClusterConfig);

Review Comment:
   can we simply use the `configuration` here? (we've already checked whether it only contains allowed overrides)



##########
flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java:
##########
@@ -206,35 +209,28 @@ private void validateAllowedExecution() {
 
     public static void setAsContext(
             final PipelineExecutorServiceLoader executorServiceLoader,
-            final Configuration configuration,
+            final Configuration clusterConfig,

Review Comment:
   
   ```suggestion
               final Configuration clusterConfiguration,
   ```



##########
flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java:
##########
@@ -68,25 +74,23 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
     private final boolean suppressSysout;
 
     private final boolean enforceSingleJobExecution;
-    private final byte[] originalCheckpointConfigSerialized;
-    private final byte[] originalExecutionConfigSerialized;
-    private final Configuration originalConfiguration;
+    private final Configuration originalClusterConfig;

Review Comment:
   do we need to have the `original` prefix here? "cluster" IMO already implies that this can't change anymore
   
   ```suggestion
       private final Configuration clusterConfiguration;
   ```



##########
flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java:
##########
@@ -244,12 +240,21 @@ public static void unsetAsContext() {
     }
 
     private List<String> collectNotAllowedConfigurations() {

Review Comment:
   nit
   
   ```suggestion
       private Collection<String> collectNotAllowedConfigurations() {
   ```



##########
flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java:
##########
@@ -97,21 +101,20 @@ public StreamContextEnvironment(
     @Internal
     public StreamContextEnvironment(
             final PipelineExecutorServiceLoader executorServiceLoader,
-            final Configuration configuration,
+            final Configuration clusterConfig,
+            final Configuration envConfig,

Review Comment:
   the rename seems rather confusing since the parent class sets it into the `configuration` field anyway;
   
   ```suggestion
               final Configuration clusterConfiguration,
               final Configuration configuration,
   ```



##########
flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java:
##########
@@ -206,35 +209,28 @@ private void validateAllowedExecution() {
 
     public static void setAsContext(
             final PipelineExecutorServiceLoader executorServiceLoader,
-            final Configuration configuration,
+            final Configuration clusterConfig,
             final ClassLoader userCodeClassLoader,
             final boolean enforceSingleJobExecution,
             final boolean suppressSysout) {
-        StreamExecutionEnvironmentFactory factory =
-                conf -> {
-                    final List<String> errors = new ArrayList<>();
-                    final boolean allowConfigurations =
-                            configuration.getBoolean(
-                                    DeploymentOptions.ALLOW_CLIENT_JOB_CONFIGURATIONS);
-                    if (!allowConfigurations && !conf.toMap().isEmpty()) {
-                        conf.toMap()
-                                .forEach(
-                                        (k, v) ->
-                                                errors.add(
-                                                        ConfigurationNotAllowedMessage
-                                                                .ofConfigurationKeyAndValue(k, v)));
-                    }
-                    Configuration mergedConfiguration = new Configuration();
-                    mergedConfiguration.addAll(configuration);
-                    mergedConfiguration.addAll(conf);
+        final StreamExecutionEnvironmentFactory factory =
+                envInitConfig -> {
+                    final boolean programConfigEnabled =
+                            clusterConfig.get(DeploymentOptions.PROGRAM_CONFIG_ENABLED);
+                    final List<String> programConfigWildcards =
+                            clusterConfig.get(DeploymentOptions.PROGRAM_CONFIG_WILDCARDS);

Review Comment:
   nit: maybe inline these?



##########
flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java:
##########
@@ -97,21 +101,20 @@ public StreamContextEnvironment(
     @Internal
     public StreamContextEnvironment(
             final PipelineExecutorServiceLoader executorServiceLoader,
-            final Configuration configuration,
+            final Configuration clusterConfig,
+            final Configuration envConfig,
             final ClassLoader userCodeClassLoader,
             final boolean enforceSingleJobExecution,
             final boolean suppressSysout,
-            final boolean allowConfigurations,
-            final Collection<String> errorMessages) {
-        super(executorServiceLoader, configuration, userCodeClassLoader);
+            final boolean programConfigEnabled,
+            final List<String> programConfigWildcards) {
+        super(executorServiceLoader, envConfig, userCodeClassLoader);
         this.suppressSysout = suppressSysout;
         this.enforceSingleJobExecution = enforceSingleJobExecution;
-        this.allowConfigurations = allowConfigurations;
-        this.originalCheckpointConfigSerialized = serializeConfig(checkpointCfg);
-        this.originalExecutionConfigSerialized = serializeConfig(config);
-        this.originalConfiguration = new Configuration(configuration);
-        this.errorMessages = errorMessages;
+        this.originalClusterConfig = new Configuration(clusterConfig);

Review Comment:
   cloning seems unnecessary



##########
docs/layouts/shortcodes/generated/deployment_configuration.html:
##########
@@ -26,6 +20,18 @@
             <td>List&lt;String&gt;</td>
             <td>Custom JobListeners to be registered with the execution environment. The registered listeners cannot have constructors with arguments.</td>
         </tr>
+        <tr>
+            <td><h5>execution.program-config.enabled</h5></td>

Review Comment:
   I'm not really sure if `program` is the right name. There is currently no concept of "user program" in Flink (we're only talking about jobs and applications)
   
   Here are a few suggestions:
   - `programmatic-configuration`
   - `per-environment-configuration`
   - `configuration-overrides`
   - `{user,client}-configuration`
   
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
