dmvk commented on code in PR #20840:
URL: https://github.com/apache/flink/pull/20840#discussion_r973938806
##########
flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java:
##########
@@ -105,16 +105,49 @@ public class DeploymentOptions {
TextElement.text(HighAvailabilityOptions.HA_MODE.key()))
.build());
- @Experimental
- public static final ConfigOption<Boolean> ALLOW_CLIENT_JOB_CONFIGURATIONS =
- ConfigOptions.key("execution.allow-client-job-configurations")
+ public static final ConfigOption<List<String>> PROGRAM_CONFIG_WILDCARDS =
Review Comment:
Can we mark these options as `@Experimental` again?
##########
flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java:
##########
@@ -269,20 +274,86 @@ private List<String> collectNotAllowedConfigurations() {
ConfigurationNotAllowedMessage.ofConfigurationChange(
k, v)));
- if (!Arrays.equals(originalCheckpointConfigSerialized, serializeConfig(checkpointCfg))) {
+ final Configuration enrichedClusterConfig = new Configuration(originalClusterConfig);
+ enrichProgramConfigWildcards(enrichedClusterConfig);
+
+ // Check CheckpointConfig
+ final CheckpointConfig clusterCheckpointConfig = new CheckpointConfig();
+ clusterCheckpointConfig.configure(enrichedClusterConfig);
+ if (!Arrays.equals(
+ serializeConfig(clusterCheckpointConfig), serializeConfig(checkpointCfg))) {
errors.add(
ConfigurationNotAllowedMessage.ofConfigurationObject(
checkpointCfg.getClass().getSimpleName()));
}
- if (!Arrays.equals(originalExecutionConfigSerialized, serializeConfig(config))) {
+ // Check ExecutionConfig
+ final ExecutionConfig clusterExecutionConfig = new ExecutionConfig();
+ clusterExecutionConfig.configure(enrichedClusterConfig, this.getUserClassloader());
+ if (!Arrays.equals(serializeConfig(clusterExecutionConfig), serializeConfig(config))) {
errors.add(
ConfigurationNotAllowedMessage.ofConfigurationObject(
config.getClass().getSimpleName()));
}
+
return errors;
}
+ private static final class WildcardOption<T> {
+ private final ConfigOption<T> option;
+ private final Function<StreamContextEnvironment, T> getter;
+
+ WildcardOption(ConfigOption<T> option, Function<StreamContextEnvironment, T> getter) {
+ this.option = option;
+ this.getter = getter;
+ }
+
+ void enrich(Configuration mutableConfig, StreamContextEnvironment fromEnv) {
+ mutableConfig.set(option, getter.apply(fromEnv));
+ }
+
+ void remove(Configuration mutableConfig) {
+ mutableConfig.removeConfig(option);
+ }
+ }
+
+ private static final Map<String, WildcardOption<?>> supportedProgramConfigWildcards =
Review Comment:
- Static variable definitions and static blocks should be at the top of the class.
- Should the name be uppercase, since this is a `static final` constant?
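A minimal sketch of the member layout suggested above, using plain Java stand-ins instead of Flink types so it stays self-contained (an assumption): the `static final` map comes first, is named as an upper-case constant, and is populated by a static block directly below it. `WildcardRegistryExample`, `SUPPORTED_PROGRAM_CONFIG_WILDCARDS`, and modeling the environment as a `Map<String, String>` are hypothetical; the key string is assumed to be that of `PipelineOptions.GLOBAL_JOB_PARAMETERS`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for StreamContextEnvironment; illustrates member ordering only.
class WildcardRegistryExample {

    // Static constants come first and use UPPER_SNAKE_CASE names.
    // Simplified: values are getters over a Map<String, String> instead of WildcardOption<?>.
    private static final Map<String, Function<Map<String, String>, String>>
            SUPPORTED_PROGRAM_CONFIG_WILDCARDS;

    // The static initializer sits directly below the field it populates.
    static {
        final Map<String, Function<Map<String, String>, String>> wildcards = new HashMap<>();
        // Assumed to match PipelineOptions.GLOBAL_JOB_PARAMETERS.key().
        wildcards.put(
                "pipeline.global-job-parameters",
                env -> env.get("pipeline.global-job-parameters"));
        SUPPORTED_PROGRAM_CONFIG_WILDCARDS = Collections.unmodifiableMap(wildcards);
    }

    // Instance state follows the static section.
    private final Map<String, String> envConfig;

    WildcardRegistryExample(Map<String, String> envConfig) {
        this.envConfig = envConfig;
    }

    static boolean isSupportedWildcard(String key) {
        return SUPPORTED_PROGRAM_CONFIG_WILDCARDS.containsKey(key);
    }

    // Returns the environment's value for a supported wildcard key, or null otherwise.
    String resolve(String key) {
        final Function<Map<String, String>, String> getter =
                SUPPORTED_PROGRAM_CONFIG_WILDCARDS.get(key);
        return getter == null ? null : getter.apply(envConfig);
    }
}
```

Wrapping the map in `Collections.unmodifiableMap` is an optional extra here; it keeps the constant from being mutated after class initialization.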
##########
flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java:
##########
@@ -97,21 +101,20 @@ public StreamContextEnvironment(
@Internal
public StreamContextEnvironment(
final PipelineExecutorServiceLoader executorServiceLoader,
- final Configuration configuration,
+ final Configuration clusterConfig,
+ final Configuration envConfig,
final ClassLoader userCodeClassLoader,
final boolean enforceSingleJobExecution,
final boolean suppressSysout,
- final boolean allowConfigurations,
- final Collection<String> errorMessages) {
- super(executorServiceLoader, configuration, userCodeClassLoader);
+ final boolean programConfigEnabled,
+ final List<String> programConfigWildcards) {
Review Comment:
nit: we should use the minimal interface necessary
```suggestion
final Collection<String> programConfigWildcards) {
```
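A small sketch of why `Collection<String>` is enough for this parameter, assuming the class only iterates over the wildcards and checks membership (no positional access). `ProgramConfigHolder` is a hypothetical class, not part of the PR.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical holder illustrating a constructor parameter typed as Collection<String>.
class ProgramConfigHolder {
    private final Set<String> programConfigWildcards;

    // Only iteration and membership checks are needed, so Collection<String> suffices;
    // callers may pass a List, a Set, or any other collection.
    ProgramConfigHolder(Collection<String> programConfigWildcards) {
        // Insertion-ordered copy; duplicate entries collapse into one.
        this.programConfigWildcards =
                Collections.unmodifiableSet(new LinkedHashSet<>(programConfigWildcards));
    }

    boolean isWildcard(String key) {
        return programConfigWildcards.contains(key);
    }

    int wildcardCount() {
        return programConfigWildcards.size();
    }
}
```

Copying into a `Set` is a design choice of this sketch (cheap `contains`), not something the suggestion requires.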
##########
flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java:
##########
@@ -244,12 +240,21 @@ public static void unsetAsContext() {
}
private List<String> collectNotAllowedConfigurations() {
- final List<String> errors = new ArrayList<>();
- if (allowConfigurations) {
- return errors;
+ if (programConfigEnabled) {
+ return Collections.emptyList();
}
+
+ final List<String> errors = new ArrayList<>();
+
+ final Configuration clusterConfigMap = new Configuration(originalClusterConfig);
+ final Configuration envConfigMap = new Configuration(configuration);
+
+ removeProgramConfigWildcards(clusterConfigMap);
+ removeProgramConfigWildcards(envConfigMap);
Review Comment:
The whole removal/enrichment approach seems overcomplicated.
Can we simply check for wildcard keys in the "diff functions" below?
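One way to read this suggestion, sketched under the assumption that both configurations can be compared as flat key/value maps: compute the diff directly and skip any key listed as a wildcard, with no separate remove/enrich pass. `ConfigDiff` and `collectNotAllowed` are hypothetical names, and the sketch does not cover the serialized `CheckpointConfig`/`ExecutionConfig` comparison in the PR.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: Flink Configuration objects are modeled as Map<String, String>.
final class ConfigDiff {
    private ConfigDiff() {}

    // Reports every key whose value differs between the cluster and environment
    // configuration, skipping keys the cluster admin listed as wildcards.
    static List<String> collectNotAllowed(
            Map<String, String> clusterConfig,
            Map<String, String> envConfig,
            Collection<String> wildcards) {
        // Union of keys, sorted so that the error messages have a stable order.
        final Set<String> allKeys = new TreeSet<>(clusterConfig.keySet());
        allKeys.addAll(envConfig.keySet());

        final List<String> errors = new ArrayList<>();
        for (String key : allKeys) {
            if (wildcards.contains(key)) {
                continue; // wildcard keys may be overridden freely
            }
            final String clusterValue = clusterConfig.get(key);
            final String envValue = envConfig.get(key);
            if (!Objects.equals(clusterValue, envValue)) {
                errors.add(key + ": " + clusterValue + " -> " + envValue);
            }
        }
        return errors;
    }
}
```

Because the wildcard list is just a parameter here, any key the admin configures is honored, not only entries of a hard-coded map.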
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -52,6 +52,15 @@
@Public
public class CheckpointConfig implements java.io.Serializable {
+ // NOTE TO IMPLEMENTERS:
+ // Please do not add further fields to this class. Use the ConfigOption stack instead!
+ // It is currently very tricky to keep this kind of POJO classes in sync with instances of
+ // org.apache.flink.configuration.Configuration. Instances of Configuration are way easier to
+ // pass, layer, merge, restrict, copy, filter, etc.
+ // See ExecutionOptions.RUNTIME_MODE for a reference implementation. If the option is very
+ // crucial for the API, we can add a dedicated setter to StreamExecutionEnvironment. Otherwise,
+ // introducing a ConfigOption should be enough.
Review Comment:
Should we instead make an effort to deprecate these? (The same applies to
`ExecutionConfig`.)
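If these POJO fields were deprecated, one possible pattern (a hypothetical sketch, not Flink's API) is to keep the old setter for compatibility but have it write into a key/value configuration, so the POJO can no longer drift out of sync with it. `BackedCheckpointSettings` and the key `example.checkpointing.interval-ms` are invented for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical CheckpointConfig-like POJO whose state lives in a key/value map.
class BackedCheckpointSettings {
    // Invented key, for illustration only.
    static final String INTERVAL_KEY = "example.checkpointing.interval-ms";

    private final Map<String, String> backing = new HashMap<>();

    /** @deprecated Set {@link #INTERVAL_KEY} in the configuration instead. */
    @Deprecated
    public void setCheckpointInterval(long intervalMs) {
        // The legacy setter only writes through; there is no separate POJO field to sync.
        backing.put(INTERVAL_KEY, Long.toString(intervalMs));
    }

    // Returns -1 when no interval has been configured.
    public long getCheckpointInterval() {
        return Long.parseLong(backing.getOrDefault(INTERVAL_KEY, "-1"));
    }

    // Read-only view that can be merged or diffed like any other configuration.
    Map<String, String> toConfigMap() {
        return Collections.unmodifiableMap(backing);
    }
}
```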
##########
flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java:
##########
@@ -269,20 +274,86 @@ private List<String> collectNotAllowedConfigurations() {
ConfigurationNotAllowedMessage.ofConfigurationChange(
k, v)));
- if (!Arrays.equals(originalCheckpointConfigSerialized, serializeConfig(checkpointCfg))) {
+ final Configuration enrichedClusterConfig = new Configuration(originalClusterConfig);
+ enrichProgramConfigWildcards(enrichedClusterConfig);
+
+ // Check CheckpointConfig
+ final CheckpointConfig clusterCheckpointConfig = new CheckpointConfig();
+ clusterCheckpointConfig.configure(enrichedClusterConfig);
+ if (!Arrays.equals(
+ serializeConfig(clusterCheckpointConfig), serializeConfig(checkpointCfg))) {
errors.add(
ConfigurationNotAllowedMessage.ofConfigurationObject(
checkpointCfg.getClass().getSimpleName()));
}
- if (!Arrays.equals(originalExecutionConfigSerialized, serializeConfig(config))) {
+ // Check ExecutionConfig
+ final ExecutionConfig clusterExecutionConfig = new ExecutionConfig();
+ clusterExecutionConfig.configure(enrichedClusterConfig, this.getUserClassloader());
+ if (!Arrays.equals(serializeConfig(clusterExecutionConfig), serializeConfig(config))) {
errors.add(
ConfigurationNotAllowedMessage.ofConfigurationObject(
config.getClass().getSimpleName()));
}
+
return errors;
}
+ private static final class WildcardOption<T> {
+ private final ConfigOption<T> option;
+ private final Function<StreamContextEnvironment, T> getter;
+
+ WildcardOption(ConfigOption<T> option, Function<StreamContextEnvironment, T> getter) {
+ this.option = option;
+ this.getter = getter;
+ }
+
+ void enrich(Configuration mutableConfig, StreamContextEnvironment fromEnv) {
+ mutableConfig.set(option, getter.apply(fromEnv));
+ }
+
+ void remove(Configuration mutableConfig) {
+ mutableConfig.removeConfig(option);
+ }
+ }
+
+ private static final Map<String, WildcardOption<?>> supportedProgramConfigWildcards =
+ new HashMap<>();
+
+ static {
+ supportedProgramConfigWildcards.put(
+ PipelineOptions.GLOBAL_JOB_PARAMETERS.key(),
+ new WildcardOption<>(
+ PipelineOptions.GLOBAL_JOB_PARAMETERS,
+ env -> env.getConfig().getGlobalJobParameters().toMap()));
+ }
+
+ private void enrichProgramConfigWildcards(Configuration mutableConfig) {
+ for (String key : programConfigWildcards) {
+ final WildcardOption<?> option = supportedProgramConfigWildcards.get(key);
Review Comment:
Why do we need to limit the "wildcard" configurations at all? This should be
up to the cluster admin to decide.
We should make sure to remove wildcards defined in the `clusterConfiguration`
to prevent users from overriding it.
##########
flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java:
##########
@@ -269,20 +274,86 @@ private List<String> collectNotAllowedConfigurations() {
ConfigurationNotAllowedMessage.ofConfigurationChange(
k, v)));
- if (!Arrays.equals(originalCheckpointConfigSerialized, serializeConfig(checkpointCfg))) {
+ final Configuration enrichedClusterConfig = new Configuration(originalClusterConfig);
+ enrichProgramConfigWildcards(enrichedClusterConfig);
+
+ // Check CheckpointConfig
+ final CheckpointConfig clusterCheckpointConfig = new CheckpointConfig();
+ clusterCheckpointConfig.configure(enrichedClusterConfig);
Review Comment:
Can we simply use the `configuration` here? (We've already checked that
it only contains allowed overrides.)
##########
flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java:
##########
@@ -206,35 +209,28 @@ private void validateAllowedExecution() {
public static void setAsContext(
final PipelineExecutorServiceLoader executorServiceLoader,
- final Configuration configuration,
+ final Configuration clusterConfig,
Review Comment:
```suggestion
final Configuration clusterConfiguration,
```
##########
flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java:
##########
@@ -68,25 +74,23 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
private final boolean suppressSysout;
private final boolean enforceSingleJobExecution;
- private final byte[] originalCheckpointConfigSerialized;
- private final byte[] originalExecutionConfigSerialized;
- private final Configuration originalConfiguration;
+ private final Configuration originalClusterConfig;
Review Comment:
Do we need the `original` prefix here? IMO, "cluster" already implies
that this can't change anymore.
```suggestion
private final Configuration clusterConfiguration;
```
##########
flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java:
##########
@@ -244,12 +240,21 @@ public static void unsetAsContext() {
}
private List<String> collectNotAllowedConfigurations() {
Review Comment:
nit
```suggestion
private Collection<String> collectNotAllowedConfigurations() {
```
##########
flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java:
##########
@@ -97,21 +101,20 @@ public StreamContextEnvironment(
@Internal
public StreamContextEnvironment(
final PipelineExecutorServiceLoader executorServiceLoader,
- final Configuration configuration,
+ final Configuration clusterConfig,
+ final Configuration envConfig,
Review Comment:
The rename seems rather confusing, since the parent class stores it in the
`configuration` field anyway:
```suggestion
final Configuration clusterConfiguration,
final Configuration configuration,
```
##########
flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java:
##########
@@ -206,35 +209,28 @@ private void validateAllowedExecution() {
public static void setAsContext(
final PipelineExecutorServiceLoader executorServiceLoader,
- final Configuration configuration,
+ final Configuration clusterConfig,
final ClassLoader userCodeClassLoader,
final boolean enforceSingleJobExecution,
final boolean suppressSysout) {
- StreamExecutionEnvironmentFactory factory =
- conf -> {
- final List<String> errors = new ArrayList<>();
- final boolean allowConfigurations =
- configuration.getBoolean(
- DeploymentOptions.ALLOW_CLIENT_JOB_CONFIGURATIONS);
- if (!allowConfigurations && !conf.toMap().isEmpty()) {
- conf.toMap()
- .forEach(
- (k, v) ->
- errors.add(
- ConfigurationNotAllowedMessage
- .ofConfigurationKeyAndValue(k, v)));
- }
- Configuration mergedConfiguration = new Configuration();
- mergedConfiguration.addAll(configuration);
- mergedConfiguration.addAll(conf);
+ final StreamExecutionEnvironmentFactory factory =
+ envInitConfig -> {
+ final boolean programConfigEnabled =
+ clusterConfig.get(DeploymentOptions.PROGRAM_CONFIG_ENABLED);
+ final List<String> programConfigWildcards =
+ clusterConfig.get(DeploymentOptions.PROGRAM_CONFIG_WILDCARDS);
Review Comment:
nit: maybe inline these?
##########
flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java:
##########
@@ -97,21 +101,20 @@ public StreamContextEnvironment(
@Internal
public StreamContextEnvironment(
final PipelineExecutorServiceLoader executorServiceLoader,
- final Configuration configuration,
+ final Configuration clusterConfig,
+ final Configuration envConfig,
final ClassLoader userCodeClassLoader,
final boolean enforceSingleJobExecution,
final boolean suppressSysout,
- final boolean allowConfigurations,
- final Collection<String> errorMessages) {
- super(executorServiceLoader, configuration, userCodeClassLoader);
+ final boolean programConfigEnabled,
+ final List<String> programConfigWildcards) {
+ super(executorServiceLoader, envConfig, userCodeClassLoader);
this.suppressSysout = suppressSysout;
this.enforceSingleJobExecution = enforceSingleJobExecution;
- this.allowConfigurations = allowConfigurations;
- this.originalCheckpointConfigSerialized = serializeConfig(checkpointCfg);
- this.originalExecutionConfigSerialized = serializeConfig(config);
- this.originalConfiguration = new Configuration(configuration);
- this.errorMessages = errorMessages;
+ this.originalClusterConfig = new Configuration(clusterConfig);
Review Comment:
Cloning seems unnecessary.
##########
docs/layouts/shortcodes/generated/deployment_configuration.html:
##########
@@ -26,6 +20,18 @@
<td>List<String></td>
<td>Custom JobListeners to be registered with the execution environment. The registered listeners cannot have constructors with arguments.</td>
</tr>
+ <tr>
+ <td><h5>execution.program-config.enabled</h5></td>
Review Comment:
I'm not really sure if `program` is the right name. There is currently no
concept of a "user program" in Flink (we only talk about jobs and
applications).
Here are a few suggestions:
- `programmatic-configuration`
- `per-environment-configuration`
- `configuration-overrides`
- `{user,client}-configuration`
WDYT?
--
This is an automated message from the Apache Git Service.