This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 68f2bc3b80e0c29a378fa9f30ffb9cd6d46aba58 Author: Dawid Wysakowicz <[email protected]> AuthorDate: Fri Dec 10 09:47:29 2021 +0100 [hotfix][hack] Make ChangelogStateBackend randomized tests work with forced full snapshots --- .../flink/runtime/minicluster/MiniCluster.java | 30 ++++++++++++++++++++++ .../streaming/util/TestStreamEnvironment.java | 7 ++++- 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index d42abb2..2f2c498 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.minicluster; +import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; @@ -57,6 +58,9 @@ import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServic import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.RestoreMode; +import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; @@ -666,6 +670,16 @@ public class MiniCluster implements AutoCloseableAsync { return miniClusterConfiguration.getConfiguration(); } + // HACK: temporary hack to make the randomized changelog state backend tests work with forced + // full snapshots. This option should be removed once changelog state backend supports forced + // full snapshots + @Internal private boolean overrideRestoreModeForRandomizedChangelogStateBackend; + + @Internal + public void overrideRestoreModeForRandomizedChangelogStateBackend() { + this.overrideRestoreModeForRandomizedChangelogStateBackend = true; + } + @GuardedBy("lock") private Collection<? extends CompletableFuture<Void>> terminateTaskManagers() { final Collection<CompletableFuture<Void>> terminationFutures = @@ -858,6 +872,7 @@ public class MiniCluster implements AutoCloseableAsync { } public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) { + checkRestoreModeForRandomizedChangelogStateBackend(jobGraph); final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = getDispatcherGatewayFuture(); final CompletableFuture<InetSocketAddress> blobServerAddressFuture = @@ -875,6 +890,21 @@ public class MiniCluster implements AutoCloseableAsync { (Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID())); } + // HACK: temporary hack to make the randomized changelog state backend tests work with forced + // full snapshots. This option should be removed once changelog state backend supports forced + // full snapshots + private void checkRestoreModeForRandomizedChangelogStateBackend(JobGraph jobGraph) { + final SavepointRestoreSettings savepointRestoreSettings = + jobGraph.getSavepointRestoreSettings(); + if (overrideRestoreModeForRandomizedChangelogStateBackend + && savepointRestoreSettings.getRestoreMode() == RestoreMode.NO_CLAIM) { + final Configuration conf = new Configuration(); + SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, conf); + conf.set(SavepointConfigOptions.RESTORE_MODE, RestoreMode.LEGACY); + jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.fromConfiguration(conf)); + } + } + public CompletableFuture<JobResult> requestJobResult(JobID jobId) { return runDispatcherCommand( dispatcherGateway -> diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java index 00f5692..4bf603c 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java @@ -124,8 +124,13 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment { if (isConfigurationSupportedByChangelog(miniCluster.getConfiguration())) { if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_ON)) { conf.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true); + miniCluster.overrideRestoreModeForRandomizedChangelogStateBackend(); } else if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_RAND)) { - randomize(conf, StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true, false); + boolean enabled = + randomize(conf, StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true, false); + if (enabled) { + miniCluster.overrideRestoreModeForRandomizedChangelogStateBackend(); + } } } }
