This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f1d894d1293 [FLINK-28286][state] move 'enablechangelog' constant to flink-core module
f1d894d1293 is described below

commit f1d894d1293e966dbdca06a48e4281d2daf63978
Author: fredia <[email protected]>
AuthorDate: Mon Jul 11 23:28:04 2022 +0800

    [FLINK-28286][state] move 'enablechangelog' constant to flink-core module
---
 .../StateChangelogOptionsInternal.java             | 37 ++++++++++++++++++++++
 .../flink/state/api/BootstrapTransformation.java   |  3 --
 .../state/api/StateBootstrapTransformation.java    |  3 --
 .../state/api/runtime/SavepointEnvironment.java    |  7 +++-
 .../apache/flink/runtime/jobgraph/JobGraph.java    | 12 +++++++
 .../api/graph/StreamingJobGraphGenerator.java      |  3 +-
 .../flink/streaming/runtime/tasks/StreamTask.java  | 14 +++++---
 7 files changed, 67 insertions(+), 12 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/StateChangelogOptionsInternal.java b/flink-core/src/main/java/org/apache/flink/configuration/StateChangelogOptionsInternal.java
new file mode 100644
index 00000000000..acf08515ef8
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/StateChangelogOptionsInternal.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.Internal;
+
+import static org.apache.flink.configuration.StateChangelogOptions.ENABLE_STATE_CHANGE_LOG;
+
+/** StateChangelog options that are used to pass job-level configuration from JM to TM. */
+@Internal
+public class StateChangelogOptionsInternal {
+
+    public static final ConfigOption<Boolean> ENABLE_CHANGE_LOG_FOR_APPLICATION =
+            ConfigOptions.key("state.backend.changelog.enabled_for_application")
+                    .booleanType()
+                    .noDefaultValue()
+                    .withDescription(
+                            String.format(
+                                    "Whether to enable job-level changelog."
+                                            + "If this config is not set explicitly, it would use %s's value",
+                                    ENABLE_STATE_CHANGE_LOG.key()));
+}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java
index a58fb1e5174..fc60ab66ebc 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java
@@ -42,7 +42,6 @@ import org.apache.flink.state.api.output.partitioner.KeyGroupRangePartitioner;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.util.TernaryBoolean;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -232,8 +231,6 @@ public class BootstrapTransformation<T> {
         config.setOperatorName(operatorID.toHexString());
         config.setOperatorID(operatorID);
         config.setStateBackend(stateBackend);
-        // This means leaving this stateBackend unwrapped.
-        config.setChangelogStateBackendEnabled(TernaryBoolean.FALSE);
         config.setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.STATE_BACKEND, 1.0);
         config.serializeAllConfigs();
         return config;
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/StateBootstrapTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/StateBootstrapTransformation.java
index 64a898df1ce..70afd8c5169 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/StateBootstrapTransformation.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/StateBootstrapTransformation.java
@@ -41,7 +41,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.util.TernaryBoolean;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -220,8 +219,6 @@ public class StateBootstrapTransformation<T> {
         config.setOperatorName(operatorID.toHexString());
         config.setOperatorID(operatorID);
         config.setStateBackend(stateBackend);
-        // This means leaving this stateBackend unwrapped.
-        config.setChangelogStateBackendEnabled(TernaryBoolean.FALSE);
         config.setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.STATE_BACKEND, 1.0);
         config.serializeAllConfigs();
         return config;
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
index 71ee278eee8..6c0b45b1e1a 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.StateChangelogOptionsInternal;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -167,7 +168,11 @@ public class SavepointEnvironment implements Environment {
 
     @Override
     public Configuration getJobConfiguration() {
-        throw new UnsupportedOperationException(ERROR_MSG);
+        Configuration jobConfiguration = new Configuration();
+        // This means leaving this stateBackend unwrapped.
+        jobConfiguration.setBoolean(
+                StateChangelogOptionsInternal.ENABLE_CHANGE_LOG_FOR_APPLICATION, false);
+        return jobConfiguration;
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 1d81a753e26..1f098fc2632 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateChangelogOptionsInternal;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
@@ -31,6 +32,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.IterableUtils;
 import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TernaryBoolean;
 
 import javax.annotation.Nullable;
 
@@ -628,4 +630,14 @@ public class JobGraph implements Serializable {
                     userArtifact.getKey(), userArtifact.getValue(), jobConfiguration);
         }
     }
+
+    public void setChangelogStateBackendEnabled(TernaryBoolean changelogStateBackendEnabled) {
+        if (changelogStateBackendEnabled == null
+                || TernaryBoolean.UNDEFINED.equals(changelogStateBackendEnabled)) {
+            return;
+        }
+        this.jobConfiguration.setBoolean(
+                StateChangelogOptionsInternal.ENABLE_CHANGE_LOG_FOR_APPLICATION,
+                changelogStateBackendEnabled.getAsBoolean());
+    }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index d83e5dfa098..e817f1231ec 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -252,6 +252,8 @@ public class StreamingJobGraphGenerator {
                             + "This indicates that non-serializable types (like custom serializers) were registered");
         }
 
+        jobGraph.setChangelogStateBackendEnabled(streamGraph.isChangelogStateBackendEnabled());
+
         addVertexIndexPrefixInVertexName();
 
         setVertexDescription();
@@ -928,7 +930,6 @@ public class StreamingJobGraphGenerator {
         final CheckpointConfig checkpointCfg = streamGraph.getCheckpointConfig();
 
         config.setStateBackend(streamGraph.getStateBackend());
-        config.setChangelogStateBackendEnabled(streamGraph.isChangelogStateBackendEnabled());
         config.setCheckpointStorage(streamGraph.getCheckpointStorage());
         config.setSavepointDir(streamGraph.getSavepointDirectory());
         config.setGraphContainingLoops(streamGraph.isIterative());
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 19bc707a363..2e0610113b9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateChangelogOptionsInternal;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.AutoCloseableRegistry;
 import org.apache.flink.core.fs.CloseableRegistry;
@@ -1462,14 +1463,19 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
     private StateBackend createStateBackend() throws Exception {
         final StateBackend fromApplication =
                 configuration.getStateBackend(getUserCodeClassLoader());
+        final Optional<Boolean> isChangelogEnabledOptional =
+                environment
+                        .getJobConfiguration()
+                        .getOptional(
+                                StateChangelogOptionsInternal.ENABLE_CHANGE_LOG_FOR_APPLICATION);
         final TernaryBoolean isChangelogStateBackendEnableFromApplication =
-                configuration.isChangelogStateBackendEnabled(getUserCodeClassLoader());
+                isChangelogEnabledOptional.isPresent()
+                        ? TernaryBoolean.fromBoolean(isChangelogEnabledOptional.get())
+                        : TernaryBoolean.UNDEFINED;
 
         return StateBackendLoader.fromApplicationOrConfigOrDefault(
                 fromApplication,
-                isChangelogStateBackendEnableFromApplication == null
-                        ? TernaryBoolean.UNDEFINED
-                        : isChangelogStateBackendEnableFromApplication,
+                isChangelogStateBackendEnableFromApplication,
                 getEnvironment().getTaskManagerInfo().getConfiguration(),
                 getUserCodeClassLoader(),
                 LOG);

Reply via email to