twalthr commented on code in PR #21248:
URL: https://github.com/apache/flink/pull/21248#discussion_r1015401370


##########
flink-core/src/main/java/org/apache/flink/configuration/Configuration.java:
##########
@@ -765,6 +765,18 @@ public <T> boolean removeConfig(ConfigOption<T> configOption) {
         }
     }
 
+    /**
+     * Removes given key from the configuration.
+     *
+     * @param key key of a config option to remove
+     * @return true is config has been removed, false otherwise
+     */
+    public boolean removeKey(String key) {
+        synchronized (this.confData) {
+            return this.confData.remove(key) != null;

Review Comment:
   Let's also call `removePrefixMap` here to make sure all variations of this 
key are gone. This should be safe as all keys represent leaves.
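   A minimal sketch of the idea, assuming a flat map in which a map-typed option may be stored either under its exact key or as individual `key.subKey` entries. The class `PrefixKeyRemovalSketch` and its helper methods are hypothetical stand-ins for illustration only; they are not the actual `Configuration` implementation, and the real `removePrefixMap` may behave differently.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for illustration; not Flink's Configuration class. */
final class PrefixKeyRemovalSketch {

    private final Map<String, Object> confData = new HashMap<>();

    void put(String key, Object value) {
        synchronized (confData) {
            confData.put(key, value);
        }
    }

    boolean containsKey(String key) {
        synchronized (confData) {
            return confData.containsKey(key);
        }
    }

    /**
     * Removes the exact key and every entry stored in prefix notation
     * ("key.subKey"). Returns true if at least one entry was removed.
     */
    boolean removeKey(String key) {
        synchronized (confData) {
            // Evaluate both removals before combining the results, so the
            // prefixed entries are removed even when the exact key exists.
            boolean removedExact = confData.remove(key) != null;
            boolean removedPrefixed =
                    confData.keySet().removeIf(k -> k.startsWith(key + "."));
            return removedExact || removedPrefixed;
        }
    }
}
```

   With entries `pipeline.global-job-parameters.a` and `pipeline.global-job-parameters.b` present, calling `removeKey("pipeline.global-job-parameters")` in this sketch removes both and leaves unrelated keys such as `pipeline.max-parallelism` untouched.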



##########
flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java:
##########
@@ -119,9 +119,8 @@ public class DeploymentOptions {
                                     .linebreak()
                                     .linebreak()
                                     .text(
-                                            "Currently, this list is limited to '%s' only.",
-                                            TextElement.text(
-                                                    PipelineOptions.GLOBAL_JOB_PARAMETERS.key()))
+                                            "Currently changes to the ExecutionConfig class, that are not backed by the"

Review Comment:
   Phrase this more generically, because we now also support `CheckpointConfig`: 
"...that are not backed by config options."



##########
flink-core/src/main/java/org/apache/flink/configuration/Configuration.java:
##########
@@ -765,6 +765,18 @@ public <T> boolean removeConfig(ConfigOption<T> configOption) {
         }
     }
 
+    /**
+     * Removes given key from the configuration.
+     *
+     * @param key key of a config option to remove
+     * @return true is config has been removed, false otherwise
+     */
+    public boolean removeKey(String key) {

Review Comment:
   Please add a test for `removeKey` to `ConfigurationTest`.



##########
flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java:
##########
@@ -97,13 +90,85 @@ void testDisallowProgramConfigurationChanges(
                         PipelineOptions.MAX_PARALLELISM.key());
     }
 
+    @ParameterizedTest
+    @MethodSource("provideExecutors")
+    void testDisallowCheckpointStorage(
+            ThrowingConsumer<StreamExecutionEnvironment, Exception> executor) {
+        final Configuration clusterConfig = new Configuration();
+        clusterConfig.set(DeploymentOptions.PROGRAM_CONFIG_ENABLED, false);
+        clusterConfig.set(DeploymentOptions.TARGET, "local");
+        clusterConfig.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:///flink/checkpoints");
+
+        final StreamContextEnvironment environment =
+                constructStreamContextEnvironment(clusterConfig, Collections.emptyList());
+
+        String disallowedPath = "file:///flink/disallowed/modification";
+        // Change the CheckpointConfig
+        environment.getCheckpointConfig().setCheckpointStorage(disallowedPath);
+
+        environment.fromCollection(Collections.singleton(1)).addSink(new DiscardingSink<>());
+        assertThatThrownBy(() -> executor.accept(environment))
+                .isInstanceOf(MutatedConfigurationException.class)
+                .hasMessageContainingAll(
+                        CheckpointConfig.class.getSimpleName(), "setCheckpointStorage");
+
+        environment.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
+
+        environment.fromCollection(Collections.singleton(1)).addSink(new DiscardingSink<>());
+        assertThatThrownBy(() -> executor.accept(environment))
+                .isInstanceOf(MutatedConfigurationException.class)
+                .hasMessageContainingAll(
+                        CheckpointConfig.class.getSimpleName(), "setCheckpointStorage");
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideExecutors")
+    void testAllowCheckpointStorage(
+            ThrowingConsumer<StreamExecutionEnvironment, Exception> executor) {
+        final Configuration clusterConfig = new Configuration();
+        clusterConfig.set(DeploymentOptions.PROGRAM_CONFIG_ENABLED, false);
+        clusterConfig.set(DeploymentOptions.TARGET, "local");
+        clusterConfig.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:///flink/checkpoints");
+
+        final StreamContextEnvironment environment =
+                constructStreamContextEnvironment(
+                        clusterConfig,
+                        Arrays.asList(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key()));
+
+        String allowedPath = "file:///flink/allowed/modification";
+        // Change the CheckpointConfig
+        environment.getCheckpointConfig().setCheckpointStorage(allowedPath);
+
+        environment.fromCollection(Collections.singleton(1)).addSink(new DiscardingSink<>());
+        assertThatThrownBy(() -> executor.accept(environment))
+                .isInstanceOf(ExecutorReachedException.class);
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideExecutors")
+    void testNotModifiedCheckpointStorage(
+            ThrowingConsumer<StreamExecutionEnvironment, Exception> executor) {
+        final Configuration clusterConfig = new Configuration();
+        clusterConfig.set(DeploymentOptions.PROGRAM_CONFIG_ENABLED, false);
+        clusterConfig.set(DeploymentOptions.TARGET, "local");
+        clusterConfig.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:///flink/checkpoints");
+
+        final StreamContextEnvironment environment =
+                constructStreamContextEnvironment(clusterConfig, Collections.emptyList());
+
+        environment.fromCollection(Collections.singleton(1)).addSink(new DiscardingSink<>());
+        assertThatThrownBy(() -> executor.accept(environment))
+                .isInstanceOf(ExecutorReachedException.class);
+    }
+
     @ParameterizedTest
     @MethodSource("provideExecutors")
     void testAllowProgramConfigurationWildcards(
             ThrowingConsumer<StreamExecutionEnvironment, Exception> executor) {
         final Configuration clusterConfig = new Configuration();
         clusterConfig.set(DeploymentOptions.TARGET, "local");
         clusterConfig.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
+
         // Test prefix map notation

Review Comment:
   This test was supposed to test the prefix map notation. But since global job 
parameters are not backed by configuration, we can also drop this line or 
replace it with something else.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ConfigurationNotAllowedMessage.java:
##########
@@ -66,4 +66,9 @@ public static String ofConfigurationObjectRemoved(
                 "Configuration %s:%s was removed from the configuration object %s.",
                 configKey, configValue, configurationObject);
     }
+
+    public static String ofConfigurationObjectSetterUsed(

Review Comment:
   nit: keep the method names simple and remove the `Configuration` prefix 
everywhere: `ofObjectSetterUsed`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
