zhangshenghang commented on code in PR #7040:
URL: https://github.com/apache/seatunnel/pull/7040#discussion_r1669840841
##########
seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java:
##########
@@ -186,158 +82,15 @@ private void createStreamTableEnvironment() {
EnvironmentUtil.initTableEnvironmentConfiguration(this.config,
config.getConfiguration());
}
- private void createStreamEnvironment() {
- Configuration configuration = new Configuration();
- EnvironmentUtil.initConfiguration(config, configuration);
- environment =
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
- setTimeCharacteristic();
-
- setCheckpoint();
-
- EnvironmentUtil.setRestartStrategy(config, environment.getConfig());
-
- if (EnvironmentUtil.hasPathAndWaring(config,
ConfigKeyName.BUFFER_TIMEOUT_MILLIS)) {
- long timeout = config.getLong(ConfigKeyName.BUFFER_TIMEOUT_MILLIS);
- environment.setBufferTimeout(timeout);
- }
-
- if (config.hasPath(EnvCommonOptions.PARALLELISM.key())) {
- int parallelism =
config.getInt(EnvCommonOptions.PARALLELISM.key());
- environment.setParallelism(parallelism);
- } else if (config.hasPath(ConfigKeyName.PARALLELISM)) {
- log.warn(
- "the parameter 'execution.parallelism' will be deprecated,
please use common parameter 'parallelism' to set it");
- int parallelism = config.getInt(ConfigKeyName.PARALLELISM);
- environment.setParallelism(parallelism);
- }
-
- if (EnvironmentUtil.hasPathAndWaring(config,
ConfigKeyName.MAX_PARALLELISM)) {
- int max = config.getInt(ConfigKeyName.MAX_PARALLELISM);
- environment.setMaxParallelism(max);
- }
-
- if (this.jobMode.equals(JobMode.BATCH)) {
- environment.setRuntimeMode(RuntimeExecutionMode.BATCH);
- }
- }
-
- private void setTimeCharacteristic() {
- if (EnvironmentUtil.hasPathAndWaring(config,
ConfigKeyName.TIME_CHARACTERISTIC)) {
- String timeType =
config.getString(ConfigKeyName.TIME_CHARACTERISTIC);
- switch (timeType.toLowerCase()) {
- case "event-time":
-
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- break;
- case "ingestion-time":
-
environment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
- break;
- case "processing-time":
-
environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
- break;
- default:
- log.warn(
- "set time-characteristic failed, unknown
time-characteristic [{}],only support
event-time,ingestion-time,processing-time",
- timeType);
- break;
- }
- }
- }
-
- private void setCheckpoint() {
- if (jobMode == JobMode.BATCH) {
- log.warn(
- "Disabled Checkpointing. In flink execution environment,
checkpointing is not supported and not needed when executing jobs in BATCH
mode");
- }
- long interval = 0;
- if (config.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
- interval =
config.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key());
- } else if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) {
- log.warn(
- "the parameter 'execution.checkpoint.interval' will be
deprecated, please use common parameter 'checkpoint.interval' to set it");
- interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL);
- }
-
- if (interval > 0) {
- CheckpointConfig checkpointConfig =
environment.getCheckpointConfig();
- environment.enableCheckpointing(interval);
-
- if (EnvironmentUtil.hasPathAndWaring(config,
ConfigKeyName.CHECKPOINT_MODE)) {
- String mode = config.getString(ConfigKeyName.CHECKPOINT_MODE);
- switch (mode.toLowerCase()) {
- case "exactly-once":
-
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
- break;
- case "at-least-once":
-
checkpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
- break;
- default:
- log.warn(
- "set checkpoint.mode failed, unknown
checkpoint.mode [{}],only support exactly-once,at-least-once",
- mode);
- break;
- }
- }
-
- if (EnvironmentUtil.hasPathAndWaring(
- config, EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) {
- long timeout =
config.getLong(EnvCommonOptions.CHECKPOINT_TIMEOUT.key());
- checkpointConfig.setCheckpointTimeout(timeout);
- } else if (EnvironmentUtil.hasPathAndWaring(config,
ConfigKeyName.CHECKPOINT_TIMEOUT)) {
- long timeout =
config.getLong(ConfigKeyName.CHECKPOINT_TIMEOUT);
- checkpointConfig.setCheckpointTimeout(timeout);
- }
-
- if (EnvironmentUtil.hasPathAndWaring(config,
ConfigKeyName.CHECKPOINT_DATA_URI)) {
- String uri =
config.getString(ConfigKeyName.CHECKPOINT_DATA_URI);
- StateBackend fsStateBackend = new FsStateBackend(uri);
- if (EnvironmentUtil.hasPathAndWaring(config,
ConfigKeyName.STATE_BACKEND)) {
- String stateBackend =
config.getString(ConfigKeyName.STATE_BACKEND);
- if ("rocksdb".equalsIgnoreCase(stateBackend)) {
- StateBackend rocksDBStateBackend =
- new RocksDBStateBackend(fsStateBackend,
TernaryBoolean.TRUE);
- environment.setStateBackend(rocksDBStateBackend);
- }
- } else {
- environment.setStateBackend(fsStateBackend);
- }
- }
-
- if (EnvironmentUtil.hasPathAndWaring(
- config, ConfigKeyName.MAX_CONCURRENT_CHECKPOINTS)) {
- int max =
config.getInt(ConfigKeyName.MAX_CONCURRENT_CHECKPOINTS);
- checkpointConfig.setMaxConcurrentCheckpoints(max);
- }
-
- if (EnvironmentUtil.hasPathAndWaring(config,
ConfigKeyName.CHECKPOINT_CLEANUP_MODE)) {
- boolean cleanup =
config.getBoolean(ConfigKeyName.CHECKPOINT_CLEANUP_MODE);
- if (cleanup) {
- checkpointConfig.enableExternalizedCheckpoints(
-
CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
- } else {
- checkpointConfig.enableExternalizedCheckpoints(
-
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
- }
- }
-
- if (EnvironmentUtil.hasPathAndWaring(
- config, ConfigKeyName.MIN_PAUSE_BETWEEN_CHECKPOINTS)) {
- long minPause =
config.getLong(ConfigKeyName.MIN_PAUSE_BETWEEN_CHECKPOINTS);
- checkpointConfig.setMinPauseBetweenCheckpoints(minPause);
- }
-
- if (EnvironmentUtil.hasPathAndWaring(
- config, ConfigKeyName.FAIL_ON_CHECKPOINTING_ERRORS)) {
- int failNum =
config.getInt(ConfigKeyName.FAIL_ON_CHECKPOINTING_ERRORS);
- checkpointConfig.setTolerableCheckpointFailureNumber(failNum);
- }
- }
- }
-
- public void registerResultTable(Config config, DataStream<Row> dataStream,
String name) {
- StreamTableEnvironment tableEnvironment =
this.getStreamTableEnvironment();
- if (!TableUtil.tableExists(tableEnvironment, name)) {
- tableEnvironment.createTemporaryView(
- name, tableEnvironment.fromChangelogStream(dataStream));
+ protected void setCheckpoint() {
+ super.setCheckpoint();
+ CheckpointConfig checkpointConfig = environment.getCheckpointConfig();
+ if (EnvironmentUtil.hasPathAndWaring(config,
EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) {
Review Comment:
> Yes. Please use `if
(config.hasPath(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) {`.
@Hisoka-X The modification is complete.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org