This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fcf4555391 [FLINK-36030][state / changelog] Don't use negative values in changelog recovery tests
6fcf4555391 is described below

commit 6fcf4555391487661ffab1e72f6d0e2875bca6a4
Author: Roman Khachatryan <[email protected]>
AuthorDate: Sun Aug 11 21:18:30 2024 +0000

    [FLINK-36030][state / changelog] Don't use negative values in changelog recovery tests
---
 .../checkpointing/ChangelogRecoveryITCaseBase.java | 23 ++++++++++++++--------
 1 file changed, 15 insertions(+), 8 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCaseBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCaseBase.java
index 6444f4d5dc4..72d301aed1f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCaseBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCaseBase.java
@@ -163,14 +163,21 @@ public abstract class ChangelogRecoveryITCaseBase extends TestLogger {
         env.getCheckpointConfig().enableUnalignedCheckpoints(false);
         env.setStateBackend(stateBackend)
                 
.setRestartStrategy(RestartStrategies.fixedDelayRestart(restartAttempts, 0));
-        env.configure(
-                new Configuration()
-                        .set(
-                                
StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
-                                Duration.ofMillis(materializationInterval))
-                        .set(
-                                
StateChangelogOptions.MATERIALIZATION_MAX_FAILURES_ALLOWED,
-                                materializationMaxFailure));
+        if (materializationInterval >= 0) {
+            env.configure(
+                    new Configuration()
+                            
.set(StateChangelogOptions.PERIODIC_MATERIALIZATION_ENABLED, true)
+                            .set(
+                                    
StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
+                                    Duration.ofMillis(materializationInterval))
+                            .set(
+                                    
StateChangelogOptions.MATERIALIZATION_MAX_FAILURES_ALLOWED,
+                                    materializationMaxFailure));
+        } else {
+            env.configure(
+                    new Configuration()
+                            
.set(StateChangelogOptions.PERIODIC_MATERIALIZATION_ENABLED, false));
+        }
         return env;
     }
 

Reply via email to