This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new cf0abb9b4 [Improve][Core] Unified the checkpoint setting key of Flink
(#4296)
cf0abb9b4 is described below
commit cf0abb9b4fa9f9d0ab8d667e6a22f04d534ad7c2
Author: Hisoka <[email protected]>
AuthorDate: Tue Mar 14 10:51:27 2023 +0800
[Improve][Core] Unified the checkpoint setting key of Flink (#4296)
* [Doc] [Connector-V2] Improve MySQL-CDC Doc For connect.timeout.ms field
* [Core] [Improve] Unify the checkpoint setting key of Flink
* Revert "[Doc] [Connector-V2] Improve MySQL-CDC Doc For connect.timeout.ms
field"
This reverts commit cd95635ea482a2f0f785990f35f2f8cc47c96365.
* [Core] [Improve] Unify the checkpoint setting key of Flink
* [Core] [Improve] Unify the checkpoint setting key of Flink
---
.../src/main/java/org/apache/seatunnel/common/Constants.java | 4 ----
.../starter/flink/execution/FlinkRuntimeEnvironment.java | 12 ++++++++++--
.../seatunnel/core/starter/flink/utils/ConfigKeyName.java | 3 ++-
.../starter/flink/execution/FlinkRuntimeEnvironment.java | 12 ++++++++++--
.../seatunnel/core/starter/flink/utils/ConfigKeyName.java | 2 +-
.../org/apache/seatunnel/engine/server/NodeExtension.java | 2 +-
.../apache/seatunnel/engine/server/NodeExtensionCommon.java | 6 +++---
.../translation/spark/source/SeaTunnelSourceSupport.java | 4 +++-
.../spark/source/partition/micro/SeaTunnelMicroBatch.java | 3 ++-
9 files changed, 32 insertions(+), 16 deletions(-)
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
index c5e04d5fa..602143e10 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
@@ -18,8 +18,6 @@
package org.apache.seatunnel.common;
public final class Constants {
- public static final String ROW_ROOT = "__root__";
- public static final String ROW_TMP = "__tmp__";
public static final String LOGO = "SeaTunnel";
@@ -39,8 +37,6 @@ public final class Constants {
public static final String HDFS_USER = "hdfs.user";
- public static final String CHECKPOINT_INTERVAL = "checkpoint.interval";
-
public static final String CHECKPOINT_ID = "checkpoint.id";
public static final String UUID = "uuid";
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index e180ebb74..7fb75064a 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.core.starter.flink.execution;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.JobMode;
@@ -235,9 +236,16 @@ public class FlinkRuntimeEnvironment implements
RuntimeEnvironment {
}
private void setCheckpoint() {
- if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) {
+
+ long interval = 0;
+ if (config.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
+ interval =
config.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key());
+ } else if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) {
+ interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL);
+ }
+
+ if (interval > 0) {
CheckpointConfig checkpointConfig =
environment.getCheckpointConfig();
- long interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL);
environment.enableCheckpointing(interval);
if (config.hasPath(ConfigKeyName.CHECKPOINT_MODE)) {
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
index 78d444a57..699d7e379 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
@@ -27,7 +27,8 @@ public class ConfigKeyName {
public static final String BUFFER_TIMEOUT_MILLIS =
"execution.buffer.timeout";
public static final String PARALLELISM = "execution.parallelism";
public static final String MAX_PARALLELISM = "execution.max-parallelism";
- public static final String CHECKPOINT_INTERVAL =
"execution.checkpoint.interval";
+
+ @Deprecated public static final String CHECKPOINT_INTERVAL =
"execution.checkpoint.interval";
public static final String CHECKPOINT_MODE = "execution.checkpoint.mode";
public static final String CHECKPOINT_TIMEOUT =
"execution.checkpoint.timeout";
public static final String CHECKPOINT_DATA_URI =
"execution.checkpoint.data-uri";
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index 951312b74..4b5bef07c 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.core.starter.flink.execution;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.JobMode;
@@ -235,9 +236,16 @@ public class FlinkRuntimeEnvironment implements
RuntimeEnvironment {
}
private void setCheckpoint() {
- if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) {
+
+ long interval = 0;
+ if (config.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
+ interval =
config.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key());
+ } else if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) {
+ interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL);
+ }
+
+ if (interval > 0) {
CheckpointConfig checkpointConfig =
environment.getCheckpointConfig();
- long interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL);
environment.enableCheckpointing(interval);
if (config.hasPath(ConfigKeyName.CHECKPOINT_MODE)) {
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
index 78d444a57..cc8229f26 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
@@ -27,7 +27,7 @@ public class ConfigKeyName {
public static final String BUFFER_TIMEOUT_MILLIS =
"execution.buffer.timeout";
public static final String PARALLELISM = "execution.parallelism";
public static final String MAX_PARALLELISM = "execution.max-parallelism";
- public static final String CHECKPOINT_INTERVAL =
"execution.checkpoint.interval";
+ @Deprecated public static final String CHECKPOINT_INTERVAL =
"execution.checkpoint.interval";
public static final String CHECKPOINT_MODE = "execution.checkpoint.mode";
public static final String CHECKPOINT_TIMEOUT =
"execution.checkpoint.timeout";
public static final String CHECKPOINT_DATA_URI =
"execution.checkpoint.data-uri";
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
index 2c62314e2..d4137955c 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
@@ -85,6 +85,6 @@ public class NodeExtension extends DefaultNodeExtension {
@Override
public void printNodeInfo() {
- extCommon.printNodeInfo(systemLogger, "");
+ extCommon.printNodeInfo(systemLogger);
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java
index 8cf0a3470..8551f9381 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java
@@ -58,15 +58,15 @@ class NodeExtensionCommon {
// TODO This is where cluster state changes are handled
}
- void printNodeInfo(ILogger log, String addToProductName) {
- log.info(imdgVersionMessage());
+ void printNodeInfo(ILogger log) {
+ log.info(imgVersionMessage());
log.info(clusterNameMessage());
log.fine(serializationVersionMessage());
log.info('\n' + Constants.ST_LOGO);
log.info(Constants.COPYRIGHT_LINE);
}
- private String imdgVersionMessage() {
+ private String imgVersionMessage() {
String build = node.getBuildInfo().getBuild();
String revision = node.getBuildInfo().getRevision();
if (!revision.isEmpty()) {
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
index 4e2c3e111..604ba203c 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.translation.spark.source;
import org.apache.seatunnel.api.common.CommonOptions;
+import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.Constants;
@@ -74,7 +75,8 @@ public class SeaTunnelSourceSupport
SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource =
getSeaTunnelSource(options);
Integer parallelism = options.getInt(CommonOptions.PARALLELISM.key(),
1);
Integer checkpointInterval =
- options.getInt(Constants.CHECKPOINT_INTERVAL,
CHECKPOINT_INTERVAL_DEFAULT);
+ options.getInt(
+ EnvCommonOptions.CHECKPOINT_INTERVAL.key(),
CHECKPOINT_INTERVAL_DEFAULT);
String checkpointPath =
StringUtils.replacePattern(checkpointLocation, "sources/\\d+",
"sources-state");
Configuration configuration =
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/SeaTunnelMicroBatch.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/SeaTunnelMicroBatch.java
index 4d9a9b17f..c4d03d2bd 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/SeaTunnelMicroBatch.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/SeaTunnelMicroBatch.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.translation.spark.source.partition.micro;
+import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportCoordinate;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -74,7 +75,7 @@ public class SeaTunnelMicroBatch implements MicroBatchStream {
public InputPartition[] planInputPartitions(Offset start, Offset end) {
int checkpointInterval =
caseInsensitiveStringMap.getInt(
- Constants.CHECKPOINT_INTERVAL,
CHECKPOINT_INTERVAL_DEFAULT);
+ EnvCommonOptions.CHECKPOINT_INTERVAL.key(),
CHECKPOINT_INTERVAL_DEFAULT);
Configuration configuration =
SparkSession.getActiveSession().get().sparkContext().hadoopConfiguration();
String hdfsRoot =