twalthr commented on code in PR #20840:
URL: https://github.com/apache/flink/pull/20840#discussion_r974022475
##########
flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java:
##########
@@ -269,20 +274,86 @@ private List<String> collectNotAllowedConfigurations() {
ConfigurationNotAllowedMessage.ofConfigurationChange(
k, v)));
- if (!Arrays.equals(originalCheckpointConfigSerialized,
serializeConfig(checkpointCfg))) {
+ final Configuration enrichedClusterConfig = new
Configuration(originalClusterConfig);
+ enrichProgramConfigWildcards(enrichedClusterConfig);
+
+ // Check CheckpointConfig
+ final CheckpointConfig clusterCheckpointConfig = new
CheckpointConfig();
+ clusterCheckpointConfig.configure(enrichedClusterConfig);
+ if (!Arrays.equals(
+ serializeConfig(clusterCheckpointConfig),
serializeConfig(checkpointCfg))) {
errors.add(
ConfigurationNotAllowedMessage.ofConfigurationObject(
checkpointCfg.getClass().getSimpleName()));
}
- if (!Arrays.equals(originalExecutionConfigSerialized,
serializeConfig(config))) {
+ // Check ExecutionConfig
+ final ExecutionConfig clusterExecutionConfig = new ExecutionConfig();
+ clusterExecutionConfig.configure(enrichedClusterConfig,
this.getUserClassloader());
+ if (!Arrays.equals(serializeConfig(clusterExecutionConfig),
serializeConfig(config))) {
errors.add(
ConfigurationNotAllowedMessage.ofConfigurationObject(
config.getClass().getSimpleName()));
}
+
return errors;
}
+ private static final class WildcardOption<T> {
+ private final ConfigOption<T> option;
+ private final Function<StreamContextEnvironment, T> getter;
+
+ WildcardOption(ConfigOption<T> option,
Function<StreamContextEnvironment, T> getter) {
+ this.option = option;
+ this.getter = getter;
+ }
+
+ void enrich(Configuration mutableConfig, StreamContextEnvironment
fromEnv) {
+ mutableConfig.set(option, getter.apply(fromEnv));
+ }
+
+ void remove(Configuration mutableConfig) {
+ mutableConfig.removeConfig(option);
+ }
+ }
+
+ private static final Map<String, WildcardOption<?>>
supportedProgramConfigWildcards =
+ new HashMap<>();
+
+ static {
+ supportedProgramConfigWildcards.put(
+ PipelineOptions.GLOBAL_JOB_PARAMETERS.key(),
+ new WildcardOption<>(
+ PipelineOptions.GLOBAL_JOB_PARAMETERS,
+ env ->
env.getConfig().getGlobalJobParameters().toMap()));
+ }
+
+ private void enrichProgramConfigWildcards(Configuration mutableConfig) {
+ for (String key : programConfigWildcards) {
+ final WildcardOption<?> option =
supportedProgramConfigWildcards.get(key);
Review Comment:
Your comment makes absolutely sense. But the implementation is a result of
the terrible configuration story that we have to support right now. This is
also the reason why I added a comment to `ExecutionConfig` and `CheckpointCfg`.
If everything would be backed by `Configuration` instead of the POJOs that
partially materialize the config options, the implementation could be way
easier. Currently, we need to manually provide the backward path from POJO to
ConfigOption value manually.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]