This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new cf0abb9b4 [Improve][Core] Unified the checkpoint setting key of Flink (#4296)
cf0abb9b4 is described below

commit cf0abb9b4fa9f9d0ab8d667e6a22f04d534ad7c2
Author: Hisoka <[email protected]>
AuthorDate: Tue Mar 14 10:51:27 2023 +0800

    [Improve][Core] Unified the checkpoint setting key of Flink (#4296)
    
    * [Doc] [Connector-V2] Improve MySQL-CDC Doc For connect.timeout.ms field
    
    * [Core] [Improve] Unify the checkpoint setting key of Flink
    
    * Revert "[Doc] [Connector-V2] Improve MySQL-CDC Doc For connect.timeout.ms field"
    
    This reverts commit cd95635ea482a2f0f785990f35f2f8cc47c96365.
    
    * [Core] [Improve] Unify the checkpoint setting key of Flink
    
    * [Core] [Improve] Unify the checkpoint setting key of Flink
---
 .../src/main/java/org/apache/seatunnel/common/Constants.java |  4 ----
 .../starter/flink/execution/FlinkRuntimeEnvironment.java     | 12 ++++++++++--
 .../seatunnel/core/starter/flink/utils/ConfigKeyName.java    |  3 ++-
 .../starter/flink/execution/FlinkRuntimeEnvironment.java     | 12 ++++++++++--
 .../seatunnel/core/starter/flink/utils/ConfigKeyName.java    |  2 +-
 .../org/apache/seatunnel/engine/server/NodeExtension.java    |  2 +-
 .../apache/seatunnel/engine/server/NodeExtensionCommon.java  |  6 +++---
 .../translation/spark/source/SeaTunnelSourceSupport.java     |  4 +++-
 .../spark/source/partition/micro/SeaTunnelMicroBatch.java    |  3 ++-
 9 files changed, 32 insertions(+), 16 deletions(-)

diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
index c5e04d5fa..602143e10 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
@@ -18,8 +18,6 @@
 package org.apache.seatunnel.common;
 
 public final class Constants {
-    public static final String ROW_ROOT = "__root__";
-    public static final String ROW_TMP = "__tmp__";
 
     public static final String LOGO = "SeaTunnel";
 
@@ -39,8 +37,6 @@ public final class Constants {
 
     public static final String HDFS_USER = "hdfs.user";
 
-    public static final String CHECKPOINT_INTERVAL = "checkpoint.interval";
-
     public static final String CHECKPOINT_ID = "checkpoint.id";
 
     public static final String UUID = "uuid";
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index e180ebb74..7fb75064a 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.core.starter.flink.execution;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.JobMode;
@@ -235,9 +236,16 @@ public class FlinkRuntimeEnvironment implements RuntimeEnvironment {
     }
 
     private void setCheckpoint() {
-        if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) {
+
+        long interval = 0;
+        if (config.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
+            interval = config.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key());
+        } else if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) {
+            interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL);
+        }
+
+        if (interval > 0) {
             CheckpointConfig checkpointConfig = environment.getCheckpointConfig();
-            long interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL);
             environment.enableCheckpointing(interval);
 
             if (config.hasPath(ConfigKeyName.CHECKPOINT_MODE)) {
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
index 78d444a57..699d7e379 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
@@ -27,7 +27,8 @@ public class ConfigKeyName {
     public static final String BUFFER_TIMEOUT_MILLIS = "execution.buffer.timeout";
     public static final String PARALLELISM = "execution.parallelism";
     public static final String MAX_PARALLELISM = "execution.max-parallelism";
-    public static final String CHECKPOINT_INTERVAL = "execution.checkpoint.interval";
+
+    @Deprecated public static final String CHECKPOINT_INTERVAL = "execution.checkpoint.interval";
     public static final String CHECKPOINT_MODE = "execution.checkpoint.mode";
     public static final String CHECKPOINT_TIMEOUT = "execution.checkpoint.timeout";
     public static final String CHECKPOINT_DATA_URI = "execution.checkpoint.data-uri";
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index 951312b74..4b5bef07c 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.core.starter.flink.execution;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.JobMode;
@@ -235,9 +236,16 @@ public class FlinkRuntimeEnvironment implements RuntimeEnvironment {
     }
 
     private void setCheckpoint() {
-        if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) {
+
+        long interval = 0;
+        if (config.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
+            interval = config.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key());
+        } else if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) {
+            interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL);
+        }
+
+        if (interval > 0) {
             CheckpointConfig checkpointConfig = environment.getCheckpointConfig();
-            long interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL);
             environment.enableCheckpointing(interval);
 
             if (config.hasPath(ConfigKeyName.CHECKPOINT_MODE)) {
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
index 78d444a57..cc8229f26 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
@@ -27,7 +27,7 @@ public class ConfigKeyName {
     public static final String BUFFER_TIMEOUT_MILLIS = "execution.buffer.timeout";
     public static final String PARALLELISM = "execution.parallelism";
     public static final String MAX_PARALLELISM = "execution.max-parallelism";
-    public static final String CHECKPOINT_INTERVAL = "execution.checkpoint.interval";
+    @Deprecated public static final String CHECKPOINT_INTERVAL = "execution.checkpoint.interval";
     public static final String CHECKPOINT_MODE = "execution.checkpoint.mode";
     public static final String CHECKPOINT_TIMEOUT = "execution.checkpoint.timeout";
     public static final String CHECKPOINT_DATA_URI = "execution.checkpoint.data-uri";
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
index 2c62314e2..d4137955c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
@@ -85,6 +85,6 @@ public class NodeExtension extends DefaultNodeExtension {
 
     @Override
     public void printNodeInfo() {
-        extCommon.printNodeInfo(systemLogger, "");
+        extCommon.printNodeInfo(systemLogger);
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java
index 8cf0a3470..8551f9381 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java
@@ -58,15 +58,15 @@ class NodeExtensionCommon {
         // TODO This is where cluster state changes are handled
     }
 
-    void printNodeInfo(ILogger log, String addToProductName) {
-        log.info(imdgVersionMessage());
+    void printNodeInfo(ILogger log) {
+        log.info(imgVersionMessage());
         log.info(clusterNameMessage());
         log.fine(serializationVersionMessage());
         log.info('\n' + Constants.ST_LOGO);
         log.info(Constants.COPYRIGHT_LINE);
     }
 
-    private String imdgVersionMessage() {
+    private String imgVersionMessage() {
         String build = node.getBuildInfo().getBuild();
         String revision = node.getBuildInfo().getRevision();
         if (!revision.isEmpty()) {
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
index 4e2c3e111..604ba203c 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.translation.spark.source;
 
 import org.apache.seatunnel.api.common.CommonOptions;
+import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.Constants;
@@ -74,7 +75,8 @@ public class SeaTunnelSourceSupport
         SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource = getSeaTunnelSource(options);
         Integer parallelism = options.getInt(CommonOptions.PARALLELISM.key(), 1);
         Integer checkpointInterval =
-                options.getInt(Constants.CHECKPOINT_INTERVAL, CHECKPOINT_INTERVAL_DEFAULT);
+                options.getInt(
+                        EnvCommonOptions.CHECKPOINT_INTERVAL.key(), CHECKPOINT_INTERVAL_DEFAULT);
         String checkpointPath =
                 StringUtils.replacePattern(checkpointLocation, "sources/\\d+", "sources-state");
         Configuration configuration =
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/SeaTunnelMicroBatch.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/SeaTunnelMicroBatch.java
index 4d9a9b17f..c4d03d2bd 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/SeaTunnelMicroBatch.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/SeaTunnelMicroBatch.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.translation.spark.source.partition.micro;
 
+import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SupportCoordinate;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -74,7 +75,7 @@ public class SeaTunnelMicroBatch implements MicroBatchStream {
     public InputPartition[] planInputPartitions(Offset start, Offset end) {
         int checkpointInterval =
                 caseInsensitiveStringMap.getInt(
-                        Constants.CHECKPOINT_INTERVAL, CHECKPOINT_INTERVAL_DEFAULT);
+                        EnvCommonOptions.CHECKPOINT_INTERVAL.key(), CHECKPOINT_INTERVAL_DEFAULT);
         Configuration configuration =
                 SparkSession.getActiveSession().get().sparkContext().hadoopConfiguration();
         String hdfsRoot =

Reply via email to