pnowojski commented on code in PR #21248:
URL: https://github.com/apache/flink/pull/21248#discussion_r1015544439
##########
flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java:
##########
@@ -97,13 +90,85 @@ void testDisallowProgramConfigurationChanges(
PipelineOptions.MAX_PARALLELISM.key());
}
+ @ParameterizedTest
+ @MethodSource("provideExecutors")
+ void testDisallowCheckpointStorage(
+ ThrowingConsumer<StreamExecutionEnvironment, Exception> executor) {
+ final Configuration clusterConfig = new Configuration();
+ clusterConfig.set(DeploymentOptions.PROGRAM_CONFIG_ENABLED, false);
+ clusterConfig.set(DeploymentOptions.TARGET, "local");
+ clusterConfig.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY,
"file:///flink/checkpoints");
+
+ final StreamContextEnvironment environment =
+ constructStreamContextEnvironment(clusterConfig,
Collections.emptyList());
+
+ String disallowedPath = "file:///flink/disallowed/modification";
+ // Change the CheckpointConfig
+ environment.getCheckpointConfig().setCheckpointStorage(disallowedPath);
+
+ environment.fromCollection(Collections.singleton(1)).addSink(new
DiscardingSink<>());
+ assertThatThrownBy(() -> executor.accept(environment))
+ .isInstanceOf(MutatedConfigurationException.class)
+ .hasMessageContainingAll(
+ CheckpointConfig.class.getSimpleName(),
"setCheckpointStorage");
+
+ environment.getCheckpointConfig().setCheckpointStorage(new
JobManagerCheckpointStorage());
+
+ environment.fromCollection(Collections.singleton(1)).addSink(new
DiscardingSink<>());
+ assertThatThrownBy(() -> executor.accept(environment))
+ .isInstanceOf(MutatedConfigurationException.class)
+ .hasMessageContainingAll(
+ CheckpointConfig.class.getSimpleName(),
"setCheckpointStorage");
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideExecutors")
+ void testAllowCheckpointStorage(
+ ThrowingConsumer<StreamExecutionEnvironment, Exception> executor) {
+ final Configuration clusterConfig = new Configuration();
+ clusterConfig.set(DeploymentOptions.PROGRAM_CONFIG_ENABLED, false);
+ clusterConfig.set(DeploymentOptions.TARGET, "local");
+ clusterConfig.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY,
"file:///flink/checkpoints");
+
+ final StreamContextEnvironment environment =
+ constructStreamContextEnvironment(
+ clusterConfig,
+
Arrays.asList(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key()));
+
+ String allowedPath = "file:///flink/allowed/modification";
+ // Change the CheckpointConfig
+ environment.getCheckpointConfig().setCheckpointStorage(allowedPath);
+
+ environment.fromCollection(Collections.singleton(1)).addSink(new
DiscardingSink<>());
+ assertThatThrownBy(() -> executor.accept(environment))
+ .isInstanceOf(ExecutorReachedException.class);
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideExecutors")
+ void testNotModifiedCheckpointStorage(
+ ThrowingConsumer<StreamExecutionEnvironment, Exception> executor) {
+ final Configuration clusterConfig = new Configuration();
+ clusterConfig.set(DeploymentOptions.PROGRAM_CONFIG_ENABLED, false);
+ clusterConfig.set(DeploymentOptions.TARGET, "local");
+ clusterConfig.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY,
"file:///flink/checkpoints");
+
+ final StreamContextEnvironment environment =
+ constructStreamContextEnvironment(clusterConfig,
Collections.emptyList());
+
+ environment.fromCollection(Collections.singleton(1)).addSink(new
DiscardingSink<>());
+ assertThatThrownBy(() -> executor.accept(environment))
+ .isInstanceOf(ExecutorReachedException.class);
+ }
+
@ParameterizedTest
@MethodSource("provideExecutors")
void testAllowProgramConfigurationWildcards(
ThrowingConsumer<StreamExecutionEnvironment, Exception> executor) {
final Configuration clusterConfig = new Configuration();
clusterConfig.set(DeploymentOptions.TARGET, "local");
clusterConfig.set(ExecutionOptions.RUNTIME_MODE,
RuntimeExecutionMode.STREAMING);
+
// Test prefix map notation
Review Comment:
I've kept the code as it is and only changed the comment, to indicate what we
are testing:
```
// Changing GLOBAL_JOB_PARAMETERS is always allowed, as it's one of the fields not checked
// with PROGRAM_CONFIG_ENABLED set to false
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]