zhuzhurk commented on code in PR #24025:
URL: https://github.com/apache/flink/pull/24025#discussion_r1448756961
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java:
##########
@@ -2562,19 +2505,26 @@ private StreamGraphGenerator getStreamGraphGenerator(List<Transformation<?>> tra
"No operators defined in streaming topology. Cannot execute.");
}
+ // Synchronize the cached file to config option PipelineOptions.CACHED_FILES
+ if (!getCachedFiles().isEmpty()) {
Review Comment:
Why is this needed? It looks to me like the cached files are already written
to the configuration whenever they are updated in `registerCachedFile()`.
##########
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java:
##########
@@ -1227,7 +1227,13 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) {
RestartStrategies.fromConfiguration(configuration).ifPresent(this::setRestartStrategy);
configuration
.getOptional(RestartStrategyOptions.RESTART_STRATEGY)
- .ifPresent(s -> this.setRestartStrategy(configuration));
+ .ifPresent(
Review Comment:
I guess the line above
`RestartStrategies.fromConfiguration(configuration).ifPresent(this::setRestartStrategy);`
is no longer needed?
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java:
##########
@@ -476,13 +470,6 @@ private void testBufferTimeout(Configuration config, StreamExecutionEnvironment
config.setString(ExecutionOptions.BUFFER_TIMEOUT.key(), "0ms");
env.configure(config, this.getClass().getClassLoader());
assertThat(env.getBufferTimeout()).isZero();
-
- assertThatThrownBy(
- () -> {
- config.setString(ExecutionOptions.BUFFER_TIMEOUT.key(), "-1ms");
- env.configure(config, this.getClass().getClassLoader());
Review Comment:
I guess this verification is still needed, but with the new change it should
verify that an exception will be thrown if buffer timeout is enabled but the
value is `-1ms`.
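A minimal sketch of the check I have in mind, in plain Java rather than against the Flink test harness. `resolveBufferTimeout` and its `enabled` flag are hypothetical stand-ins for whatever the new code does, and `DISABLED` only mirrors the role of `ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT`; its value here is an assumption:

```java
import java.time.Duration;

final class BufferTimeoutSketch {
    // Assumed sentinel for "buffer timeout disabled"; not taken from Flink.
    static final long DISABLED = -1L;

    // Hypothetical resolution logic: a negative value is only an error
    // when the buffer timeout is enabled.
    static long resolveBufferTimeout(boolean enabled, Duration timeout) {
        if (!enabled) {
            // The configured value is ignored entirely when disabled.
            return DISABLED;
        }
        if (timeout.isNegative()) {
            throw new IllegalArgumentException(
                    "Buffer timeout must not be negative when enabled: " + timeout);
        }
        return timeout.toMillis();
    }

    public static void main(String[] args) {
        // Enabled with a valid value.
        System.out.println(resolveBufferTimeout(true, Duration.ZERO));
        // Enabled with -1ms: the case the removed assertion should still cover.
        try {
            resolveBufferTimeout(true, Duration.ofMillis(-1));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The test would then keep an `assertThatThrownBy(...)` for the enabled case instead of dropping it.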
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java:
##########
@@ -145,9 +140,11 @@ public class StreamGraph implements Pipeline {
private boolean autoParallelismEnabled;
public StreamGraph(
+ Configuration jobConfiguration,
ExecutionConfig executionConfig,
CheckpointConfig checkpointConfig,
SavepointRestoreSettings savepointRestoreSettings) {
+ this.jobConfiguration = checkNotNull(jobConfiguration);
Review Comment:
It's better to create a new `Configuration` here, so that the `StreamGraph` is
not affected by later modifications to the original `Configuration` in the env.
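A minimal sketch of the defensive copy, assuming a plain `Map` as a stand-in for `Configuration`. `GraphSketch` and the key `some.option` are hypothetical and only illustrate the aliasing problem; in the PR this would presumably be the `Configuration` copy constructor:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class GraphSketch {
    private final Map<String, String> jobConfiguration;

    GraphSketch(Map<String, String> jobConfiguration) {
        // Copy instead of keeping the caller's reference, so later changes
        // made through the environment do not leak into this object.
        this.jobConfiguration = new HashMap<>(Objects.requireNonNull(jobConfiguration));
    }

    String get(String key) {
        return jobConfiguration.get(key);
    }

    public static void main(String[] args) {
        Map<String, String> envConfig = new HashMap<>();
        envConfig.put("some.option", "4");

        GraphSketch graph = new GraphSketch(envConfig);

        // The environment's configuration is modified after the graph exists.
        envConfig.put("some.option", "8");

        System.out.println(graph.get("some.option")); // prints 4
    }
}
```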
##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAfterAllTasksFinishedITCase.java:
##########
@@ -82,7 +83,8 @@ public void setUp() {
@Test
public void testImmediateCheckpointing() throws Exception {
env.setRestartStrategy(RestartStrategies.noRestart());
- env.enableCheckpointing(Long.MAX_VALUE - 1);
+ env.enableCheckpointing(
+ Duration.ofNanos(Long.MAX_VALUE /* max allowed by FLINK */).toMillis());
Review Comment:
What's this change for?
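For reference, the two interval values compare as follows. This is plain JDK arithmetic and nothing Flink-specific; the class and method names are hypothetical, and the sketch does not claim to explain the motivation for the change:

```java
import java.time.Duration;

final class CheckpointIntervalArithmetic {
    // Interval in milliseconds passed to enableCheckpointing() before the change.
    static long oldIntervalMillis() {
        return Long.MAX_VALUE - 1;
    }

    // Interval in milliseconds passed to enableCheckpointing() after the change.
    static long newIntervalMillis() {
        return Duration.ofNanos(Long.MAX_VALUE).toMillis();
    }

    // True if the given millisecond value can be converted to nanoseconds
    // without overflowing a long.
    static boolean fitsInNanos(long millis) {
        try {
            Duration.ofMillis(millis).toNanos();
            return true;
        } catch (ArithmeticException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(oldIntervalMillis()); // prints 9223372036854775806
        System.out.println(newIntervalMillis()); // prints 9223372036854
        System.out.println(fitsInNanos(oldIntervalMillis())); // prints false
        System.out.println(fitsInNanos(newIntervalMillis())); // prints true
    }
}
```

If the only difference that matters is the nanosecond overflow, a short code comment saying so would answer the question.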
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java:
##########
@@ -460,12 +460,6 @@ void testBufferTimeoutDisabled() {
env.configure(config, this.getClass().getClassLoader());
assertThat(env.getBufferTimeout())
.isEqualTo(ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT);
-
- // Setting execution.buffer-timeout's to -1ms will not take effect.
- config.setString(ExecutionOptions.BUFFER_TIMEOUT.key(), "-1ms");
- env.configure(config, this.getClass().getClassLoader());
- assertThat(env.getBufferTimeout())
- .isEqualTo(ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT);
Review Comment:
I guess this verification is still needed to verify that `BUFFER_TIMEOUT`
will not be used if buffer timeout is disabled.
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java:
##########
@@ -222,7 +223,7 @@ public void testGettingEnvironmentWithConfiguration() {
assertThat(env.getParallelism(), equalTo(10));
assertThat(env.getConfig().getAutoWatermarkInterval(), equalTo(100L));
- assertThat(env.getStateBackend(), instanceOf(MemoryStateBackend.class));
+ assertThat(env.getStateBackend(), nullValue());
Review Comment:
I would remove this verification and the setting of state.backend.
##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/StreamOperatorContextBuilder.java:
##########
@@ -108,11 +108,15 @@ StreamOperatorStateContext build(Logger logger) throws IOException {
StateBackend stateBackend;
try {
+ Configuration jobConfig = environment.getJobConfiguration();
+ jobConfig.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, false);
+ Configuration clusterConfig = new Configuration(configuration);
+ clusterConfig.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, false);
Review Comment:
IIUC, `ENABLE_STATE_CHANGE_LOG` can currently be true in the cluster config,
but we always set it to false here.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -792,7 +792,11 @@ public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath, Tim
try {
Checkpoints.disposeSavepoint(
- savepointPath, configuration, classLoader, log);
+ savepointPath,
+ new Configuration(),
Review Comment:
Why is an empty job configuration used here?
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java:
##########
@@ -72,7 +73,7 @@ public void testLoadingStateBackendFromConfiguration() {
configuration, Thread.currentThread().getContextClassLoader());
StateBackend actualStateBackend = envFromConfiguration.getStateBackend();
- assertThat(actualStateBackend, instanceOf(MemoryStateBackend.class));
+ assertThat(actualStateBackend, nullValue());
Review Comment:
This test becomes meaningless if it is changed like this. I would remove it,
as long as there is a test covering the case that the state backend set via
`StreamExecutionEnvironment#configure` can be properly created at runtime.
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -1016,6 +1016,10 @@ public void configure(ReadableConfig configuration) {
configuration
.getOptional(CheckpointingOptions.CHECKPOINTS_DIRECTORY)
.ifPresent(this::setCheckpointStorage);
+ // reset checkpoint storage for backward compatibility
+ configuration
+ .getOptional(CheckpointingOptions.CHECKPOINT_STORAGE)
+ .ifPresent(ignored -> this.storage = null);
Review Comment:
It's a bit weird to set and then reset the checkpoint storage back and forth
here. Maybe do not invoke `setCheckpointStorage`, but directly call
`configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY,...)` instead?
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java:
##########
@@ -520,9 +518,7 @@ void testConfigureCheckpointStorage() {
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
CheckpointStorage storage = env.getCheckpointConfig().getCheckpointStorage();
- assertThat(storage).isInstanceOf(JobManagerCheckpointStorage.class);
- assertThat(((JobManagerCheckpointStorage) storage).getCheckpointPath())
- .isEqualTo(new Path(path));
+ assertThat(storage).isNull();
Review Comment:
This test becomes meaningless. I would remove it entirely, as long as there is
a test covering the case that the checkpoint storage set in
`StreamExecutionEnvironment` can be properly created at runtime.