This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f1d894d1293 [FLINK-28286][state] move 'enablechangelog' constant to
flink-core module
f1d894d1293 is described below
commit f1d894d1293e966dbdca06a48e4281d2daf63978
Author: fredia <[email protected]>
AuthorDate: Mon Jul 11 23:28:04 2022 +0800
[FLINK-28286][state] move 'enablechangelog' constant to flink-core module
---
.../StateChangelogOptionsInternal.java | 37 ++++++++++++++++++++++
.../flink/state/api/BootstrapTransformation.java | 3 --
.../state/api/StateBootstrapTransformation.java | 3 --
.../state/api/runtime/SavepointEnvironment.java | 7 +++-
.../apache/flink/runtime/jobgraph/JobGraph.java | 12 +++++++
.../api/graph/StreamingJobGraphGenerator.java | 3 +-
.../flink/streaming/runtime/tasks/StreamTask.java | 14 +++++---
7 files changed, 67 insertions(+), 12 deletions(-)
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/StateChangelogOptionsInternal.java
b/flink-core/src/main/java/org/apache/flink/configuration/StateChangelogOptionsInternal.java
new file mode 100644
index 00000000000..acf08515ef8
--- /dev/null
+++
b/flink-core/src/main/java/org/apache/flink/configuration/StateChangelogOptionsInternal.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.Internal;
+
+import static
org.apache.flink.configuration.StateChangelogOptions.ENABLE_STATE_CHANGE_LOG;
+
+/** StateChangelog options that are used to pass job-level configuration from JM to TM. */
+@Internal
+public class StateChangelogOptionsInternal {
+
+    public static final ConfigOption<Boolean> ENABLE_CHANGE_LOG_FOR_APPLICATION =
+            ConfigOptions.key("state.backend.changelog.enabled_for_application")
+                    .booleanType()
+                    .noDefaultValue()
+                    .withDescription(
+                            String.format(
+                                    "Whether to enable job-level changelog. "
+                                            + "If this config is not set explicitly, it would use %s's value",
+                                    ENABLE_STATE_CHANGE_LOG.key()));
+}
diff --git
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java
index a58fb1e5174..fc60ab66ebc 100644
---
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java
+++
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java
@@ -42,7 +42,6 @@ import
org.apache.flink.state.api.output.partitioner.KeyGroupRangePartitioner;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.util.TernaryBoolean;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -232,8 +231,6 @@ public class BootstrapTransformation<T> {
config.setOperatorName(operatorID.toHexString());
config.setOperatorID(operatorID);
config.setStateBackend(stateBackend);
- // This means leaving this stateBackend unwrapped.
- config.setChangelogStateBackendEnabled(TernaryBoolean.FALSE);
config.setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.STATE_BACKEND,
1.0);
config.serializeAllConfigs();
return config;
diff --git
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/StateBootstrapTransformation.java
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/StateBootstrapTransformation.java
index 64a898df1ce..70afd8c5169 100644
---
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/StateBootstrapTransformation.java
+++
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/StateBootstrapTransformation.java
@@ -41,7 +41,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.util.TernaryBoolean;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -220,8 +219,6 @@ public class StateBootstrapTransformation<T> {
config.setOperatorName(operatorID.toHexString());
config.setOperatorID(operatorID);
config.setStateBackend(stateBackend);
- // This means leaving this stateBackend unwrapped.
- config.setChangelogStateBackendEnabled(TernaryBoolean.FALSE);
config.setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.STATE_BACKEND,
1.0);
config.serializeAllConfigs();
return config;
diff --git
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
index 71ee278eee8..6c0b45b1e1a 100644
---
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
+++
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.StateChangelogOptionsInternal;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -167,7 +168,11 @@ public class SavepointEnvironment implements Environment {
@Override
public Configuration getJobConfiguration() {
- throw new UnsupportedOperationException(ERROR_MSG);
+ Configuration jobConfiguration = new Configuration();
+ // Explicitly disable the changelog for this job so the state backend is left unwrapped.
+ jobConfiguration.setBoolean(
+
StateChangelogOptionsInternal.ENABLE_CHANGE_LOG_FOR_APPLICATION, false);
+ return jobConfiguration;
}
@Override
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 1d81a753e26..1f098fc2632 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateChangelogOptionsInternal;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
@@ -31,6 +32,7 @@ import
org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TernaryBoolean;
import javax.annotation.Nullable;
@@ -628,4 +630,14 @@ public class JobGraph implements Serializable {
userArtifact.getKey(), userArtifact.getValue(),
jobConfiguration);
}
}
+
+    public void setChangelogStateBackendEnabled(TernaryBoolean changelogStateBackendEnabled) {
+        // A null or UNDEFINED value is not written, leaving the option absent from the job config.
+        if (changelogStateBackendEnabled != null
+                && !TernaryBoolean.UNDEFINED.equals(changelogStateBackendEnabled)) {
+            jobConfiguration.setBoolean(
+                    StateChangelogOptionsInternal.ENABLE_CHANGE_LOG_FOR_APPLICATION,
+                    changelogStateBackendEnabled.getAsBoolean());
+        }
+    }
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index d83e5dfa098..e817f1231ec 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -252,6 +252,8 @@ public class StreamingJobGraphGenerator {
+ "This indicates that non-serializable types
(like custom serializers) were registered");
}
+
jobGraph.setChangelogStateBackendEnabled(streamGraph.isChangelogStateBackendEnabled());
+
addVertexIndexPrefixInVertexName();
setVertexDescription();
@@ -928,7 +930,6 @@ public class StreamingJobGraphGenerator {
final CheckpointConfig checkpointCfg =
streamGraph.getCheckpointConfig();
config.setStateBackend(streamGraph.getStateBackend());
-
config.setChangelogStateBackendEnabled(streamGraph.isChangelogStateBackendEnabled());
config.setCheckpointStorage(streamGraph.getCheckpointStorage());
config.setSavepointDir(streamGraph.getSavepointDirectory());
config.setGraphContainingLoops(streamGraph.isIterative());
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 19bc707a363..2e0610113b9 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.MailboxExecutor;
import
org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateChangelogOptionsInternal;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.AutoCloseableRegistry;
import org.apache.flink.core.fs.CloseableRegistry;
@@ -1462,14 +1463,19 @@ public abstract class StreamTask<OUT, OP extends
StreamOperator<OUT>>
private StateBackend createStateBackend() throws Exception {
final StateBackend fromApplication =
configuration.getStateBackend(getUserCodeClassLoader());
+ final Optional<Boolean> isChangelogEnabledOptional =
+ environment
+ .getJobConfiguration()
+ .getOptional(
+
StateChangelogOptionsInternal.ENABLE_CHANGE_LOG_FOR_APPLICATION);
final TernaryBoolean isChangelogStateBackendEnableFromApplication =
-
configuration.isChangelogStateBackendEnabled(getUserCodeClassLoader());
+ isChangelogEnabledOptional.isPresent()
+ ?
TernaryBoolean.fromBoolean(isChangelogEnabledOptional.get())
+ : TernaryBoolean.UNDEFINED;
return StateBackendLoader.fromApplicationOrConfigOrDefault(
fromApplication,
- isChangelogStateBackendEnableFromApplication == null
- ? TernaryBoolean.UNDEFINED
- : isChangelogStateBackendEnableFromApplication,
+ isChangelogStateBackendEnableFromApplication,
getEnvironment().getTaskManagerInfo().getConfiguration(),
getUserCodeClassLoader(),
LOG);