This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 5d58b9610 [Feature] Simplify checkpoint/state related configuration (#1755)
5d58b9610 is described below
commit 5d58b961086147ab91f734a0545f3e36f30f3847
Author: 1996fanrui <[email protected]>
AuthorDate: Fri Oct 7 18:18:45 2022 +0800
[Feature] Simplify checkpoint/state related configuration (#1755)
---
.../streampark/common/conf/ConfigConst.scala | 35 -------
.../apache/streampark/common/util/FlinkUtils.scala | 11 ++
.../core/service/impl/ApplicationServiceImpl.java | 13 +--
.../flink/connector/redis/sink/RedisSink.scala | 10 +-
.../flink/core/FlinkStreamingInitializer.scala | 98 ------------------
.../flink/core/FlinkStreamingInitializer.scala | 115 ---------------------
.../flink/core/FlinkStreamingInitializer.scala | 114 --------------------
.../flink/core/FlinkStreamingInitializer.scala | 113 --------------------
8 files changed, 23 insertions(+), 486 deletions(-)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
index 32e4e17bb..89bf81cd0 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
@@ -114,41 +114,6 @@ object ConfigConst {
val KEY_YARN_APP_QUEUE = "yarn.application.queue"
- // --checkpoints--
- val KEY_FLINK_CHECKPOINTS_ENABLE = "flink.checkpoints.enable"
-
- val KEY_FLINK_CHECKPOINTS_UNALIGNED = "flink.checkpoints.unaligned"
-
- val KEY_FLINK_CHECKPOINTS_INTERVAL = "flink.checkpoints.interval"
-
- val KEY_FLINK_CHECKPOINTS_MODE = "flink.checkpoints.mode"
-
- val KEY_FLINK_CHECKPOINTS_CLEANUP = "flink.checkpoints.cleanup"
-
- val KEY_FLINK_CHECKPOINTS_TIMEOUT = "flink.checkpoints.timeout"
-
- val KEY_FLINK_CHECKPOINTS_MAX_CONCURRENT = "flink.checkpoints.maxConcurrent"
-
- val KEY_FLINK_CHECKPOINTS_MIN_PAUSEBETWEEN = "flink.checkpoints.minPauseBetween"
-
- //---state---
-
- val KEY_FLINK_STATE_SAVEPOINTS_DIR = "flink.state.savepoints.dir"
-
- val KEY_FLINK_STATE_CHECKPOINTS_DIR = "flink.state.checkpoints.dir"
-
- val KEY_FLINK_STATE_CHECKPOINT_STORAGE = "flink.state.checkpoint-storage"
-
- val KEY_FLINK_STATE_BACKEND = "flink.state.backend.value"
-
- val KEY_FLINK_STATE_BACKEND_ASYNC = "flink.state.backend.async"
-
- val KEY_FLINK_STATE_BACKEND_INCREMENTAL = "flink.state.backend.incremental"
-
- val KEY_FLINK_STATE_BACKEND_MEMORY = "flink.state.backend.memory"
-
- val KEY_FLINK_STATE_ROCKSDB = "flink.state.backend.rocksdb"
-
val KEY_EXECUTION_RUNTIME_MODE = "flink.execution.runtime-mode"
// ---watermark---
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/FlinkUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/FlinkUtils.scala
index f4b9ac6f5..28fa04063 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/FlinkUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/FlinkUtils.scala
@@ -19,7 +19,12 @@ package org.apache.streampark.common.util
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.runtime.state.FunctionInitializationContext
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions
+import org.apache.flink.util.TimeUtils
+
import java.io.File
+import java.time.Duration
+import java.util
object FlinkUtils {
@@ -35,4 +40,10 @@ object FlinkUtils {
}
}
+ def isCheckpointEnabled(map: util.Map[String, String]): Boolean = {
+ val checkpointInterval: Duration = TimeUtils.parseDuration(
+ map.getOrDefault(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL.key, "0ms"))
+ checkpointInterval.toMillis > 0
+ }
+
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index c0a39f8ee..ed94fedbe 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -33,6 +33,7 @@ import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.common.util.CompletableFutureUtils;
import org.apache.streampark.common.util.DeflaterUtils;
import org.apache.streampark.common.util.ExceptionUtils;
+import org.apache.streampark.common.util.FlinkUtils;
import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.common.util.Utils;
import org.apache.streampark.common.util.YarnUtils;
@@ -103,6 +104,7 @@ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.springframework.beans.factory.annotation.Autowired;
@@ -1478,7 +1480,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
// 1) dynamic parameters have the highest priority, read the dynamic
parameters are set: -Dstate.savepoints.dir
String savepointPath = FlinkSubmitter
.extractDynamicOptionAsJava(application.getDynamicOptions())
- .get(ConfigConst.KEY_FLINK_STATE_SAVEPOINTS_DIR().substring(6));
+ .get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
// Application conf configuration has the second priority. If it is a
streampark|flinksql type task,
// see if Application conf is configured when the task is defined, if
checkpoints are configured and enabled,
@@ -1488,9 +1490,8 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
ApplicationConfig applicationConfig =
configService.getEffective(application.getId());
if (applicationConfig != null) {
Map<String, String> map = applicationConfig.readConfig();
- boolean checkpointEnable =
Boolean.parseBoolean(map.get(ConfigConst.KEY_FLINK_CHECKPOINTS_ENABLE()));
- if (checkpointEnable) {
- savepointPath =
map.get(ConfigConst.KEY_FLINK_STATE_SAVEPOINTS_DIR());
+ if (FlinkUtils.isCheckpointEnabled(map)) {
+ savepointPath = map.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
}
}
}
@@ -1506,12 +1507,12 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
"the cluster has been deleted. Please contact the
Admin.", application.getFlinkClusterId()));
Map<String, String> config = cluster.getFlinkConfig();
if (!config.isEmpty()) {
- savepointPath =
config.get(ConfigConst.KEY_FLINK_STATE_SAVEPOINTS_DIR().substring(6));
+ savepointPath = config.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
}
} else {
// 3.2) At the yarn or k8s mode, then read the savepoint in
flink-conf.yml in the bound flink
FlinkEnv flinkEnv =
flinkEnvService.getById(application.getVersionId());
- savepointPath =
flinkEnv.convertFlinkYamlAsMap().get(ConfigConst.KEY_FLINK_STATE_SAVEPOINTS_DIR().substring(6));
+ savepointPath = flinkEnv.convertFlinkYamlAsMap().get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
}
}
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/sink/RedisSink.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/sink/RedisSink.scala
index f50054327..55ec10761 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/sink/RedisSink.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/sink/RedisSink.scala
@@ -17,8 +17,7 @@
package org.apache.streampark.flink.connector.redis.sink
-import org.apache.streampark.common.conf.ConfigConst._
-import org.apache.streampark.common.util.Utils
+import org.apache.streampark.common.util.{FlinkUtils, Utils}
import org.apache.streampark.flink.connector.redis.bean.RedisMapper
import org.apache.streampark.flink.connector.redis.conf.RedisConfig
import
org.apache.streampark.flink.connector.redis.internal.{Redis2PCSinkFunction,
RedisSinkFunction}
@@ -26,6 +25,7 @@ import org.apache.streampark.flink.connector.sink.Sink
import org.apache.streampark.flink.core.scala.StreamingContext
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.datastream.{DataStreamSink, DataStream
=> JavaDataStream}
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions
import org.apache.flink.streaming.api.scala.DataStream
import
org.apache.flink.streaming.connectors.redis.common.config.{FlinkJedisConfigBase,
FlinkJedisPoolConfig, FlinkJedisSentinelConfig}
@@ -60,11 +60,11 @@ class RedisSink(@(transient@param) ctx: StreamingContext,
val prop = ctx.parameter.getProperties
Utils.copyProperties(property, prop)
private val redisConfig: RedisConfig = new RedisConfig(prop)
- val enableCheckpoint: Boolean =
allProperties.getOrElse(KEY_FLINK_CHECKPOINTS_ENABLE, "false").toBoolean
+ val enableCheckpoint: Boolean = FlinkUtils.isCheckpointEnabled(allProperties)
val cpMode: CheckpointingMode = Try(
- CheckpointingMode.valueOf(allProperties.get(KEY_FLINK_CHECKPOINTS_MODE))
- ).getOrElse(CheckpointingMode.AT_LEAST_ONCE)
+ CheckpointingMode.valueOf(allProperties.get(ExecutionCheckpointingOptions.CHECKPOINTING_MODE.key()))
+ ).getOrElse(ExecutionCheckpointingOptions.CHECKPOINTING_MODE.defaultValue())
lazy val config: FlinkJedisConfigBase = {
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 3acf60dba..305018f68 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -179,8 +179,6 @@ private[flink] class FlinkStreamingInitializer(args:
Array[String], apiType: Api
val executionMode =
Try(RuntimeExecutionMode.valueOf(parameter.get(KEY_EXECUTION_RUNTIME_MODE))).getOrElse(RuntimeExecutionMode.STREAMING)
localStreamEnv.setRuntimeMode(executionMode)
- checkpoint()
-
apiType match {
case ApiType.java if javaStreamEnvConfFunc != null =>
javaStreamEnvConfFunc.configuration(localStreamEnv.getJavaEnv, parameter)
case ApiType.scala if streamEnvConfFunc != null =>
streamEnvConfFunc(localStreamEnv, parameter)
@@ -189,100 +187,4 @@ private[flink] class FlinkStreamingInitializer(args:
Array[String], apiType: Api
localStreamEnv.getConfig.setGlobalJobParameters(parameter)
}
- private[this] def checkpoint(): Unit = {
- // read from the configuration file whether to enable checkpoint, default
is disabled.
- val enableCheckpoint =
Try(parameter.get(KEY_FLINK_CHECKPOINTS_ENABLE).toBoolean).getOrElse(false)
- if (!enableCheckpoint) return
-
- val cpInterval =
Try(parameter.get(KEY_FLINK_CHECKPOINTS_INTERVAL).toInt).getOrElse(1000)
- val cpMode =
Try(CheckpointingMode.valueOf(parameter.get(KEY_FLINK_CHECKPOINTS_MODE))).getOrElse(CheckpointingMode.EXACTLY_ONCE)
- val cpCleanUp =
Try(ExternalizedCheckpointCleanup.valueOf(parameter.get(KEY_FLINK_CHECKPOINTS_CLEANUP))).getOrElse(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
- val cpTimeout =
Try(parameter.get(KEY_FLINK_CHECKPOINTS_TIMEOUT).toLong).getOrElse(CheckpointConfig.DEFAULT_TIMEOUT)
- val cpMaxConcurrent =
Try(parameter.get(KEY_FLINK_CHECKPOINTS_MAX_CONCURRENT).toInt).getOrElse(CheckpointConfig.DEFAULT_MAX_CONCURRENT_CHECKPOINTS)
- val cpMinPauseBetween =
Try(parameter.get(KEY_FLINK_CHECKPOINTS_MIN_PAUSEBETWEEN).toLong).getOrElse(CheckpointConfig.DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS)
- val unaligned =
Try(parameter.get(KEY_FLINK_CHECKPOINTS_UNALIGNED).toBoolean).getOrElse(false)
-
- // default: enable checkpoint, interval 1s to start a checkpoint
- streamEnvironment.enableCheckpointing(cpInterval)
-
- val cpConfig = streamEnvironment.getCheckpointConfig
-
- cpConfig.setCheckpointingMode(cpMode)
- // default: min pause interval between checkpoints
- cpConfig.setMinPauseBetweenCheckpoints(cpMinPauseBetween)
- // default: checkpoints must complete within $cpTimeout minutes or be
discarded
- cpConfig.setCheckpointTimeout(cpTimeout)
- // default: allow ? times checkpoint at the same time, default one.
- cpConfig.setMaxConcurrentCheckpoints(cpMaxConcurrent)
- // default: checkpoint data is retained when cancelled
- cpConfig.enableExternalizedCheckpoints(cpCleanUp)
- // unaligned checkpoint (flink 1.11.1 +=)
- cpConfig.enableUnalignedCheckpoints(unaligned)
-
- val stateBackend =
XStateBackend.withName(parameter.get(KEY_FLINK_STATE_BACKEND, null))
- if (stateBackend != null) {
- val cpDir = if (stateBackend == XStateBackend.jobmanager) null else {
- /**
- *
cpDir如果从配置文件中读取失败(key:flink.state.checkpoints.dir),则尝试从flink-conf.yml中读取..
- */
- parameter.get(KEY_FLINK_STATE_CHECKPOINTS_DIR, null) match {
- // read from flink-conf.yaml
- case null =>
- logWarn("can't found flink.state.checkpoints.dir from
properties,now try found from flink-conf.yaml")
- // read `state.checkpoints.dir` key from flink-conf.yaml
- val dir = defaultFlinkConf("state.checkpoints.dir")
- require(dir != null, s"[StreamPark] can't found
state.checkpoints.dir from default FlinkConf ")
- logInfo(s"state.backend: state.checkpoints.dir found in
flink-conf.yaml,$dir")
- dir
- case dir =>
- logInfo(s"state.backend: flink.checkpoints.dir found in
properties,$dir")
- dir
- }
- }
-
- stateBackend match {
- /**
- * The size of each individual state is by default limited to 5 MB.
This value can be increased in the constructor of the MemoryStateBackend.
- * Irrespective of the configured maximal state size, the state cannot
be larger than the akka frame size (see <a
href="https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html">Configuration</a>).
- * The aggregate state must fit into the JobManager memory.
- */
- case XStateBackend.jobmanager =>
- logInfo(s"state.backend Type: jobmanager...")
- //default 5 MB,cannot be larger than the akka frame size
- val maxMemorySize =
Try(parameter.get(KEY_FLINK_STATE_BACKEND_MEMORY).toInt).getOrElse(MemoryStateBackend.DEFAULT_MAX_STATE_SIZE)
- val async =
Try(parameter.get(KEY_FLINK_STATE_BACKEND_ASYNC).toBoolean).getOrElse(false)
- val ms = new MemoryStateBackend(maxMemorySize, async)
- streamEnvironment.setStateBackend(ms)
- case XStateBackend.filesystem =>
- logInfo(s"state.backend Type: filesystem...")
- val async =
Try(parameter.get(KEY_FLINK_STATE_BACKEND_ASYNC).toBoolean).getOrElse(false)
- val fs = new FsStateBackend(cpDir, async)
- streamEnvironment.setStateBackend(fs)
- case XStateBackend.rocksdb =>
- logInfo("state.backend Type: rocksdb...")
- // default enable incremental checkpoint
- val incremental =
Try(parameter.get(KEY_FLINK_STATE_BACKEND_INCREMENTAL).toBoolean).getOrElse(true)
- val rs = new RocksDBStateBackend(cpDir, incremental)
- /**
- * @see <a
href="https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#rocksdb-state-backend"/>Flink
Rocksdb Config</a>
- */
- val map = new JavaHashMap[String, Object]()
- val skipKey = List(KEY_FLINK_STATE_BACKEND_ASYNC,
KEY_FLINK_STATE_BACKEND_INCREMENTAL, KEY_FLINK_STATE_BACKEND_MEMORY,
KEY_FLINK_STATE_ROCKSDB)
-
parameter.getProperties.filter(_._1.startsWith(KEY_FLINK_STATE_ROCKSDB)).filterNot(x
=> skipKey.contains(x._1)).foreach(x => map.put(x._1, x._2))
- if (map.nonEmpty) {
- val optionsFactory = new DefaultConfigurableOptionsFactory
- val config = new Configuration()
- val confData = classOf[Configuration].getDeclaredField("confData")
- confData.setAccessible(true)
- confData.set(map, config)
- optionsFactory.configure(config)
- rs.setRocksDBOptions(optionsFactory)
- }
- streamEnvironment.setStateBackend(rs)
- case _ =>
- logError("usage error!!! state.backend must be
(jobmanager|filesystem|rocksdb)")
- }
- }
- }
-
}
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 46050334d..14aadd781 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -173,8 +173,6 @@ private[flink] class FlinkStreamingInitializer(args:
Array[String], apiType: Api
val executionMode =
Try(RuntimeExecutionMode.valueOf(parameter.get(KEY_EXECUTION_RUNTIME_MODE))).getOrElse(RuntimeExecutionMode.STREAMING)
localStreamEnv.setRuntimeMode(executionMode)
- checkpoint()
-
apiType match {
case ApiType.java if javaStreamEnvConfFunc != null =>
javaStreamEnvConfFunc.configuration(localStreamEnv.getJavaEnv, parameter)
case ApiType.scala if streamEnvConfFunc != null =>
streamEnvConfFunc(localStreamEnv, parameter)
@@ -183,117 +181,4 @@ private[flink] class FlinkStreamingInitializer(args:
Array[String], apiType: Api
localStreamEnv.getConfig.setGlobalJobParameters(parameter)
}
- private[this] def checkpoint(): Unit = {
- // read from the configuration file whether to enable checkpoint, default
is disabled.
- val enableCheckpoint =
Try(parameter.get(KEY_FLINK_CHECKPOINTS_ENABLE).toBoolean).getOrElse(false)
- if (!enableCheckpoint) return
-
- val cpInterval =
Try(parameter.get(KEY_FLINK_CHECKPOINTS_INTERVAL).toInt).getOrElse(1000)
- val cpMode =
Try(CheckpointingMode.valueOf(parameter.get(KEY_FLINK_CHECKPOINTS_MODE))).getOrElse(CheckpointingMode.EXACTLY_ONCE)
- val cpCleanUp =
Try(ExternalizedCheckpointCleanup.valueOf(parameter.get(KEY_FLINK_CHECKPOINTS_CLEANUP))).getOrElse(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
- val cpTimeout =
Try(parameter.get(KEY_FLINK_CHECKPOINTS_TIMEOUT).toLong).getOrElse(CheckpointConfig.DEFAULT_TIMEOUT)
- val cpMaxConcurrent =
Try(parameter.get(KEY_FLINK_CHECKPOINTS_MAX_CONCURRENT).toInt).getOrElse(CheckpointConfig.DEFAULT_MAX_CONCURRENT_CHECKPOINTS)
- val cpMinPauseBetween =
Try(parameter.get(KEY_FLINK_CHECKPOINTS_MIN_PAUSEBETWEEN).toLong).getOrElse(CheckpointConfig.DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS)
- val unaligned =
Try(parameter.get(KEY_FLINK_CHECKPOINTS_UNALIGNED).toBoolean).getOrElse(false)
-
- // default: enable checkpoint, interval 1s to start a checkpoint
- streamEnvironment.enableCheckpointing(cpInterval)
-
- val cpConfig = streamEnvironment.getCheckpointConfig
-
- cpConfig.setCheckpointingMode(cpMode)
- // default: min pause interval between checkpoints
- cpConfig.setMinPauseBetweenCheckpoints(cpMinPauseBetween)
- // default: checkpoints must complete within $cpTimeout minutes or be
discarded
- cpConfig.setCheckpointTimeout(cpTimeout)
- // default: allow ? times checkpoint at the same time, default one.
- cpConfig.setMaxConcurrentCheckpoints(cpMaxConcurrent)
- // default: checkpoint data is retained when cancelled
- cpConfig.enableExternalizedCheckpoints(cpCleanUp)
- // unaligned checkpoint (flink 1.11.1 +=)
- cpConfig.enableUnalignedCheckpoints(unaligned)
-
- val stateBackend =
XStateBackend.withName(parameter.get(KEY_FLINK_STATE_BACKEND, null))
- if (stateBackend != null) {
- require(
- stateBackend == XStateBackend.hashmap || stateBackend ==
XStateBackend.rocksdb,
- "state.backend must be [hashmap|rocksdb] in flink 1.13 and above"
- )
- val storage = {
- val storage = parameter.get(KEY_FLINK_STATE_CHECKPOINT_STORAGE, null)
match {
- // read from flink-conf.yaml
- case null =>
- logWarn("can't found flink.state.checkpoint-storage from
properties,now try found from flink-conf.yaml")
- val storage = defaultFlinkConf("state.checkpoint-storage")
- require(storage != null, s"[StreamPark] can't found
state.checkpoint-storage from default FlinkConf ")
- logInfo(s"state.checkpoint-storage: state.checkpoint-storage found
in flink-conf.yaml,$storage")
- storage
- case storage =>
- logInfo(s"state.checkpoint-storage: flink.checkpoints.dir found in
properties,$storage")
- storage
- }
-
- Try(CheckpointStorage.withName(storage)) match {
- case Success(value) => value
- case Failure(e) => throw new IllegalArgumentException(e)
- }
- }
-
- lazy val cpDir = parameter.get(KEY_FLINK_STATE_CHECKPOINTS_DIR, null)
match {
- // read from flink-conf.yaml
- case null =>
- logWarn("can't found flink.state.checkpoints.dir from properties,now
try found from flink-conf.yaml")
- // read `state.checkpoints.dir` key from flink-conf.yaml
- val dir = defaultFlinkConf("state.checkpoints.dir")
- require(dir != null, s"[StreamPark] can't found
state.checkpoints.dir from Default FlinkConf ")
- logInfo(s"state.backend: state.checkpoints.dir found in
flink-conf.yaml,$dir")
- dir
- case dir =>
- logInfo(s"state.backend: flink.checkpoints.dir found in
properties,$dir")
- dir
- }
-
- stateBackend match {
- case XStateBackend.hashmap =>
- logInfo("state.backend: hashmap...")
- streamEnvironment.setStateBackend(new HashMapStateBackend())
- storage match {
- case CheckpointStorage.jobmanager =>
- logInfo("state.checkpoint-storage: jobmanager...")
- val maxMemorySize =
Try(parameter.get(KEY_FLINK_STATE_BACKEND_MEMORY).toInt).getOrElse(JobManagerCheckpointStorage.DEFAULT_MAX_STATE_SIZE)
- val jobManagerCheckpointStorage = new
JobManagerCheckpointStorage(maxMemorySize)
- cpConfig.setCheckpointStorage(jobManagerCheckpointStorage)
- case CheckpointStorage.filesystem =>
- logInfo("state.checkpoint-storage: filesystem...")
- cpConfig.setCheckpointStorage(new
FileSystemCheckpointStorage(cpDir))
- }
- case XStateBackend.rocksdb =>
- logInfo("state.backend: rocksdb...")
- val rock = new EmbeddedRocksDBStateBackend()
- val map = new JavaHashMap[String, Object]()
- val skipKey = List(KEY_FLINK_STATE_BACKEND_ASYNC,
KEY_FLINK_STATE_BACKEND_INCREMENTAL, KEY_FLINK_STATE_BACKEND_MEMORY,
KEY_FLINK_STATE_ROCKSDB)
-
parameter.getProperties.filter(_._1.startsWith(KEY_FLINK_STATE_ROCKSDB)).filterNot(x
=> skipKey.contains(x._1)).foreach(x => map.put(x._1, x._2))
- if (map.nonEmpty) {
- val optionsFactory = new DefaultConfigurableOptionsFactory
- val config = new Configuration()
- val confData = classOf[Configuration].getDeclaredField("confData")
- confData.setAccessible(true)
- confData.set(map, config)
- optionsFactory.configure(config)
- rock.setRocksDBOptions(optionsFactory)
- }
- streamEnvironment.setStateBackend(rock)
- storage match {
- case CheckpointStorage.filesystem =>
- logInfo("state.checkpoint-storage: filesystem...")
- cpConfig.setCheckpointStorage(new
FileSystemCheckpointStorage(cpDir))
- case _ =>
- throw new IllegalArgumentException("[StreamPark] state.backend
is rocksdb, state.checkpoint-storage must be filesystem...")
- }
- }
- }
-
- }
-
-
}
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 6dce51624..8991f1d77 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -173,8 +173,6 @@ private[flink] class FlinkStreamingInitializer(args:
Array[String], apiType: Api
val executionMode =
Try(RuntimeExecutionMode.valueOf(parameter.get(KEY_EXECUTION_RUNTIME_MODE))).getOrElse(RuntimeExecutionMode.STREAMING)
localStreamEnv.setRuntimeMode(executionMode)
- checkpoint()
-
apiType match {
case ApiType.java if javaStreamEnvConfFunc != null =>
javaStreamEnvConfFunc.configuration(localStreamEnv.getJavaEnv, parameter)
case ApiType.scala if streamEnvConfFunc != null =>
streamEnvConfFunc(localStreamEnv, parameter)
@@ -183,116 +181,4 @@ private[flink] class FlinkStreamingInitializer(args:
Array[String], apiType: Api
localStreamEnv.getConfig.setGlobalJobParameters(parameter)
}
- private[this] def checkpoint(): Unit = {
- // read from the configuration file whether to enable checkpoint, default
is disabled.
- val enableCheckpoint =
Try(parameter.get(KEY_FLINK_CHECKPOINTS_ENABLE).toBoolean).getOrElse(false)
- if (!enableCheckpoint) return
-
- val cpInterval =
Try(parameter.get(KEY_FLINK_CHECKPOINTS_INTERVAL).toInt).getOrElse(1000)
- val cpMode =
Try(CheckpointingMode.valueOf(parameter.get(KEY_FLINK_CHECKPOINTS_MODE))).getOrElse(CheckpointingMode.EXACTLY_ONCE)
- val cpCleanUp =
Try(ExternalizedCheckpointCleanup.valueOf(parameter.get(KEY_FLINK_CHECKPOINTS_CLEANUP))).getOrElse(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
- val cpTimeout =
Try(parameter.get(KEY_FLINK_CHECKPOINTS_TIMEOUT).toLong).getOrElse(CheckpointConfig.DEFAULT_TIMEOUT)
- val cpMaxConcurrent =
Try(parameter.get(KEY_FLINK_CHECKPOINTS_MAX_CONCURRENT).toInt).getOrElse(CheckpointConfig.DEFAULT_MAX_CONCURRENT_CHECKPOINTS)
- val cpMinPauseBetween =
Try(parameter.get(KEY_FLINK_CHECKPOINTS_MIN_PAUSEBETWEEN).toLong).getOrElse(CheckpointConfig.DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS)
- val unaligned =
Try(parameter.get(KEY_FLINK_CHECKPOINTS_UNALIGNED).toBoolean).getOrElse(false)
-
- // default: enable checkpoint, interval 1s to start a checkpoint
- streamEnvironment.enableCheckpointing(cpInterval)
-
- val cpConfig = streamEnvironment.getCheckpointConfig
-
- cpConfig.setCheckpointingMode(cpMode)
- // default, min pause interval between checkpoints
- cpConfig.setMinPauseBetweenCheckpoints(cpMinPauseBetween)
- // default: checkpoints must complete within $cpTimeout minutes or be
discarded
- cpConfig.setCheckpointTimeout(cpTimeout)
- // default: allow ? times checkpoint at the same time, default one.
- cpConfig.setMaxConcurrentCheckpoints(cpMaxConcurrent)
- // default: checkpoint data is retained when cancelled
- cpConfig.enableExternalizedCheckpoints(cpCleanUp)
- // unaligned checkpoint (flink 1.11.1 +=)
- cpConfig.enableUnalignedCheckpoints(unaligned)
-
- val stateBackend =
XStateBackend.withName(parameter.get(KEY_FLINK_STATE_BACKEND, null))
- if (stateBackend != null) {
- require(
- stateBackend == XStateBackend.hashmap || stateBackend ==
XStateBackend.rocksdb,
- "state.backend must be [hashmap|rocksdb] in flink 1.13 and above"
- )
- val storage = {
- val storage = parameter.get(KEY_FLINK_STATE_CHECKPOINT_STORAGE, null)
match {
- // read from flink-conf.yaml
- case null =>
- logWarn("can't found flink.state.checkpoint-storage from
properties,now try found from flink-conf.yaml")
- val storage = defaultFlinkConf("state.checkpoint-storage")
- require(storage != null, s"[StreamPark] can't found
state.checkpoint-storage from default FlinkConf ")
- logInfo(s"state.checkpoint-storage: state.checkpoint-storage found
in flink-conf.yaml,$storage")
- storage
- case storage =>
- logInfo(s"state.checkpoint-storage: flink.checkpoints.dir found in
properties,$storage")
- storage
- }
-
- Try(CheckpointStorage.withName(storage)) match {
- case Success(value) => value
- case Failure(e) => throw new IllegalArgumentException(e)
- }
- }
-
- lazy val cpDir = parameter.get(KEY_FLINK_STATE_CHECKPOINTS_DIR, null)
match {
- // read from flink-conf.yaml
- case null =>
- logWarn("can't found flink.state.checkpoints.dir from properties,now
try found from flink-conf.yaml")
- // read `state.checkpoints.dir` key from flink-conf.yaml
- val dir = defaultFlinkConf("state.checkpoints.dir")
- require(dir != null, s"[StreamPark] can't found
state.checkpoints.dir from Default FlinkConf ")
- logInfo(s"state.backend: state.checkpoints.dir found in
flink-conf.yaml,$dir")
- dir
- case dir =>
- logInfo(s"state.backend: flink.checkpoints.dir found in
properties,$dir")
- dir
- }
-
- stateBackend match {
- case XStateBackend.hashmap =>
- logInfo("state.backend: hashmap...")
- streamEnvironment.setStateBackend(new HashMapStateBackend())
- storage match {
- case CheckpointStorage.jobmanager =>
- logInfo("state.checkpoint-storage: jobmanager...")
- val maxMemorySize =
Try(parameter.get(KEY_FLINK_STATE_BACKEND_MEMORY).toInt).getOrElse(JobManagerCheckpointStorage.DEFAULT_MAX_STATE_SIZE)
- val jobManagerCheckpointStorage = new
JobManagerCheckpointStorage(maxMemorySize)
- cpConfig.setCheckpointStorage(jobManagerCheckpointStorage)
- case CheckpointStorage.filesystem =>
- logInfo("state.checkpoint-storage: filesystem...")
- cpConfig.setCheckpointStorage(new
FileSystemCheckpointStorage(cpDir))
- }
- case XStateBackend.rocksdb =>
- logInfo("state.backend: rocksdb...")
- val rock = new EmbeddedRocksDBStateBackend()
- val map = new JavaHashMap[String, Object]()
- val skipKey = List(KEY_FLINK_STATE_BACKEND_ASYNC,
KEY_FLINK_STATE_BACKEND_INCREMENTAL, KEY_FLINK_STATE_BACKEND_MEMORY,
KEY_FLINK_STATE_ROCKSDB)
-
parameter.getProperties.filter(_._1.startsWith(KEY_FLINK_STATE_ROCKSDB)).filterNot(x
=> skipKey.contains(x._1)).foreach(x => map.put(x._1, x._2))
- if (map.nonEmpty) {
- val optionsFactory = new DefaultConfigurableOptionsFactory
- val config = new Configuration()
- val confData = classOf[Configuration].getDeclaredField("confData")
- confData.setAccessible(true)
- confData.set(map, config)
- optionsFactory.configure(config)
- rock.setRocksDBOptions(optionsFactory)
- }
- streamEnvironment.setStateBackend(rock)
- storage match {
- case CheckpointStorage.filesystem =>
- logInfo("state.checkpoint-storage: filesystem...")
- cpConfig.setCheckpointStorage(new
FileSystemCheckpointStorage(cpDir))
- case _ =>
- throw new IllegalArgumentException("[StreamPark] state.backend
is rocksdb, state.checkpoint-storage must be filesystem...")
- }
- }
- }
-
- }
-
}
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 86706267d..db8bed611 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -172,8 +172,6 @@ private[flink] class FlinkStreamingInitializer(args:
Array[String], apiType: Api
val executionMode =
Try(RuntimeExecutionMode.valueOf(parameter.get(KEY_EXECUTION_RUNTIME_MODE))).getOrElse(RuntimeExecutionMode.STREAMING)
localStreamEnv.setRuntimeMode(executionMode)
- checkpoint()
-
apiType match {
case ApiType.java if javaStreamEnvConfFunc != null =>
javaStreamEnvConfFunc.configuration(localStreamEnv.getJavaEnv, parameter)
case ApiType.scala if streamEnvConfFunc != null =>
streamEnvConfFunc(localStreamEnv, parameter)
@@ -182,115 +180,4 @@ private[flink] class FlinkStreamingInitializer(args:
Array[String], apiType: Api
localStreamEnv.getConfig.setGlobalJobParameters(parameter)
}
- private[this] def checkpoint(): Unit = {
- // read from the configuration file whether to enable checkpoint, default
is disabled.
- val enableCheckpoint =
Try(parameter.get(KEY_FLINK_CHECKPOINTS_ENABLE).toBoolean).getOrElse(false)
- if(!enableCheckpoint) return
-
- val cpInterval =
Try(parameter.get(KEY_FLINK_CHECKPOINTS_INTERVAL).toInt).getOrElse(1000)
- val cpMode =
Try(CheckpointingMode.valueOf(parameter.get(KEY_FLINK_CHECKPOINTS_MODE))).getOrElse(CheckpointingMode.EXACTLY_ONCE)
- val cpCleanUp =
Try(ExternalizedCheckpointCleanup.valueOf(parameter.get(KEY_FLINK_CHECKPOINTS_CLEANUP))).getOrElse(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
- val cpTimeout =
Try(parameter.get(KEY_FLINK_CHECKPOINTS_TIMEOUT).toLong).getOrElse(CheckpointConfig.DEFAULT_TIMEOUT)
- val cpMaxConcurrent =
Try(parameter.get(KEY_FLINK_CHECKPOINTS_MAX_CONCURRENT).toInt).getOrElse(CheckpointConfig.DEFAULT_MAX_CONCURRENT_CHECKPOINTS)
- val cpMinPauseBetween =
Try(parameter.get(KEY_FLINK_CHECKPOINTS_MIN_PAUSEBETWEEN).toLong).getOrElse(CheckpointConfig.DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS)
- val unaligned =
Try(parameter.get(KEY_FLINK_CHECKPOINTS_UNALIGNED).toBoolean).getOrElse(false)
-
- // default: enable checkpoint, interval 1s to start a checkpoint
- streamEnvironment.enableCheckpointing(cpInterval)
-
- val cpConfig = streamEnvironment.getCheckpointConfig
-
- cpConfig.setCheckpointingMode(cpMode)
- // default: min pause interval between checkpoints
- cpConfig.setMinPauseBetweenCheckpoints(cpMinPauseBetween)
- // default: checkpoints must complete within $cpTimeout minutes or be
discarded
- cpConfig.setCheckpointTimeout(cpTimeout)
- // default: allow ? times checkpoint at the same time, default one.
- cpConfig.setMaxConcurrentCheckpoints(cpMaxConcurrent)
- // default: checkpoint data is retained when cancelled
- cpConfig.enableExternalizedCheckpoints(cpCleanUp)
- // unaligned checkpoint (flink 1.11.1 +=)
- cpConfig.enableUnalignedCheckpoints(unaligned)
-
- val stateBackend =
XStateBackend.withName(parameter.get(KEY_FLINK_STATE_BACKEND, null))
- if (stateBackend != null) {
- require(
- stateBackend == XStateBackend.hashmap || stateBackend ==
XStateBackend.rocksdb,
- "state.backend must be [hashmap|rocksdb] in flink 1.13 and above"
- )
- val storage = {
- val storage = parameter.get(KEY_FLINK_STATE_CHECKPOINT_STORAGE, null)
match {
- // read from flink-conf.yaml
- case null =>
- logWarn("can't found flink.state.checkpoint-storage from
properties,now try found from flink-conf.yaml")
- val storage = defaultFlinkConf("state.checkpoint-storage")
- require(storage != null, s"[StreamPark] can't found
state.checkpoint-storage from default FlinkConf ")
- logInfo(s"state.checkpoint-storage: state.checkpoint-storage found
in flink-conf.yaml,$storage")
- storage
- case storage =>
- logInfo(s"state.checkpoint-storage: flink.checkpoints.dir found in
properties,$storage")
- storage
- }
-
- Try(CheckpointStorage.withName(storage)) match {
- case Success(value) => value
- case Failure(e) => throw new IllegalArgumentException(e)
- }
- }
-
- lazy val cpDir = parameter.get(KEY_FLINK_STATE_CHECKPOINTS_DIR, null)
match {
- // read from flink-conf.yaml
- case null =>
- logWarn("can't found flink.state.checkpoints.dir from properties,now
try found from flink-conf.yaml")
- // read `state.checkpoints.dir` key from flink-conf.yaml
- val dir = defaultFlinkConf("state.checkpoints.dir")
- require(dir != null, s"[StreamPark] can't found
state.checkpoints.dir from Default FlinkConf ")
- logInfo(s"state.backend: state.checkpoints.dir found in
flink-conf.yaml,$dir")
- dir
- case dir =>
- logInfo(s"state.backend: flink.checkpoints.dir found in
properties,$dir")
- dir
- }
-
- stateBackend match {
- case XStateBackend.hashmap =>
- logInfo("state.backend: hashmap...")
- streamEnvironment.setStateBackend(new HashMapStateBackend())
- storage match {
- case CheckpointStorage.jobmanager =>
- logInfo("state.checkpoint-storage: jobmanager...")
- val maxMemorySize =
Try(parameter.get(KEY_FLINK_STATE_BACKEND_MEMORY).toInt).getOrElse(JobManagerCheckpointStorage.DEFAULT_MAX_STATE_SIZE)
- val jobManagerCheckpointStorage = new
JobManagerCheckpointStorage(maxMemorySize)
- cpConfig.setCheckpointStorage(jobManagerCheckpointStorage)
- case CheckpointStorage.filesystem =>
- logInfo("state.checkpoint-storage: filesystem...")
- cpConfig.setCheckpointStorage(new
FileSystemCheckpointStorage(cpDir))
- }
- case XStateBackend.rocksdb =>
- logInfo("state.backend: rocksdb...")
- val rock = new EmbeddedRocksDBStateBackend()
- val map = new JavaHashMap[String, Object]()
- val skipKey = List(KEY_FLINK_STATE_BACKEND_ASYNC,
KEY_FLINK_STATE_BACKEND_INCREMENTAL, KEY_FLINK_STATE_BACKEND_MEMORY,
KEY_FLINK_STATE_ROCKSDB)
-
parameter.getProperties.filter(_._1.startsWith(KEY_FLINK_STATE_ROCKSDB)).filterNot(x
=> skipKey.contains(x._1)).foreach(x => map.put(x._1, x._2))
- if (map.nonEmpty) {
- val optionsFactory = new DefaultConfigurableOptionsFactory
- val config = new Configuration()
- val confData = classOf[Configuration].getDeclaredField("confData")
- confData.setAccessible(true)
- confData.set(map, config)
- optionsFactory.configure(config)
- rock.setRocksDBOptions(optionsFactory)
- }
- streamEnvironment.setStateBackend(rock)
- storage match {
- case CheckpointStorage.filesystem =>
- logInfo("state.checkpoint-storage: filesystem...")
- cpConfig.setCheckpointStorage(new
FileSystemCheckpointStorage(cpDir))
- case _ =>
- throw new IllegalArgumentException("[StreamPark] state.backend
is rocksdb, state.checkpoint-storage must be filesystem...")
- }
- }
- }
- }
-
}