twalthr commented on code in PR #21248:
URL: https://github.com/apache/flink/pull/21248#discussion_r1015401370
##########
flink-core/src/main/java/org/apache/flink/configuration/Configuration.java:
##########
@@ -765,6 +765,18 @@ public <T> boolean removeConfig(ConfigOption<T> configOption) {
}
}
+ /**
+ * Removes given key from the configuration.
+ *
+ * @param key key of a config option to remove
+ * @return true is config has been removed, false otherwise
+ */
+ public boolean removeKey(String key) {
+ synchronized (this.confData) {
+ return this.confData.remove(key) != null;
Review Comment:
Let's also call `removePrefixMap` here to make sure all variations of this
key are gone. This should be safe as all keys represent leaves.
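
A minimal sketch of what this suggestion could look like, written against a hypothetical stand-in class rather than Flink's real `Configuration`. The class name `RemoveKeySketch`, the helpers `setString` and `containsKey`, and the body of `removePrefixMap` are assumptions for illustration; the real helper may differ in signature and matching rules. The idea is that entries stored in prefix notation (`key.subKey`) are dropped together with the exact key.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical stand-in for a configuration class; not Flink's implementation. */
public class RemoveKeySketch {
    private final Map<String, Object> confData = new HashMap<>();

    public void setString(String key, String value) {
        synchronized (confData) {
            confData.put(key, value);
        }
    }

    public boolean containsKey(String key) {
        synchronized (confData) {
            return confData.containsKey(key);
        }
    }

    /** Removes the exact key and every entry stored as "key.<suffix>". */
    public boolean removeKey(String key) {
        synchronized (confData) {
            boolean removed = confData.remove(key) != null;
            // The compound assignment always evaluates the right-hand side,
            // so prefix entries are cleaned up even if the exact key existed.
            removed |= removePrefixMap(confData, key);
            return removed;
        }
    }

    /** Assumed behavior: drop all entries whose key starts with "key.". */
    static boolean removePrefixMap(Map<String, Object> confData, String key) {
        final String prefix = key + ".";
        final List<String> prefixKeys =
                confData.keySet().stream()
                        .filter(candidate -> candidate.startsWith(prefix))
                        .collect(Collectors.toList());
        prefixKeys.forEach(confData::remove);
        return !prefixKeys.isEmpty();
    }

    public static void main(String[] args) {
        RemoveKeySketch conf = new RemoveKeySketch();
        conf.setString("my.map.k1", "v1");
        conf.setString("my.map.k2", "v2");
        conf.setString("my.mapping", "unrelated");

        System.out.println(conf.removeKey("my.map")); // true: both prefix entries removed
        System.out.println(conf.containsKey("my.mapping")); // true: no "." boundary, so untouched
        System.out.println(conf.removeKey("my.map")); // false: nothing left to remove
    }
}
```

Matching on `key + "."` rather than on `key` alone keeps unrelated keys that merely share leading characters (such as `my.mapping`) in place.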
##########
flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java:
##########
@@ -119,9 +119,8 @@ public class DeploymentOptions {
.linebreak()
.linebreak()
.text(
- "Currently, this list is limited to '%s' only.",
- TextElement.text(
- PipelineOptions.GLOBAL_JOB_PARAMETERS.key()))
+ "Currently changes to the ExecutionConfig class, that are not backed by the"
Review Comment:
Phrase this more generically, because we now also support `CheckpointConfig`:
"...that are not backed by config options."
##########
flink-core/src/main/java/org/apache/flink/configuration/Configuration.java:
##########
@@ -765,6 +765,18 @@ public <T> boolean removeConfig(ConfigOption<T> configOption) {
}
}
+ /**
+ * Removes given key from the configuration.
+ *
+ * @param key key of a config option to remove
+ * @return true is config has been removed, false otherwise
+ */
+ public boolean removeKey(String key) {
Review Comment:
Add a test to `ConfigurationTest`.
##########
flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java:
##########
@@ -97,13 +90,85 @@ void testDisallowProgramConfigurationChanges(
PipelineOptions.MAX_PARALLELISM.key());
}
+ @ParameterizedTest
+ @MethodSource("provideExecutors")
+ void testDisallowCheckpointStorage(
+ ThrowingConsumer<StreamExecutionEnvironment, Exception> executor) {
+ final Configuration clusterConfig = new Configuration();
+ clusterConfig.set(DeploymentOptions.PROGRAM_CONFIG_ENABLED, false);
+ clusterConfig.set(DeploymentOptions.TARGET, "local");
+ clusterConfig.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:///flink/checkpoints");
+
+ final StreamContextEnvironment environment =
+ constructStreamContextEnvironment(clusterConfig, Collections.emptyList());
+
+ String disallowedPath = "file:///flink/disallowed/modification";
+ // Change the CheckpointConfig
+ environment.getCheckpointConfig().setCheckpointStorage(disallowedPath);
+
+ environment.fromCollection(Collections.singleton(1)).addSink(new DiscardingSink<>());
+ assertThatThrownBy(() -> executor.accept(environment))
+ .isInstanceOf(MutatedConfigurationException.class)
+ .hasMessageContainingAll(
+ CheckpointConfig.class.getSimpleName(), "setCheckpointStorage");
+
+ environment.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
+
+ environment.fromCollection(Collections.singleton(1)).addSink(new DiscardingSink<>());
+ assertThatThrownBy(() -> executor.accept(environment))
+ .isInstanceOf(MutatedConfigurationException.class)
+ .hasMessageContainingAll(
+ CheckpointConfig.class.getSimpleName(), "setCheckpointStorage");
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideExecutors")
+ void testAllowCheckpointStorage(
+ ThrowingConsumer<StreamExecutionEnvironment, Exception> executor) {
+ final Configuration clusterConfig = new Configuration();
+ clusterConfig.set(DeploymentOptions.PROGRAM_CONFIG_ENABLED, false);
+ clusterConfig.set(DeploymentOptions.TARGET, "local");
+ clusterConfig.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:///flink/checkpoints");
+
+ final StreamContextEnvironment environment =
+ constructStreamContextEnvironment(
+ clusterConfig,
+ Arrays.asList(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key()));
+
+ String allowedPath = "file:///flink/allowed/modification";
+ // Change the CheckpointConfig
+ environment.getCheckpointConfig().setCheckpointStorage(allowedPath);
+
+ environment.fromCollection(Collections.singleton(1)).addSink(new DiscardingSink<>());
+ assertThatThrownBy(() -> executor.accept(environment))
+ .isInstanceOf(ExecutorReachedException.class);
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideExecutors")
+ void testNotModifiedCheckpointStorage(
+ ThrowingConsumer<StreamExecutionEnvironment, Exception> executor) {
+ final Configuration clusterConfig = new Configuration();
+ clusterConfig.set(DeploymentOptions.PROGRAM_CONFIG_ENABLED, false);
+ clusterConfig.set(DeploymentOptions.TARGET, "local");
+ clusterConfig.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:///flink/checkpoints");
+
+ final StreamContextEnvironment environment =
+ constructStreamContextEnvironment(clusterConfig, Collections.emptyList());
+
+ environment.fromCollection(Collections.singleton(1)).addSink(new DiscardingSink<>());
+ assertThatThrownBy(() -> executor.accept(environment))
+ .isInstanceOf(ExecutorReachedException.class);
+ }
+
@ParameterizedTest
@MethodSource("provideExecutors")
void testAllowProgramConfigurationWildcards(
ThrowingConsumer<StreamExecutionEnvironment, Exception> executor) {
final Configuration clusterConfig = new Configuration();
clusterConfig.set(DeploymentOptions.TARGET, "local");
clusterConfig.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
+
// Test prefix map notation
Review Comment:
This test was supposed to test the prefix map notation. But since global job
parameters are not backed by configuration, we can also drop this line or
replace it with something else.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ConfigurationNotAllowedMessage.java:
##########
@@ -66,4 +66,9 @@ public static String ofConfigurationObjectRemoved(
"Configuration %s:%s was removed from the configuration object %s.",
configKey, configValue, configurationObject);
}
+
+ public static String ofConfigurationObjectSetterUsed(
Review Comment:
nit: keep the method names simple and remove the `Configuration` prefix
everywhere, e.g. `ofObjectSetterUsed`