This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5d58b9610 [Feature] Simplify checkpoint/state related configuration (#1755)
5d58b9610 is described below

commit 5d58b961086147ab91f734a0545f3e36f30f3847
Author: 1996fanrui <[email protected]>
AuthorDate: Fri Oct 7 18:18:45 2022 +0800

    [Feature] Simplify checkpoint/state related configuration (#1755)
---
 .../streampark/common/conf/ConfigConst.scala       |  35 -------
 .../apache/streampark/common/util/FlinkUtils.scala |  11 ++
 .../core/service/impl/ApplicationServiceImpl.java  |  13 +--
 .../flink/connector/redis/sink/RedisSink.scala     |  10 +-
 .../flink/core/FlinkStreamingInitializer.scala     |  98 ------------------
 .../flink/core/FlinkStreamingInitializer.scala     | 115 ---------------------
 .../flink/core/FlinkStreamingInitializer.scala     | 114 --------------------
 .../flink/core/FlinkStreamingInitializer.scala     | 113 --------------------
 8 files changed, 23 insertions(+), 486 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
index 32e4e17bb..89bf81cd0 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
@@ -114,41 +114,6 @@ object ConfigConst {
 
   val KEY_YARN_APP_QUEUE = "yarn.application.queue"
 
-  // --checkpoints--
-  val KEY_FLINK_CHECKPOINTS_ENABLE = "flink.checkpoints.enable"
-
-  val KEY_FLINK_CHECKPOINTS_UNALIGNED = "flink.checkpoints.unaligned"
-
-  val KEY_FLINK_CHECKPOINTS_INTERVAL = "flink.checkpoints.interval"
-
-  val KEY_FLINK_CHECKPOINTS_MODE = "flink.checkpoints.mode"
-
-  val KEY_FLINK_CHECKPOINTS_CLEANUP = "flink.checkpoints.cleanup"
-
-  val KEY_FLINK_CHECKPOINTS_TIMEOUT = "flink.checkpoints.timeout"
-
-  val KEY_FLINK_CHECKPOINTS_MAX_CONCURRENT = "flink.checkpoints.maxConcurrent"
-
-  val KEY_FLINK_CHECKPOINTS_MIN_PAUSEBETWEEN = 
"flink.checkpoints.minPauseBetween"
-
-  //---state---
-
-  val KEY_FLINK_STATE_SAVEPOINTS_DIR = "flink.state.savepoints.dir"
-
-  val KEY_FLINK_STATE_CHECKPOINTS_DIR = "flink.state.checkpoints.dir"
-
-  val KEY_FLINK_STATE_CHECKPOINT_STORAGE = "flink.state.checkpoint-storage"
-
-  val KEY_FLINK_STATE_BACKEND = "flink.state.backend.value"
-
-  val KEY_FLINK_STATE_BACKEND_ASYNC = "flink.state.backend.async"
-
-  val KEY_FLINK_STATE_BACKEND_INCREMENTAL = "flink.state.backend.incremental"
-
-  val KEY_FLINK_STATE_BACKEND_MEMORY = "flink.state.backend.memory"
-
-  val KEY_FLINK_STATE_ROCKSDB = "flink.state.backend.rocksdb"
-
   val KEY_EXECUTION_RUNTIME_MODE = "flink.execution.runtime-mode"
 
   // ---watermark---
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/FlinkUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/FlinkUtils.scala
index f4b9ac6f5..28fa04063 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/FlinkUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/FlinkUtils.scala
@@ -19,7 +19,12 @@ package org.apache.streampark.common.util
 import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.runtime.state.FunctionInitializationContext
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions
+import org.apache.flink.util.TimeUtils
+
 import java.io.File
+import java.time.Duration
+import java.util
 
 object FlinkUtils {
 
@@ -35,4 +40,10 @@ object FlinkUtils {
     }
   }
 
+  def isCheckpointEnabled(map: util.Map[String, String]): Boolean = {
+    val checkpointInterval: Duration = TimeUtils.parseDuration(
+      
map.getOrDefault(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL.key, 
"0ms"))
+    checkpointInterval.toMillis > 0
+  }
+
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index c0a39f8ee..ed94fedbe 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -33,6 +33,7 @@ import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.common.util.CompletableFutureUtils;
 import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.common.util.ExceptionUtils;
+import org.apache.streampark.common.util.FlinkUtils;
 import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.common.util.YarnUtils;
@@ -103,6 +104,7 @@ import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -1478,7 +1480,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
         // 1) dynamic parameters have the highest priority, read the dynamic 
parameters are set: -Dstate.savepoints.dir
         String savepointPath = FlinkSubmitter
             .extractDynamicOptionAsJava(application.getDynamicOptions())
-            .get(ConfigConst.KEY_FLINK_STATE_SAVEPOINTS_DIR().substring(6));
+            .get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
 
         // Application conf configuration has the second priority. If it is a 
streampark|flinksql type task,
         // see if Application conf is configured when the task is defined, if 
checkpoints are configured and enabled,
@@ -1488,9 +1490,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
                 ApplicationConfig applicationConfig = 
configService.getEffective(application.getId());
                 if (applicationConfig != null) {
                     Map<String, String> map = applicationConfig.readConfig();
-                    boolean checkpointEnable = 
Boolean.parseBoolean(map.get(ConfigConst.KEY_FLINK_CHECKPOINTS_ENABLE()));
-                    if (checkpointEnable) {
-                        savepointPath = 
map.get(ConfigConst.KEY_FLINK_STATE_SAVEPOINTS_DIR());
+                    if (FlinkUtils.isCheckpointEnabled(map)) {
+                        savepointPath = 
map.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
                     }
                 }
             }
@@ -1506,12 +1507,12 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
                         "the cluster has been deleted. Please contact the 
Admin.", application.getFlinkClusterId()));
                 Map<String, String> config = cluster.getFlinkConfig();
                 if (!config.isEmpty()) {
-                    savepointPath = 
config.get(ConfigConst.KEY_FLINK_STATE_SAVEPOINTS_DIR().substring(6));
+                    savepointPath = 
config.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
                 }
             } else {
                 // 3.2) At the yarn or k8s mode, then read the savepoint in 
flink-conf.yml in the bound flink
                 FlinkEnv flinkEnv = 
flinkEnvService.getById(application.getVersionId());
-                savepointPath = 
flinkEnv.convertFlinkYamlAsMap().get(ConfigConst.KEY_FLINK_STATE_SAVEPOINTS_DIR().substring(6));
+                savepointPath = 
flinkEnv.convertFlinkYamlAsMap().get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
             }
         }
 
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/sink/RedisSink.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/sink/RedisSink.scala
index f50054327..55ec10761 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/sink/RedisSink.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/sink/RedisSink.scala
@@ -17,8 +17,7 @@
 
 package org.apache.streampark.flink.connector.redis.sink
 
-import org.apache.streampark.common.conf.ConfigConst._
-import org.apache.streampark.common.util.Utils
+import org.apache.streampark.common.util.{FlinkUtils, Utils}
 import org.apache.streampark.flink.connector.redis.bean.RedisMapper
 import org.apache.streampark.flink.connector.redis.conf.RedisConfig
 import 
org.apache.streampark.flink.connector.redis.internal.{Redis2PCSinkFunction, 
RedisSinkFunction}
@@ -26,6 +25,7 @@ import org.apache.streampark.flink.connector.sink.Sink
 import org.apache.streampark.flink.core.scala.StreamingContext
 import org.apache.flink.streaming.api.CheckpointingMode
 import org.apache.flink.streaming.api.datastream.{DataStreamSink, DataStream 
=> JavaDataStream}
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions
 import org.apache.flink.streaming.api.scala.DataStream
 import 
org.apache.flink.streaming.connectors.redis.common.config.{FlinkJedisConfigBase,
 FlinkJedisPoolConfig, FlinkJedisSentinelConfig}
 
@@ -60,11 +60,11 @@ class RedisSink(@(transient@param) ctx: StreamingContext,
   val prop = ctx.parameter.getProperties
   Utils.copyProperties(property, prop)
   private val redisConfig: RedisConfig = new RedisConfig(prop)
-  val enableCheckpoint: Boolean = 
allProperties.getOrElse(KEY_FLINK_CHECKPOINTS_ENABLE, "false").toBoolean
+  val enableCheckpoint: Boolean = FlinkUtils.isCheckpointEnabled(allProperties)
 
   val cpMode: CheckpointingMode = Try(
-    CheckpointingMode.valueOf(allProperties.get(KEY_FLINK_CHECKPOINTS_MODE))
-  ).getOrElse(CheckpointingMode.AT_LEAST_ONCE)
+    
CheckpointingMode.valueOf(allProperties.get(ExecutionCheckpointingOptions.CHECKPOINTING_MODE.key()))
+  ).getOrElse(ExecutionCheckpointingOptions.CHECKPOINTING_MODE.defaultValue())
 
 
   lazy val config: FlinkJedisConfigBase = {
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 3acf60dba..305018f68 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -179,8 +179,6 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
     val executionMode = 
Try(RuntimeExecutionMode.valueOf(parameter.get(KEY_EXECUTION_RUNTIME_MODE))).getOrElse(RuntimeExecutionMode.STREAMING)
     localStreamEnv.setRuntimeMode(executionMode)
 
-    checkpoint()
-
     apiType match {
       case ApiType.java if javaStreamEnvConfFunc != null => 
javaStreamEnvConfFunc.configuration(localStreamEnv.getJavaEnv, parameter)
       case ApiType.scala if streamEnvConfFunc != null => 
streamEnvConfFunc(localStreamEnv, parameter)
@@ -189,100 +187,4 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
     localStreamEnv.getConfig.setGlobalJobParameters(parameter)
   }
 
-  private[this] def checkpoint(): Unit = {
-    // read from the configuration file whether to enable checkpoint, default 
is disabled.
-    val enableCheckpoint = 
Try(parameter.get(KEY_FLINK_CHECKPOINTS_ENABLE).toBoolean).getOrElse(false)
-    if (!enableCheckpoint) return
-
-    val cpInterval = 
Try(parameter.get(KEY_FLINK_CHECKPOINTS_INTERVAL).toInt).getOrElse(1000)
-    val cpMode = 
Try(CheckpointingMode.valueOf(parameter.get(KEY_FLINK_CHECKPOINTS_MODE))).getOrElse(CheckpointingMode.EXACTLY_ONCE)
-    val cpCleanUp = 
Try(ExternalizedCheckpointCleanup.valueOf(parameter.get(KEY_FLINK_CHECKPOINTS_CLEANUP))).getOrElse(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
-    val cpTimeout = 
Try(parameter.get(KEY_FLINK_CHECKPOINTS_TIMEOUT).toLong).getOrElse(CheckpointConfig.DEFAULT_TIMEOUT)
-    val cpMaxConcurrent = 
Try(parameter.get(KEY_FLINK_CHECKPOINTS_MAX_CONCURRENT).toInt).getOrElse(CheckpointConfig.DEFAULT_MAX_CONCURRENT_CHECKPOINTS)
-    val cpMinPauseBetween = 
Try(parameter.get(KEY_FLINK_CHECKPOINTS_MIN_PAUSEBETWEEN).toLong).getOrElse(CheckpointConfig.DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS)
-    val unaligned = 
Try(parameter.get(KEY_FLINK_CHECKPOINTS_UNALIGNED).toBoolean).getOrElse(false)
-
-    // default: enable checkpoint, interval 1s to start a checkpoint
-    streamEnvironment.enableCheckpointing(cpInterval)
-
-    val cpConfig = streamEnvironment.getCheckpointConfig
-
-    cpConfig.setCheckpointingMode(cpMode)
-    // default: min pause interval between checkpoints
-    cpConfig.setMinPauseBetweenCheckpoints(cpMinPauseBetween)
-    // default: checkpoints must complete within $cpTimeout minutes or be 
discarded
-    cpConfig.setCheckpointTimeout(cpTimeout)
-    // default: allow ? times checkpoint at the same time, default one.
-    cpConfig.setMaxConcurrentCheckpoints(cpMaxConcurrent)
-    // default: checkpoint data is retained when cancelled
-    cpConfig.enableExternalizedCheckpoints(cpCleanUp)
-    // unaligned checkpoint (flink 1.11.1 +=)
-    cpConfig.enableUnalignedCheckpoints(unaligned)
-
-    val stateBackend = 
XStateBackend.withName(parameter.get(KEY_FLINK_STATE_BACKEND, null))
-    if (stateBackend != null) {
-      val cpDir = if (stateBackend == XStateBackend.jobmanager) null else {
-        /**
-         * 
cpDir如果从配置文件中读取失败(key:flink.state.checkpoints.dir),则尝试从flink-conf.yml中读取..
-         */
-        parameter.get(KEY_FLINK_STATE_CHECKPOINTS_DIR, null) match {
-          // read from flink-conf.yaml
-          case null =>
-            logWarn("can't found flink.state.checkpoints.dir from 
properties,now try found from flink-conf.yaml")
-            // read `state.checkpoints.dir` key from flink-conf.yaml
-            val dir = defaultFlinkConf("state.checkpoints.dir")
-            require(dir != null, s"[StreamPark] can't found 
state.checkpoints.dir from default FlinkConf ")
-            logInfo(s"state.backend: state.checkpoints.dir found in 
flink-conf.yaml,$dir")
-            dir
-          case dir =>
-            logInfo(s"state.backend: flink.checkpoints.dir found in 
properties,$dir")
-            dir
-        }
-      }
-
-      stateBackend match {
-        /**
-         * The size of each individual state is by default limited to 5 MB. 
This value can be increased in the constructor of the MemoryStateBackend.
-         * Irrespective of the configured maximal state size, the state cannot 
be larger than the akka frame size (see <a 
href="https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html";>Configuration</a>).
-         * The aggregate state must fit into the JobManager memory.
-         */
-        case XStateBackend.jobmanager =>
-          logInfo(s"state.backend Type: jobmanager...")
-          //default 5 MB,cannot be larger than the akka frame size
-          val maxMemorySize = 
Try(parameter.get(KEY_FLINK_STATE_BACKEND_MEMORY).toInt).getOrElse(MemoryStateBackend.DEFAULT_MAX_STATE_SIZE)
-          val async = 
Try(parameter.get(KEY_FLINK_STATE_BACKEND_ASYNC).toBoolean).getOrElse(false)
-          val ms = new MemoryStateBackend(maxMemorySize, async)
-          streamEnvironment.setStateBackend(ms)
-        case XStateBackend.filesystem =>
-          logInfo(s"state.backend Type: filesystem...")
-          val async = 
Try(parameter.get(KEY_FLINK_STATE_BACKEND_ASYNC).toBoolean).getOrElse(false)
-          val fs = new FsStateBackend(cpDir, async)
-          streamEnvironment.setStateBackend(fs)
-        case XStateBackend.rocksdb =>
-          logInfo("state.backend Type: rocksdb...")
-          // default enable incremental checkpoint
-          val incremental = 
Try(parameter.get(KEY_FLINK_STATE_BACKEND_INCREMENTAL).toBoolean).getOrElse(true)
-          val rs = new RocksDBStateBackend(cpDir, incremental)
-          /**
-           * @see <a 
href="https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#rocksdb-state-backend"/>Flink
 Rocksdb Config</a>
-           */
-          val map = new JavaHashMap[String, Object]()
-          val skipKey = List(KEY_FLINK_STATE_BACKEND_ASYNC, 
KEY_FLINK_STATE_BACKEND_INCREMENTAL, KEY_FLINK_STATE_BACKEND_MEMORY, 
KEY_FLINK_STATE_ROCKSDB)
-          
parameter.getProperties.filter(_._1.startsWith(KEY_FLINK_STATE_ROCKSDB)).filterNot(x
 => skipKey.contains(x._1)).foreach(x => map.put(x._1, x._2))
-          if (map.nonEmpty) {
-            val optionsFactory = new DefaultConfigurableOptionsFactory
-            val config = new Configuration()
-            val confData = classOf[Configuration].getDeclaredField("confData")
-            confData.setAccessible(true)
-            confData.set(map, config)
-            optionsFactory.configure(config)
-            rs.setRocksDBOptions(optionsFactory)
-          }
-          streamEnvironment.setStateBackend(rs)
-        case _ =>
-          logError("usage error!!! state.backend must be 
(jobmanager|filesystem|rocksdb)")
-      }
-    }
-  }
-
 }
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 46050334d..14aadd781 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -173,8 +173,6 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
     val executionMode = 
Try(RuntimeExecutionMode.valueOf(parameter.get(KEY_EXECUTION_RUNTIME_MODE))).getOrElse(RuntimeExecutionMode.STREAMING)
     localStreamEnv.setRuntimeMode(executionMode)
 
-    checkpoint()
-
     apiType match {
       case ApiType.java if javaStreamEnvConfFunc != null => 
javaStreamEnvConfFunc.configuration(localStreamEnv.getJavaEnv, parameter)
       case ApiType.scala if streamEnvConfFunc != null => 
streamEnvConfFunc(localStreamEnv, parameter)
@@ -183,117 +181,4 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
     localStreamEnv.getConfig.setGlobalJobParameters(parameter)
   }
 
-  private[this] def checkpoint(): Unit = {
-    // read from the configuration file whether to enable checkpoint, default 
is disabled.
-    val enableCheckpoint = 
Try(parameter.get(KEY_FLINK_CHECKPOINTS_ENABLE).toBoolean).getOrElse(false)
-    if (!enableCheckpoint) return
-
-    val cpInterval = 
Try(parameter.get(KEY_FLINK_CHECKPOINTS_INTERVAL).toInt).getOrElse(1000)
-    val cpMode = 
Try(CheckpointingMode.valueOf(parameter.get(KEY_FLINK_CHECKPOINTS_MODE))).getOrElse(CheckpointingMode.EXACTLY_ONCE)
-    val cpCleanUp = 
Try(ExternalizedCheckpointCleanup.valueOf(parameter.get(KEY_FLINK_CHECKPOINTS_CLEANUP))).getOrElse(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
-    val cpTimeout = 
Try(parameter.get(KEY_FLINK_CHECKPOINTS_TIMEOUT).toLong).getOrElse(CheckpointConfig.DEFAULT_TIMEOUT)
-    val cpMaxConcurrent = 
Try(parameter.get(KEY_FLINK_CHECKPOINTS_MAX_CONCURRENT).toInt).getOrElse(CheckpointConfig.DEFAULT_MAX_CONCURRENT_CHECKPOINTS)
-    val cpMinPauseBetween = 
Try(parameter.get(KEY_FLINK_CHECKPOINTS_MIN_PAUSEBETWEEN).toLong).getOrElse(CheckpointConfig.DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS)
-    val unaligned = 
Try(parameter.get(KEY_FLINK_CHECKPOINTS_UNALIGNED).toBoolean).getOrElse(false)
-
-    // default: enable checkpoint, interval 1s to start a checkpoint
-    streamEnvironment.enableCheckpointing(cpInterval)
-
-    val cpConfig = streamEnvironment.getCheckpointConfig
-
-    cpConfig.setCheckpointingMode(cpMode)
-    // default: min pause interval between checkpoints
-    cpConfig.setMinPauseBetweenCheckpoints(cpMinPauseBetween)
-    // default: checkpoints must complete within $cpTimeout minutes or be 
discarded
-    cpConfig.setCheckpointTimeout(cpTimeout)
-    // default: allow ? times checkpoint at the same time, default one.
-    cpConfig.setMaxConcurrentCheckpoints(cpMaxConcurrent)
-    // default: checkpoint data is retained when cancelled
-    cpConfig.enableExternalizedCheckpoints(cpCleanUp)
-    // unaligned checkpoint (flink 1.11.1 +=)
-    cpConfig.enableUnalignedCheckpoints(unaligned)
-
-    val stateBackend = 
XStateBackend.withName(parameter.get(KEY_FLINK_STATE_BACKEND, null))
-    if (stateBackend != null) {
-      require(
-        stateBackend == XStateBackend.hashmap || stateBackend == 
XStateBackend.rocksdb,
-        "state.backend must be [hashmap|rocksdb] in flink 1.13 and above"
-      )
-      val storage = {
-        val storage = parameter.get(KEY_FLINK_STATE_CHECKPOINT_STORAGE, null) 
match {
-          // read from flink-conf.yaml
-          case null =>
-            logWarn("can't found flink.state.checkpoint-storage from 
properties,now try found from flink-conf.yaml")
-            val storage = defaultFlinkConf("state.checkpoint-storage")
-            require(storage != null, s"[StreamPark] can't found 
state.checkpoint-storage from default FlinkConf ")
-            logInfo(s"state.checkpoint-storage: state.checkpoint-storage found 
in flink-conf.yaml,$storage")
-            storage
-          case storage =>
-            logInfo(s"state.checkpoint-storage: flink.checkpoints.dir found in 
properties,$storage")
-            storage
-        }
-
-        Try(CheckpointStorage.withName(storage)) match {
-          case Success(value) => value
-          case Failure(e) => throw new IllegalArgumentException(e)
-        }
-      }
-
-      lazy val cpDir = parameter.get(KEY_FLINK_STATE_CHECKPOINTS_DIR, null) 
match {
-        // read from flink-conf.yaml
-        case null =>
-          logWarn("can't found flink.state.checkpoints.dir from properties,now 
try found from flink-conf.yaml")
-          // read `state.checkpoints.dir` key from flink-conf.yaml
-          val dir = defaultFlinkConf("state.checkpoints.dir")
-          require(dir != null, s"[StreamPark] can't found 
state.checkpoints.dir from Default FlinkConf ")
-          logInfo(s"state.backend: state.checkpoints.dir found in 
flink-conf.yaml,$dir")
-          dir
-        case dir =>
-          logInfo(s"state.backend: flink.checkpoints.dir found in 
properties,$dir")
-          dir
-      }
-
-      stateBackend match {
-        case XStateBackend.hashmap =>
-          logInfo("state.backend: hashmap...")
-          streamEnvironment.setStateBackend(new HashMapStateBackend())
-          storage match {
-            case CheckpointStorage.jobmanager =>
-              logInfo("state.checkpoint-storage: jobmanager...")
-              val maxMemorySize = 
Try(parameter.get(KEY_FLINK_STATE_BACKEND_MEMORY).toInt).getOrElse(JobManagerCheckpointStorage.DEFAULT_MAX_STATE_SIZE)
-              val jobManagerCheckpointStorage = new 
JobManagerCheckpointStorage(maxMemorySize)
-              cpConfig.setCheckpointStorage(jobManagerCheckpointStorage)
-            case CheckpointStorage.filesystem =>
-              logInfo("state.checkpoint-storage: filesystem...")
-              cpConfig.setCheckpointStorage(new 
FileSystemCheckpointStorage(cpDir))
-          }
-        case XStateBackend.rocksdb =>
-          logInfo("state.backend: rocksdb...")
-          val rock = new EmbeddedRocksDBStateBackend()
-          val map = new JavaHashMap[String, Object]()
-          val skipKey = List(KEY_FLINK_STATE_BACKEND_ASYNC, 
KEY_FLINK_STATE_BACKEND_INCREMENTAL, KEY_FLINK_STATE_BACKEND_MEMORY, 
KEY_FLINK_STATE_ROCKSDB)
-          
parameter.getProperties.filter(_._1.startsWith(KEY_FLINK_STATE_ROCKSDB)).filterNot(x
 => skipKey.contains(x._1)).foreach(x => map.put(x._1, x._2))
-          if (map.nonEmpty) {
-            val optionsFactory = new DefaultConfigurableOptionsFactory
-            val config = new Configuration()
-            val confData = classOf[Configuration].getDeclaredField("confData")
-            confData.setAccessible(true)
-            confData.set(map, config)
-            optionsFactory.configure(config)
-            rock.setRocksDBOptions(optionsFactory)
-          }
-          streamEnvironment.setStateBackend(rock)
-          storage match {
-            case CheckpointStorage.filesystem =>
-              logInfo("state.checkpoint-storage: filesystem...")
-              cpConfig.setCheckpointStorage(new 
FileSystemCheckpointStorage(cpDir))
-            case _ =>
-              throw new IllegalArgumentException("[StreamPark] state.backend 
is  rocksdb, state.checkpoint-storage must be filesystem...")
-          }
-      }
-    }
-
-  }
-
-
 }
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 6dce51624..8991f1d77 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -173,8 +173,6 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
     val executionMode = 
Try(RuntimeExecutionMode.valueOf(parameter.get(KEY_EXECUTION_RUNTIME_MODE))).getOrElse(RuntimeExecutionMode.STREAMING)
     localStreamEnv.setRuntimeMode(executionMode)
 
-    checkpoint()
-
     apiType match {
       case ApiType.java if javaStreamEnvConfFunc != null => 
javaStreamEnvConfFunc.configuration(localStreamEnv.getJavaEnv, parameter)
       case ApiType.scala if streamEnvConfFunc != null => 
streamEnvConfFunc(localStreamEnv, parameter)
@@ -183,116 +181,4 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
     localStreamEnv.getConfig.setGlobalJobParameters(parameter)
   }
 
-  private[this] def checkpoint(): Unit = {
-    // read from the configuration file whether to enable checkpoint, default 
is disabled.
-    val enableCheckpoint = 
Try(parameter.get(KEY_FLINK_CHECKPOINTS_ENABLE).toBoolean).getOrElse(false)
-    if (!enableCheckpoint) return
-
-    val cpInterval = 
Try(parameter.get(KEY_FLINK_CHECKPOINTS_INTERVAL).toInt).getOrElse(1000)
-    val cpMode = 
Try(CheckpointingMode.valueOf(parameter.get(KEY_FLINK_CHECKPOINTS_MODE))).getOrElse(CheckpointingMode.EXACTLY_ONCE)
-    val cpCleanUp = 
Try(ExternalizedCheckpointCleanup.valueOf(parameter.get(KEY_FLINK_CHECKPOINTS_CLEANUP))).getOrElse(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
-    val cpTimeout = 
Try(parameter.get(KEY_FLINK_CHECKPOINTS_TIMEOUT).toLong).getOrElse(CheckpointConfig.DEFAULT_TIMEOUT)
-    val cpMaxConcurrent = 
Try(parameter.get(KEY_FLINK_CHECKPOINTS_MAX_CONCURRENT).toInt).getOrElse(CheckpointConfig.DEFAULT_MAX_CONCURRENT_CHECKPOINTS)
-    val cpMinPauseBetween = 
Try(parameter.get(KEY_FLINK_CHECKPOINTS_MIN_PAUSEBETWEEN).toLong).getOrElse(CheckpointConfig.DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS)
-    val unaligned = 
Try(parameter.get(KEY_FLINK_CHECKPOINTS_UNALIGNED).toBoolean).getOrElse(false)
-
-    // default: enable checkpoint, interval 1s to start a checkpoint
-    streamEnvironment.enableCheckpointing(cpInterval)
-
-    val cpConfig = streamEnvironment.getCheckpointConfig
-
-    cpConfig.setCheckpointingMode(cpMode)
-    // default, min pause interval between checkpoints
-    cpConfig.setMinPauseBetweenCheckpoints(cpMinPauseBetween)
-    // default: checkpoints must complete within $cpTimeout minutes or be 
discarded
-    cpConfig.setCheckpointTimeout(cpTimeout)
-    // default: allow ? times checkpoint at the same time, default one.
-    cpConfig.setMaxConcurrentCheckpoints(cpMaxConcurrent)
-    // default: checkpoint data is retained when cancelled
-    cpConfig.enableExternalizedCheckpoints(cpCleanUp)
-    // unaligned checkpoint (flink 1.11.1 +=)
-    cpConfig.enableUnalignedCheckpoints(unaligned)
-
-    val stateBackend = 
XStateBackend.withName(parameter.get(KEY_FLINK_STATE_BACKEND, null))
-    if (stateBackend != null) {
-      require(
-        stateBackend == XStateBackend.hashmap || stateBackend == 
XStateBackend.rocksdb,
-        "state.backend must be [hashmap|rocksdb] in flink 1.13 and above"
-      )
-      val storage = {
-        val storage = parameter.get(KEY_FLINK_STATE_CHECKPOINT_STORAGE, null) 
match {
-          // read from flink-conf.yaml
-          case null =>
-            logWarn("can't found flink.state.checkpoint-storage from 
properties,now try found from flink-conf.yaml")
-            val storage = defaultFlinkConf("state.checkpoint-storage")
-            require(storage != null, s"[StreamPark] can't found 
state.checkpoint-storage from default FlinkConf ")
-            logInfo(s"state.checkpoint-storage: state.checkpoint-storage found 
in flink-conf.yaml,$storage")
-            storage
-          case storage =>
-            logInfo(s"state.checkpoint-storage: flink.checkpoints.dir found in 
properties,$storage")
-            storage
-        }
-
-        Try(CheckpointStorage.withName(storage)) match {
-          case Success(value) => value
-          case Failure(e) => throw new IllegalArgumentException(e)
-        }
-      }
-
-      lazy val cpDir = parameter.get(KEY_FLINK_STATE_CHECKPOINTS_DIR, null) 
match {
-        // read from flink-conf.yaml
-        case null =>
-          logWarn("can't found flink.state.checkpoints.dir from properties,now 
try found from flink-conf.yaml")
-          // read `state.checkpoints.dir` key from flink-conf.yaml
-          val dir = defaultFlinkConf("state.checkpoints.dir")
-          require(dir != null, s"[StreamPark] can't found 
state.checkpoints.dir from Default FlinkConf ")
-          logInfo(s"state.backend: state.checkpoints.dir found in 
flink-conf.yaml,$dir")
-          dir
-        case dir =>
-          logInfo(s"state.backend: flink.checkpoints.dir found in 
properties,$dir")
-          dir
-      }
-
-      stateBackend match {
-        case XStateBackend.hashmap =>
-          logInfo("state.backend: hashmap...")
-          streamEnvironment.setStateBackend(new HashMapStateBackend())
-          storage match {
-            case CheckpointStorage.jobmanager =>
-              logInfo("state.checkpoint-storage: jobmanager...")
-              val maxMemorySize = 
Try(parameter.get(KEY_FLINK_STATE_BACKEND_MEMORY).toInt).getOrElse(JobManagerCheckpointStorage.DEFAULT_MAX_STATE_SIZE)
-              val jobManagerCheckpointStorage = new 
JobManagerCheckpointStorage(maxMemorySize)
-              cpConfig.setCheckpointStorage(jobManagerCheckpointStorage)
-            case CheckpointStorage.filesystem =>
-              logInfo("state.checkpoint-storage: filesystem...")
-              cpConfig.setCheckpointStorage(new 
FileSystemCheckpointStorage(cpDir))
-          }
-        case XStateBackend.rocksdb =>
-          logInfo("state.backend: rocksdb...")
-          val rock = new EmbeddedRocksDBStateBackend()
-          val map = new JavaHashMap[String, Object]()
-          val skipKey = List(KEY_FLINK_STATE_BACKEND_ASYNC, 
KEY_FLINK_STATE_BACKEND_INCREMENTAL, KEY_FLINK_STATE_BACKEND_MEMORY, 
KEY_FLINK_STATE_ROCKSDB)
-          
parameter.getProperties.filter(_._1.startsWith(KEY_FLINK_STATE_ROCKSDB)).filterNot(x
 => skipKey.contains(x._1)).foreach(x => map.put(x._1, x._2))
-          if (map.nonEmpty) {
-            val optionsFactory = new DefaultConfigurableOptionsFactory
-            val config = new Configuration()
-            val confData = classOf[Configuration].getDeclaredField("confData")
-            confData.setAccessible(true)
-            confData.set(map, config)
-            optionsFactory.configure(config)
-            rock.setRocksDBOptions(optionsFactory)
-          }
-          streamEnvironment.setStateBackend(rock)
-          storage match {
-            case CheckpointStorage.filesystem =>
-              logInfo("state.checkpoint-storage: filesystem...")
-              cpConfig.setCheckpointStorage(new 
FileSystemCheckpointStorage(cpDir))
-            case _ =>
-              throw new IllegalArgumentException("[StreamPark] state.backend 
is  rocksdb, state.checkpoint-storage must be filesystem...")
-          }
-      }
-    }
-
-  }
-
 }
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 86706267d..db8bed611 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -172,8 +172,6 @@ private[flink] class FlinkStreamingInitializer(args: 
Array[String], apiType: Api
     val executionMode = 
Try(RuntimeExecutionMode.valueOf(parameter.get(KEY_EXECUTION_RUNTIME_MODE))).getOrElse(RuntimeExecutionMode.STREAMING)
     localStreamEnv.setRuntimeMode(executionMode)
 
-    checkpoint()
-
     apiType match {
       case ApiType.java if javaStreamEnvConfFunc != null => 
javaStreamEnvConfFunc.configuration(localStreamEnv.getJavaEnv, parameter)
       case ApiType.scala if streamEnvConfFunc != null => 
streamEnvConfFunc(localStreamEnv, parameter)
@@ -182,115 +180,4 @@ private[flink] class FlinkStreamingInitializer(args: 
Array[String], apiType: Api
     localStreamEnv.getConfig.setGlobalJobParameters(parameter)
   }
 
-  private[this] def checkpoint(): Unit = {
-    // read from the configuration file whether to enable checkpoint, default 
is disabled.
-    val enableCheckpoint = 
Try(parameter.get(KEY_FLINK_CHECKPOINTS_ENABLE).toBoolean).getOrElse(false)
-    if(!enableCheckpoint) return
-
-    val cpInterval = 
Try(parameter.get(KEY_FLINK_CHECKPOINTS_INTERVAL).toInt).getOrElse(1000)
-    val cpMode = 
Try(CheckpointingMode.valueOf(parameter.get(KEY_FLINK_CHECKPOINTS_MODE))).getOrElse(CheckpointingMode.EXACTLY_ONCE)
-    val cpCleanUp = 
Try(ExternalizedCheckpointCleanup.valueOf(parameter.get(KEY_FLINK_CHECKPOINTS_CLEANUP))).getOrElse(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
-    val cpTimeout = 
Try(parameter.get(KEY_FLINK_CHECKPOINTS_TIMEOUT).toLong).getOrElse(CheckpointConfig.DEFAULT_TIMEOUT)
-    val cpMaxConcurrent = 
Try(parameter.get(KEY_FLINK_CHECKPOINTS_MAX_CONCURRENT).toInt).getOrElse(CheckpointConfig.DEFAULT_MAX_CONCURRENT_CHECKPOINTS)
-    val cpMinPauseBetween = 
Try(parameter.get(KEY_FLINK_CHECKPOINTS_MIN_PAUSEBETWEEN).toLong).getOrElse(CheckpointConfig.DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS)
-    val unaligned = 
Try(parameter.get(KEY_FLINK_CHECKPOINTS_UNALIGNED).toBoolean).getOrElse(false)
-
-    // default: enable checkpoint, interval 1s to start a checkpoint
-    streamEnvironment.enableCheckpointing(cpInterval)
-
-    val cpConfig = streamEnvironment.getCheckpointConfig
-
-    cpConfig.setCheckpointingMode(cpMode)
-    // default: min pause interval between checkpoints
-    cpConfig.setMinPauseBetweenCheckpoints(cpMinPauseBetween)
-    // default: checkpoints must complete within $cpTimeout minutes or be 
discarded
-    cpConfig.setCheckpointTimeout(cpTimeout)
-    // default: allow ? times checkpoint at the same time, default one.
-    cpConfig.setMaxConcurrentCheckpoints(cpMaxConcurrent)
-    // default: checkpoint data is retained when cancelled
-    cpConfig.enableExternalizedCheckpoints(cpCleanUp)
-    // unaligned checkpoint (flink 1.11.1 +=)
-    cpConfig.enableUnalignedCheckpoints(unaligned)
-
-    val stateBackend = 
XStateBackend.withName(parameter.get(KEY_FLINK_STATE_BACKEND, null))
-    if (stateBackend != null) {
-      require(
-        stateBackend == XStateBackend.hashmap || stateBackend == 
XStateBackend.rocksdb,
-        "state.backend must be [hashmap|rocksdb] in flink 1.13 and above"
-      )
-      val storage = {
-        val storage = parameter.get(KEY_FLINK_STATE_CHECKPOINT_STORAGE, null) 
match {
-          // read from flink-conf.yaml
-          case null =>
-            logWarn("can't found flink.state.checkpoint-storage from 
properties,now try found from flink-conf.yaml")
-            val storage = defaultFlinkConf("state.checkpoint-storage")
-            require(storage != null, s"[StreamPark] can't found 
state.checkpoint-storage from default FlinkConf ")
-            logInfo(s"state.checkpoint-storage: state.checkpoint-storage found 
in flink-conf.yaml,$storage")
-            storage
-          case storage =>
-            logInfo(s"state.checkpoint-storage: flink.checkpoints.dir found in 
properties,$storage")
-            storage
-        }
-
-        Try(CheckpointStorage.withName(storage)) match {
-          case Success(value) => value
-          case Failure(e) => throw new IllegalArgumentException(e)
-        }
-      }
-
-      lazy val cpDir = parameter.get(KEY_FLINK_STATE_CHECKPOINTS_DIR, null) 
match {
-        // read from flink-conf.yaml
-        case null =>
-          logWarn("can't found flink.state.checkpoints.dir from properties,now 
try found from flink-conf.yaml")
-          // read `state.checkpoints.dir` key from flink-conf.yaml
-          val dir = defaultFlinkConf("state.checkpoints.dir")
-          require(dir != null, s"[StreamPark] can't found 
state.checkpoints.dir from Default FlinkConf ")
-          logInfo(s"state.backend: state.checkpoints.dir found in 
flink-conf.yaml,$dir")
-          dir
-        case dir =>
-          logInfo(s"state.backend: flink.checkpoints.dir found in 
properties,$dir")
-          dir
-      }
-
-      stateBackend match {
-        case XStateBackend.hashmap =>
-          logInfo("state.backend: hashmap...")
-          streamEnvironment.setStateBackend(new HashMapStateBackend())
-          storage match {
-            case CheckpointStorage.jobmanager =>
-              logInfo("state.checkpoint-storage: jobmanager...")
-              val maxMemorySize = 
Try(parameter.get(KEY_FLINK_STATE_BACKEND_MEMORY).toInt).getOrElse(JobManagerCheckpointStorage.DEFAULT_MAX_STATE_SIZE)
-              val jobManagerCheckpointStorage = new 
JobManagerCheckpointStorage(maxMemorySize)
-              cpConfig.setCheckpointStorage(jobManagerCheckpointStorage)
-            case CheckpointStorage.filesystem =>
-              logInfo("state.checkpoint-storage: filesystem...")
-              cpConfig.setCheckpointStorage(new 
FileSystemCheckpointStorage(cpDir))
-          }
-        case XStateBackend.rocksdb =>
-          logInfo("state.backend: rocksdb...")
-          val rock = new EmbeddedRocksDBStateBackend()
-          val map = new JavaHashMap[String, Object]()
-          val skipKey = List(KEY_FLINK_STATE_BACKEND_ASYNC, 
KEY_FLINK_STATE_BACKEND_INCREMENTAL, KEY_FLINK_STATE_BACKEND_MEMORY, 
KEY_FLINK_STATE_ROCKSDB)
-          
parameter.getProperties.filter(_._1.startsWith(KEY_FLINK_STATE_ROCKSDB)).filterNot(x
 => skipKey.contains(x._1)).foreach(x => map.put(x._1, x._2))
-          if (map.nonEmpty) {
-            val optionsFactory = new DefaultConfigurableOptionsFactory
-            val config = new Configuration()
-            val confData = classOf[Configuration].getDeclaredField("confData")
-            confData.setAccessible(true)
-            confData.set(map, config)
-            optionsFactory.configure(config)
-            rock.setRocksDBOptions(optionsFactory)
-          }
-          streamEnvironment.setStateBackend(rock)
-          storage match {
-            case CheckpointStorage.filesystem =>
-              logInfo("state.checkpoint-storage: filesystem...")
-              cpConfig.setCheckpointStorage(new 
FileSystemCheckpointStorage(cpDir))
-            case _ =>
-              throw new IllegalArgumentException("[StreamPark] state.backend 
is  rocksdb, state.checkpoint-storage must be filesystem...")
-          }
-      }
-    }
-  }
-
 }


Reply via email to