This is an automated email from the ASF dual-hosted git repository.
hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new aa7158934bb [FLINK-34660][checkpoint] Avoid randomizing
changelog-related config when it's pre-defined (#24488)
aa7158934bb is described below
commit aa7158934bbe17fa13945dddc32d14e2aa613ae5
Author: yhx <[email protected]>
AuthorDate: Thu Mar 14 09:44:57 2024 +0800
[FLINK-34660][checkpoint] Avoid randomizing changelog-related config when
it's pre-defined (#24488)
---
.../flink/streaming/util/TestStreamEnvironment.java | 20 ++++++++++++--------
1 file changed, 12 insertions(+), 8 deletions(-)
diff --git
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 58ddd379652..0f82aa2a631 100644
---
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -124,15 +124,17 @@ public class TestStreamEnvironment extends
StreamExecutionEnvironment {
}
// randomize ITTests for enabling state change log
- if
(STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_ON)) {
- if (!conf.contains(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG))
{
+ if (!conf.contains(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG)) {
+ if
(STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_ON)) {
conf.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true);
- miniCluster.overrideRestoreModeForChangelogStateBackend();
+ } else if
(STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_RAND)) {
+ randomize(conf, StateChangelogOptions.ENABLE_STATE_CHANGE_LOG,
true, false);
}
- } else if
(STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_RAND)) {
- boolean enabled =
- randomize(conf,
StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true, false);
- if (enabled) {
+ }
+
+ // randomize periodic materialization when enabling state change log
+ if (conf.get(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG)) {
+ if
(!conf.contains(StateChangelogOptions.PERIODIC_MATERIALIZATION_ENABLED)) {
// More situations about enabling periodic materialization
should be tested
randomize(
conf,
@@ -141,6 +143,8 @@ public class TestStreamEnvironment extends
StreamExecutionEnvironment {
true,
true,
false);
+ }
+ if
(!conf.contains(StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL)) {
randomize(
conf,
StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
@@ -148,8 +152,8 @@ public class TestStreamEnvironment extends
StreamExecutionEnvironment {
Duration.ofMillis(500),
Duration.ofSeconds(1),
Duration.ofSeconds(5));
- miniCluster.overrideRestoreModeForChangelogStateBackend();
}
+ miniCluster.overrideRestoreModeForChangelogStateBackend();
}
}