This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ed70a1e8b5 [FLINK-28178][runtime-web] Show the delegated StateBackend and whether changelog is enabled in the UI
9ed70a1e8b5 is described below

commit 9ed70a1e8b5d59abdf9d7673bc5b44d421140ef0
Author: wangfeifan <[email protected]>
AuthorDate: Wed Jun 29 15:58:46 2022 +0800

    [FLINK-28178][runtime-web] Show the delegated StateBackend and whether changelog is enabled in the UI
---
 docs/layouts/shortcodes/generated/rest_v1_dispatcher.html |  3 +++
 docs/static/generated/rest_v1_dispatcher.yml              |  2 ++
 flink-runtime-web/src/test/resources/rest_api_v1.snapshot |  5 ++++-
 .../web-dashboard/src/app/interfaces/job-checkpoint.ts    |  1 +
 .../pages/job/checkpoints/job-checkpoints.component.html  | 10 +++++-----
 .../runtime/executiongraph/AccessExecutionGraph.java      |  8 ++++++++
 .../runtime/executiongraph/ArchivedExecutionGraph.java    | 14 ++++++++++++++
 .../runtime/executiongraph/DefaultExecutionGraph.java     | 15 ++++++++++++++-
 .../handler/job/checkpoints/CheckpointConfigHandler.java  |  6 ++++++
 .../rest/messages/checkpoints/CheckpointConfigInfo.java   |  9 +++++++++
 .../java/org/apache/flink/runtime/state/StateBackend.java |  9 +++++++++
 .../apache/flink/runtime/state/StateBackendLoader.java    |  2 +-
 .../runtime/state/delegate/DelegatingStateBackend.java    |  5 +++++
 .../legacy/utils/ArchivedExecutionGraphBuilder.java       |  2 ++
 .../messages/checkpoints/CheckpointConfigInfoTest.java    |  1 +
 .../adaptive/StateTrackingMockExecutionGraph.java         |  6 ++++++
 16 files changed, 90 insertions(+), 8 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html 
b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index 8d5e8191415..ea4b36e4be3 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -2335,6 +2335,9 @@ Using 'curl' you can upload a jar via 'curl -X POST -H 
"Expect:" -F "jarfile=@pa
     "state_backend" : {
       "type" : "string"
     },
+    "state_changelog_enabled" : {
+      "type" : "boolean"
+    },
     "timeout" : {
       "type" : "integer"
     },
diff --git a/docs/static/generated/rest_v1_dispatcher.yml 
b/docs/static/generated/rest_v1_dispatcher.yml
index 2d8a79402ef..1cc1579dfb0 100644
--- a/docs/static/generated/rest_v1_dispatcher.yml
+++ b/docs/static/generated/rest_v1_dispatcher.yml
@@ -2864,6 +2864,8 @@ components:
           format: int64
         checkpoints_after_tasks_finish:
           type: boolean
+        state_changelog_enabled:
+          type: boolean
         changelog_periodic_materialization_interval:
           type: integer
           format: int64
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot 
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 85337fa8795..ef9be2485d2 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -1408,6 +1408,9 @@
         "checkpoints_after_tasks_finish" : {
           "type" : "boolean"
         },
+        "state_changelog_enabled" : {
+          "type" : "boolean"
+        },
         "changelog_periodic_materialization_interval" : {
           "type" : "integer"
         },
@@ -3842,4 +3845,4 @@
       }
     }
   } ]
-}
\ No newline at end of file
+}
diff --git 
a/flink-runtime-web/web-dashboard/src/app/interfaces/job-checkpoint.ts 
b/flink-runtime-web/web-dashboard/src/app/interfaces/job-checkpoint.ts
index 8dc270f1a73..f4f8ddae841 100644
--- a/flink-runtime-web/web-dashboard/src/app/interfaces/job-checkpoint.ts
+++ b/flink-runtime-web/web-dashboard/src/app/interfaces/job-checkpoint.ts
@@ -128,6 +128,7 @@ export interface CheckpointConfig {
     delete_on_cancellation: boolean;
   };
   state_backend: string;
+  state_changelog_enabled: boolean;
   checkpoint_storage: string;
   unaligned_checkpoints: boolean;
   tolerable_failed_checkpoints: number;
diff --git 
a/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/job-checkpoints.component.html
 
b/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/job-checkpoints.component.html
index b82659dab74..3b7fb0f41bb 100644
--- 
a/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/job-checkpoints.component.html
+++ 
b/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/job-checkpoints.component.html
@@ -551,15 +551,15 @@
             </td>
           </tr>
           <tr>
