This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 68f2bc3b80e0c29a378fa9f30ffb9cd6d46aba58
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Fri Dec 10 09:47:29 2021 +0100

    [hotfix][hack] Make ChangelogStateBackend randomized tests work with forced full snapshots
---
 .../flink/runtime/minicluster/MiniCluster.java     | 30 ++++++++++++++++++++++
 .../streaming/util/TestStreamEnvironment.java      |  7 ++++-
 2 files changed, 36 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index d42abb2..2f2c498 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.minicluster;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
@@ -57,6 +58,9 @@ import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServic
 import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -666,6 +670,16 @@ public class MiniCluster implements AutoCloseableAsync {
         return miniClusterConfiguration.getConfiguration();
     }
 
+    // HACK: temporary hack to make the randomized changelog state backend tests work with forced
+    // full snapshots. This option should be removed once changelog state backend supports forced
+    // full snapshots
+    @Internal private boolean overrideRestoreModeForRandomizedChangelogStateBackend;
+
+    @Internal
+    public void overrideRestoreModeForRandomizedChangelogStateBackend() {
+        this.overrideRestoreModeForRandomizedChangelogStateBackend = true;
+    }
+
     @GuardedBy("lock")
     private Collection<? extends CompletableFuture<Void>> terminateTaskManagers() {
         final Collection<CompletableFuture<Void>> terminationFutures =
@@ -858,6 +872,7 @@ public class MiniCluster implements AutoCloseableAsync {
     }
 
     public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
+        checkRestoreModeForRandomizedChangelogStateBackend(jobGraph);
         final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture =
                 getDispatcherGatewayFuture();
         final CompletableFuture<InetSocketAddress> blobServerAddressFuture =
@@ -875,6 +890,21 @@ public class MiniCluster implements AutoCloseableAsync {
                 (Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
     }
 
+    // HACK: temporary hack to make the randomized changelog state backend tests work with forced
+    // full snapshots. This option should be removed once changelog state backend supports forced
+    // full snapshots
+    private void checkRestoreModeForRandomizedChangelogStateBackend(JobGraph jobGraph) {
+        final SavepointRestoreSettings savepointRestoreSettings =
+                jobGraph.getSavepointRestoreSettings();
+        if (overrideRestoreModeForRandomizedChangelogStateBackend
+                && savepointRestoreSettings.getRestoreMode() == RestoreMode.NO_CLAIM) {
+            final Configuration conf = new Configuration();
+            SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, conf);
+            conf.set(SavepointConfigOptions.RESTORE_MODE, RestoreMode.LEGACY);
+            jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.fromConfiguration(conf));
+        }
+    }
+
     public CompletableFuture<JobResult> requestJobResult(JobID jobId) {
         return runDispatcherCommand(
                 dispatcherGateway ->
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 00f5692..4bf603c 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -124,8 +124,13 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
         if (isConfigurationSupportedByChangelog(miniCluster.getConfiguration())) {
             if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_ON)) {
                 conf.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true);
+                miniCluster.overrideRestoreModeForRandomizedChangelogStateBackend();
             } else if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_RAND)) {
-                randomize(conf, StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true, false);
+                boolean enabled =
+                        randomize(conf, StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true, false);
+                if (enabled) {
+                    miniCluster.overrideRestoreModeForRandomizedChangelogStateBackend();
+                }
             }
         }
     }

Reply via email to