pnowojski commented on code in PR #21248:
URL: https://github.com/apache/flink/pull/21248#discussion_r1015544439


##########
flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java:
##########
@@ -97,13 +90,85 @@ void testDisallowProgramConfigurationChanges(
                         PipelineOptions.MAX_PARALLELISM.key());
     }
 
+    @ParameterizedTest
+    @MethodSource("provideExecutors")
+    void testDisallowCheckpointStorage(
+            ThrowingConsumer<StreamExecutionEnvironment, Exception> executor) {
+        final Configuration clusterConfig = new Configuration();
+        clusterConfig.set(DeploymentOptions.PROGRAM_CONFIG_ENABLED, false);
+        clusterConfig.set(DeploymentOptions.TARGET, "local");
+        clusterConfig.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
"file:///flink/checkpoints");
+
+        final StreamContextEnvironment environment =
+                constructStreamContextEnvironment(clusterConfig, 
Collections.emptyList());
+
+        String disallowedPath = "file:///flink/disallowed/modification";
+        // Change the CheckpointConfig
+        environment.getCheckpointConfig().setCheckpointStorage(disallowedPath);
+
+        environment.fromCollection(Collections.singleton(1)).addSink(new 
DiscardingSink<>());
+        assertThatThrownBy(() -> executor.accept(environment))
+                .isInstanceOf(MutatedConfigurationException.class)
+                .hasMessageContainingAll(
+                        CheckpointConfig.class.getSimpleName(), 
"setCheckpointStorage");
+
+        environment.getCheckpointConfig().setCheckpointStorage(new 
JobManagerCheckpointStorage());
+
+        environment.fromCollection(Collections.singleton(1)).addSink(new 
DiscardingSink<>());
+        assertThatThrownBy(() -> executor.accept(environment))
+                .isInstanceOf(MutatedConfigurationException.class)
+                .hasMessageContainingAll(
+                        CheckpointConfig.class.getSimpleName(), 
"setCheckpointStorage");
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideExecutors")
+    void testAllowCheckpointStorage(
+            ThrowingConsumer<StreamExecutionEnvironment, Exception> executor) {
+        final Configuration clusterConfig = new Configuration();
+        clusterConfig.set(DeploymentOptions.PROGRAM_CONFIG_ENABLED, false);
+        clusterConfig.set(DeploymentOptions.TARGET, "local");
+        clusterConfig.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
"file:///flink/checkpoints");
+
+        final StreamContextEnvironment environment =
+                constructStreamContextEnvironment(
+                        clusterConfig,
+                        
Arrays.asList(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key()));
+
+        String allowedPath = "file:///flink/allowed/modification";
+        // Change the CheckpointConfig
+        environment.getCheckpointConfig().setCheckpointStorage(allowedPath);
+
+        environment.fromCollection(Collections.singleton(1)).addSink(new 
DiscardingSink<>());
+        assertThatThrownBy(() -> executor.accept(environment))
+                .isInstanceOf(ExecutorReachedException.class);
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideExecutors")
+    void testNotModifiedCheckpointStorage(
+            ThrowingConsumer<StreamExecutionEnvironment, Exception> executor) {
+        final Configuration clusterConfig = new Configuration();
+        clusterConfig.set(DeploymentOptions.PROGRAM_CONFIG_ENABLED, false);
+        clusterConfig.set(DeploymentOptions.TARGET, "local");
+        clusterConfig.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
"file:///flink/checkpoints");
+
+        final StreamContextEnvironment environment =
+                constructStreamContextEnvironment(clusterConfig, 
Collections.emptyList());
+
+        environment.fromCollection(Collections.singleton(1)).addSink(new 
DiscardingSink<>());
+        assertThatThrownBy(() -> executor.accept(environment))
+                .isInstanceOf(ExecutorReachedException.class);
+    }
+
     @ParameterizedTest
     @MethodSource("provideExecutors")
     void testAllowProgramConfigurationWildcards(
             ThrowingConsumer<StreamExecutionEnvironment, Exception> executor) {
         final Configuration clusterConfig = new Configuration();
         clusterConfig.set(DeploymentOptions.TARGET, "local");
         clusterConfig.set(ExecutionOptions.RUNTIME_MODE, 
RuntimeExecutionMode.STREAMING);
+
         // Test prefix map notation

Review Comment:
   I've kept the code as it is, just changed the comment, to indicate that we 
are testing
   ```
           // Changing GLOBAL_JOB_PARAMETERS is always allowed, as it's one of 
the fields not checked
           // with PROGRAM_CONFIG_ENABLED set to false
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to