-            <td>Changelog state-backend</td>
-            <td *ngIf="checkPointConfig['state_backend'] === 
'ChangelogStateBackend'">Enabled</td>
-            <td *ngIf="checkPointConfig['state_backend'] !== 
'ChangelogStateBackend'">Disabled</td>
+            <td>State Changelog</td>
+            <td 
*ngIf="checkPointConfig['state_changelog_enabled']">Enabled</td>
+            <td 
*ngIf="!checkPointConfig['state_changelog_enabled']">Disabled</td>
           </tr>
-          <tr *ngIf="checkPointConfig['state_backend'] === 
'ChangelogStateBackend'">
+          <tr *ngIf="checkPointConfig['state_changelog_enabled']">
             <td>Changelog Storage</td>
             <td>{{ checkPointConfig['changelog_storage'] }}</td>
           </tr>
-          <tr *ngIf="checkPointConfig['state_backend'] === 
'ChangelogStateBackend'">
+          <tr *ngIf="checkPointConfig['state_changelog_enabled']">
             <td>Changelog Periodic Materialization Interval</td>
             <td>
               {{
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
index 7dafd8b5d55..011e864a4fd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TernaryBoolean;
 
 import javax.annotation.Nullable;
 
@@ -176,6 +177,13 @@ public interface AccessExecutionGraph extends 
JobStatusProvider {
      */
     Optional<String> getCheckpointStorageName();
 
+    /**
+     * Returns whether the state changelog is enabled for this ExecutionGraph.
+     *
+     * @return TRUE if state changelog is enabled, FALSE if disabled, UNDEFINED if unknown.
+     */
+    TernaryBoolean isChangelogStateBackendEnabled();
+
     /**
      * Returns the changelog storage name for this ExecutionGraph.
      *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
index c9d08ae110c..566efdee394 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TernaryBoolean;
 
 import javax.annotation.Nullable;
 
@@ -98,6 +99,8 @@ public class ArchivedExecutionGraph implements 
AccessExecutionGraph, Serializabl
 
     @Nullable private final String checkpointStorageName;
 
+    @Nullable private final TernaryBoolean stateChangelogEnabled;
+
     @Nullable private final String changelogStorageName;
 
     public ArchivedExecutionGraph(
@@ -117,6 +120,7 @@ public class ArchivedExecutionGraph implements 
AccessExecutionGraph, Serializabl
             @Nullable CheckpointStatsSnapshot checkpointStatsSnapshot,
             @Nullable String stateBackendName,
             @Nullable String checkpointStorageName,
+            @Nullable TernaryBoolean stateChangelogEnabled,
             @Nullable String changelogStorageName) {
 
         this.jobID = Preconditions.checkNotNull(jobID);
@@ -135,6 +139,7 @@ public class ArchivedExecutionGraph implements 
AccessExecutionGraph, Serializabl
         this.checkpointStatsSnapshot = checkpointStatsSnapshot;
         this.stateBackendName = stateBackendName;
         this.checkpointStorageName = checkpointStorageName;
+        this.stateChangelogEnabled = stateChangelogEnabled;
         this.changelogStorageName = changelogStorageName;
     }
 
@@ -266,6 +271,11 @@ public class ArchivedExecutionGraph implements 
AccessExecutionGraph, Serializabl
         return Optional.ofNullable(checkpointStorageName);
     }
 
+    @Override
+    public TernaryBoolean isChangelogStateBackendEnabled() {
+        return stateChangelogEnabled;
+    }
+
     @Override
     public Optional<String> getChangelogStorageName() {
         return Optional.ofNullable(changelogStorageName);
@@ -337,6 +347,7 @@ public class ArchivedExecutionGraph implements 
AccessExecutionGraph, Serializabl
                 executionGraph.getCheckpointStatsSnapshot(),
                 executionGraph.getStateBackendName().orElse(null),
                 executionGraph.getCheckpointStorageName().orElse(null),
+                executionGraph.isChangelogStateBackendEnabled(),
                 executionGraph.getChangelogStorageName().orElse(null));
     }
 
@@ -391,6 +402,9 @@ public class ArchivedExecutionGraph implements 
AccessExecutionGraph, Serializabl
                 checkpointingSettings == null ? null : 
CheckpointStatsSnapshot.empty(),
                 checkpointingSettings == null ? null : "Unknown",
                 checkpointingSettings == null ? null : "Unknown",
+                checkpointingSettings == null
+                        ? TernaryBoolean.UNDEFINED
+                        : 
checkpointingSettings.isChangelogStateBackendEnabled(),
                 checkpointingSettings == null ? null : "Unknown");
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
index ce1f18c5504..164ff1eb567 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
@@ -77,6 +77,7 @@ import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.apache.flink.types.Either;
 import org.apache.flink.util.ExceptionUtils;
@@ -84,6 +85,7 @@ import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.IterableUtils;
 import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TernaryBoolean;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.concurrent.FutureUtils.ConjunctFuture;
 import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
@@ -271,6 +273,8 @@ public class DefaultExecutionGraph implements 
ExecutionGraph, InternalExecutionG
 
     @Nullable private String changelogStorageName;
 
+    @Nullable private TernaryBoolean stateChangelogEnabled;
+
     private String jsonPlan;
 
     /** Shuffle master to register partitions for task deployment. */
@@ -420,6 +424,11 @@ public class DefaultExecutionGraph implements 
ExecutionGraph, InternalExecutionG
         return jobMasterMainThreadExecutor;
     }
 
+    @Override
+    public TernaryBoolean isChangelogStateBackendEnabled() {
+        return stateChangelogEnabled;
+    }
+
     @Override
     public Optional<String> getStateBackendName() {
         return Optional.ofNullable(stateBackendName);
@@ -517,7 +526,11 @@ public class DefaultExecutionGraph implements 
ExecutionGraph, InternalExecutionG
             
registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
         }
 
-        this.stateBackendName = 
checkpointStateBackend.getClass().getSimpleName();
+        this.stateBackendName = checkpointStateBackend.getName();
+        this.stateChangelogEnabled =
+                TernaryBoolean.fromBoolean(
+                        
StateBackendLoader.isChangelogStateBackend(checkpointStateBackend));
+
         this.checkpointStorageName = 
checkpointStorage.getClass().getSimpleName();
         this.changelogStorageName = changelogStorageName;
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
index 878bcabd89b..4f2a55ce1df 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
@@ -122,6 +122,11 @@ public class CheckpointConfigHandler
                             .getPeriodicMaterializeIntervalMillis();
             String changelogStorageName = 
executionGraph.getChangelogStorageName().orElse(null);
 
+            boolean stateChangelogEnabled =
+                    executionGraph.isChangelogStateBackendEnabled() != null
+                            ? 
executionGraph.isChangelogStateBackendEnabled().getOrDefault(false)
+                            : false;
+
             return new CheckpointConfigInfo(
                     checkpointCoordinatorConfiguration.isExactlyOnce()
                             ? CheckpointConfigInfo.ProcessingMode.EXACTLY_ONCE
@@ -137,6 +142,7 @@ public class CheckpointConfigHandler
                     
checkpointCoordinatorConfiguration.getTolerableCheckpointFailureNumber(),
                     
checkpointCoordinatorConfiguration.getAlignedCheckpointTimeout(),
                     
checkpointCoordinatorConfiguration.isEnableCheckpointsAfterTasksFinish(),
+                    stateChangelogEnabled,
                     periodicMaterializeIntervalMillis,
                     changelogStorageName);
         }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java
index 9385f77102a..86f9cbb4dc4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java
@@ -65,6 +65,8 @@ public class CheckpointConfigInfo implements ResponseBody {
     public static final String FIELD_NAME_CHECKPOINTS_AFTER_TASKS_FINISH =
             "checkpoints_after_tasks_finish";
 
+    public static final String FIELD_NAME_STATE_CHANGELOG = 
"state_changelog_enabled";
+
     public static final String FIELD_NAME_PERIODIC_MATERIALIZATION_INTERVAL =
             "changelog_periodic_materialization_interval";
 
@@ -106,6 +108,9 @@ public class CheckpointConfigInfo implements ResponseBody {
     @JsonProperty(FIELD_NAME_CHECKPOINTS_AFTER_TASKS_FINISH)
     private final boolean checkpointsWithFinishedTasks;
 
+    @JsonProperty(FIELD_NAME_STATE_CHANGELOG)
+    private final boolean stateChangelog;
+
     @JsonProperty(FIELD_NAME_PERIODIC_MATERIALIZATION_INTERVAL)
     private final long periodicMaterializationInterval;
 
@@ -128,6 +133,7 @@ public class CheckpointConfigInfo implements ResponseBody {
             @JsonProperty(FIELD_NAME_ALIGNED_CHECKPOINT_TIMEOUT) long 
alignedCheckpointTimeout,
             @JsonProperty(FIELD_NAME_CHECKPOINTS_AFTER_TASKS_FINISH)
                     boolean checkpointsWithFinishedTasks,
+            @JsonProperty(FIELD_NAME_STATE_CHANGELOG) boolean stateChangelog,
             @JsonProperty(FIELD_NAME_PERIODIC_MATERIALIZATION_INTERVAL)
                     long periodicMaterializationInterval,
             @JsonProperty(FIELD_NAME_CHANGELOG_STORAGE) String 
changelogStorage) {
@@ -143,6 +149,7 @@ public class CheckpointConfigInfo implements ResponseBody {
         this.tolerableFailedCheckpoints = tolerableFailedCheckpoints;
         this.alignedCheckpointTimeout = alignedCheckpointTimeout;
         this.checkpointsWithFinishedTasks = checkpointsWithFinishedTasks;
+        this.stateChangelog = stateChangelog;
         this.periodicMaterializationInterval = periodicMaterializationInterval;
         this.changelogStorage = changelogStorage;
     }
@@ -168,6 +175,7 @@ public class CheckpointConfigInfo implements ResponseBody {
                 && tolerableFailedCheckpoints == 
that.tolerableFailedCheckpoints
                 && alignedCheckpointTimeout == that.alignedCheckpointTimeout
                 && checkpointsWithFinishedTasks == 
that.checkpointsWithFinishedTasks
+                && stateChangelog == that.stateChangelog
                 && periodicMaterializationInterval == 
that.periodicMaterializationInterval
                 && changelogStorage == that.changelogStorage;
     }
@@ -187,6 +195,7 @@ public class CheckpointConfigInfo implements ResponseBody {
                 tolerableFailedCheckpoints,
                 alignedCheckpointTimeout,
                 checkpointsWithFinishedTasks,
+                stateChangelog,
                 periodicMaterializationInterval,
                 changelogStorage);
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
index 4f904beef3c..df6fe5bbd99 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
@@ -80,6 +80,15 @@ import java.util.Collection;
 @PublicEvolving
 public interface StateBackend extends java.io.Serializable {
 
+    /**
+     * Returns the name of this backend; by default this is the simple class name. A {@link
+     * org.apache.flink.runtime.state.delegate.DelegatingStateBackend} may instead return the name
+     * of the backend it delegates to.
+     */
+    default String getName() {
+        return this.getClass().getSimpleName();
+    }
+
     /**
      * Creates a new {@link CheckpointableKeyedStateBackend} that is 
responsible for holding
      * <b>keyed state</b> and checkpointing it.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
index 929728bdd04..6f8ef3263e4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
@@ -391,7 +391,7 @@ public class StateBackendLoader {
         return originalStateBackend;
     }
 
-    private static boolean isChangelogStateBackend(StateBackend backend) {
+    public static boolean isChangelogStateBackend(StateBackend backend) {
         return CHANGELOG_STATE_BACKEND.equals(backend.getClass().getName());
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/delegate/DelegatingStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/delegate/DelegatingStateBackend.java
index 4bcc4cf15be..35bac9155b7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/delegate/DelegatingStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/delegate/DelegatingStateBackend.java
@@ -29,4 +29,9 @@ import org.apache.flink.runtime.state.StateBackend;
 @Internal
 public interface DelegatingStateBackend extends StateBackend {
     StateBackend getDelegatedStateBackend();
+
+    @Override
+    default String getName() {
+        return getDelegatedStateBackend().getName();
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java
index fcfd5fdbafe..4e074615361 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TernaryBoolean;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -159,6 +160,7 @@ public class ArchivedExecutionGraphBuilder {
                 null,
                 "stateBackendName",
                 "checkpointStorageName",
+                TernaryBoolean.UNDEFINED,
                 "changelogStorageName");
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfoTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfoTest.java
index bb2ffdd8d1d..a652ed89210 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfoTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfoTest.java
@@ -46,6 +46,7 @@ public class CheckpointConfigInfoTest
                 3,
                 4,
                 true,
+                false,
                 0,
                 null);
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java
index 91eb6da1c69..9f3c9ba321a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java
@@ -59,6 +59,7 @@ import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TernaryBoolean;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -220,6 +221,11 @@ class StateTrackingMockExecutionGraph implements 
ExecutionGraph {
         return Optional.empty();
     }
 
+    @Override
+    public TernaryBoolean isChangelogStateBackendEnabled() {
+        return TernaryBoolean.fromBoolean(false);
+    }
+
     @Override
     public Optional<String> getChangelogStorageName() {
         return Optional.empty();

Reply via email to