This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new aa7158934bb [FLINK-34660][checkpoint] Avoid randomizing 
changelog-related config when it's pre-defined (#24488)
aa7158934bb is described below

commit aa7158934bbe17fa13945dddc32d14e2aa613ae5
Author: yhx <[email protected]>
AuthorDate: Thu Mar 14 09:44:57 2024 +0800

    [FLINK-34660][checkpoint] Avoid randomizing changelog-related config when 
it's pre-defined (#24488)
---
 .../flink/streaming/util/TestStreamEnvironment.java  | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 58ddd379652..0f82aa2a631 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -124,15 +124,17 @@ public class TestStreamEnvironment extends 
StreamExecutionEnvironment {
         }
 
         // randomize ITTests for enabling state change log
-        if 
(STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_ON)) {
-            if (!conf.contains(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG)) 
{
+        if (!conf.contains(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG)) {
+            if 
(STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_ON)) {
                 conf.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true);
-                miniCluster.overrideRestoreModeForChangelogStateBackend();
+            } else if 
(STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_RAND)) {
+                randomize(conf, StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, 
true, false);
             }
-        } else if 
(STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_RAND)) {
-            boolean enabled =
-                    randomize(conf, 
StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true, false);
-            if (enabled) {
+        }
+
+        // randomize periodic materialization when enabling state change log
+        if (conf.get(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG)) {
+            if 
(!conf.contains(StateChangelogOptions.PERIODIC_MATERIALIZATION_ENABLED)) {
                 // More situations about enabling periodic materialization 
should be tested
                 randomize(
                         conf,
@@ -141,6 +143,8 @@ public class TestStreamEnvironment extends 
StreamExecutionEnvironment {
                         true,
                         true,
                         false);
+            }
+            if 
(!conf.contains(StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL)) {
                 randomize(
                         conf,
                         
StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
@@ -148,8 +152,8 @@ public class TestStreamEnvironment extends 
StreamExecutionEnvironment {
                         Duration.ofMillis(500),
                         Duration.ofSeconds(1),
                         Duration.ofSeconds(5));
-                miniCluster.overrideRestoreModeForChangelogStateBackend();
             }
+            miniCluster.overrideRestoreModeForChangelogStateBackend();
         }
     }
 

Reply via email to