This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 93c63d17e2 [Fix][Flink-starter]Hotfix flink checkpoint default value 
(#7040)
93c63d17e2 is described below

commit 93c63d17e23438bdac3e95f1585bf745c0f1848f
Author: Jast <[email protected]>
AuthorDate: Tue Jul 9 15:57:36 2024 +0800

    [Fix][Flink-starter]Hotfix flink checkpoint default value (#7040)
---
 .../apache/seatunnel/api/env/EnvCommonOptions.java |   1 +
 .../flink/execution/FlinkRuntimeEnvironment.java   | 267 +---------------
 .../AbstractFlinkRuntimeEnvironment.java}          | 344 +++++++++------------
 .../flink/execution/FlinkRuntimeEnvironment.java   | 269 +---------------
 4 files changed, 152 insertions(+), 729 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
index f1fa9101ff..5c80fb7292 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
@@ -77,6 +77,7 @@ public interface EnvCommonOptions {
                     .noDefaultValue()
                     .withDescription(
                             "The each parallelism bytes limit per second for 
read data from source.");
+
     Option<Long> CHECKPOINT_TIMEOUT =
             Options.key("checkpoint.timeout")
                     .longType()
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index 964d8825e1..fcc25a6b9e 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -19,57 +19,26 @@ package org.apache.seatunnel.core.starter.flink.execution;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import org.apache.seatunnel.api.env.EnvCommonOptions;
-import org.apache.seatunnel.common.Constants;
-import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.common.utils.ReflectionUtils;
 import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
 import org.apache.seatunnel.core.starter.flink.utils.ConfigKeyName;
 import org.apache.seatunnel.core.starter.flink.utils.EnvironmentUtil;
-import org.apache.seatunnel.core.starter.flink.utils.TableUtil;
 
-import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.PipelineOptions;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.CheckpointConfig;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.TernaryBoolean;
 
 import lombok.extern.slf4j.Slf4j;
 
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
 @Slf4j
-public class FlinkRuntimeEnvironment implements RuntimeEnvironment {
-    private static final String RESULT_TABLE_NAME = "result_table_name";
-    private static volatile FlinkRuntimeEnvironment INSTANCE = null;
-    private Config config;
-
-    private StreamExecutionEnvironment environment;
+public class FlinkRuntimeEnvironment extends AbstractFlinkRuntimeEnvironment
+        implements RuntimeEnvironment {
 
-    private StreamTableEnvironment tableEnvironment;
-    private JobMode jobMode;
-
-    private String jobName = Constants.LOGO;
+    private static volatile FlinkRuntimeEnvironment INSTANCE = null;
 
     private FlinkRuntimeEnvironment(Config config) {
-        this.initialize(config);
+        super(config);
     }
 
     @Override
@@ -78,16 +47,6 @@ public class FlinkRuntimeEnvironment implements 
RuntimeEnvironment {
         return this;
     }
 
-    @Override
-    public Config getConfig() {
-        return config;
-    }
-
-    @Override
-    public CheckResult checkConfig() {
-        return EnvironmentUtil.checkRestartStrategy(config);
-    }
-
     @Override
     public FlinkRuntimeEnvironment prepare() {
         createStreamEnvironment();
@@ -98,76 +57,12 @@ public class FlinkRuntimeEnvironment implements 
RuntimeEnvironment {
         return this;
     }
 
-    public String getJobName() {
-        return jobName;
-    }
-
-    public boolean isStreaming() {
-        return JobMode.STREAMING.equals(jobMode);
-    }
-
     @Override
     public FlinkRuntimeEnvironment setJobMode(JobMode jobMode) {
         this.jobMode = jobMode;
         return this;
     }
 
-    @Override
-    public JobMode getJobMode() {
-        return jobMode;
-    }
-
-    @Override
-    public void registerPlugin(List<URL> pluginPaths) {
-        pluginPaths.forEach(url -> log.info("register plugins : {}", url));
-        List<Configuration> configurations = new ArrayList<>();
-        try {
-            configurations.add(
-                    (Configuration)
-                            Objects.requireNonNull(
-                                            ReflectionUtils.getDeclaredMethod(
-                                                    
StreamExecutionEnvironment.class,
-                                                    "getConfiguration"))
-                                    .orElseThrow(
-                                            () ->
-                                                    new RuntimeException(
-                                                            "can't find "
-                                                                    + "method: 
getConfiguration"))
-                                    .invoke(this.environment));
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-        configurations.forEach(
-                configuration -> {
-                    List<String> jars = 
configuration.get(PipelineOptions.JARS);
-                    if (jars == null) {
-                        jars = new ArrayList<>();
-                    }
-                    jars.addAll(
-                            
pluginPaths.stream().map(URL::toString).collect(Collectors.toList()));
-                    configuration.set(
-                            PipelineOptions.JARS,
-                            
jars.stream().distinct().collect(Collectors.toList()));
-                    List<String> classpath = 
configuration.get(PipelineOptions.CLASSPATHS);
-                    if (classpath == null) {
-                        classpath = new ArrayList<>();
-                    }
-                    classpath.addAll(
-                            
pluginPaths.stream().map(URL::toString).collect(Collectors.toList()));
-                    configuration.set(
-                            PipelineOptions.CLASSPATHS,
-                            
classpath.stream().distinct().collect(Collectors.toList()));
-                });
-    }
-
-    public StreamExecutionEnvironment getStreamExecutionEnvironment() {
-        return environment;
-    }
-
-    public StreamTableEnvironment getStreamTableEnvironment() {
-        return tableEnvironment;
-    }
-
     private void createStreamTableEnvironment() {
         EnvironmentSettings environmentSettings =
                 
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
@@ -185,160 +80,6 @@ public class FlinkRuntimeEnvironment implements 
RuntimeEnvironment {
         EnvironmentUtil.initTableEnvironmentConfiguration(this.config, 
config.getConfiguration());
     }
 
-    private void createStreamEnvironment() {
-        Configuration configuration = new Configuration();
-        EnvironmentUtil.initConfiguration(config, configuration);
-        environment = 
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
-        setTimeCharacteristic();
-
-        setCheckpoint();
-
-        EnvironmentUtil.setRestartStrategy(config, environment.getConfig());
-
-        if (EnvironmentUtil.hasPathAndWaring(config, 
ConfigKeyName.BUFFER_TIMEOUT_MILLIS)) {
-            long timeout = config.getLong(ConfigKeyName.BUFFER_TIMEOUT_MILLIS);
-            environment.setBufferTimeout(timeout);
-        }
-
-        if (config.hasPath(EnvCommonOptions.PARALLELISM.key())) {
-            int parallelism = 
config.getInt(EnvCommonOptions.PARALLELISM.key());
-            environment.setParallelism(parallelism);
-        } else if (config.hasPath(ConfigKeyName.PARALLELISM)) {
-            log.warn(
-                    "the parameter 'execution.parallelism' will be deprecated, 
please use common parameter 'parallelism' to set it");
-            int parallelism = config.getInt(ConfigKeyName.PARALLELISM);
-            environment.setParallelism(parallelism);
-        }
-
-        if (EnvironmentUtil.hasPathAndWaring(config, 
ConfigKeyName.MAX_PARALLELISM)) {
-            int max = config.getInt(ConfigKeyName.MAX_PARALLELISM);
-            environment.setMaxParallelism(max);
-        }
-
-        if (this.jobMode.equals(JobMode.BATCH)) {
-            environment.setRuntimeMode(RuntimeExecutionMode.BATCH);
-        }
-    }
-
-    private void setTimeCharacteristic() {
-        if (EnvironmentUtil.hasPathAndWaring(config, 
ConfigKeyName.TIME_CHARACTERISTIC)) {
-            String timeType = 
config.getString(ConfigKeyName.TIME_CHARACTERISTIC);
-            switch (timeType.toLowerCase()) {
-                case "event-time":
-                    
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-                    break;
-                case "ingestion-time":
-                    
environment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
-                    break;
-                case "processing-time":
-                    
environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-                    break;
-                default:
-                    log.warn(
-                            "set time-characteristic failed, unknown 
time-characteristic [{}],only support 
event-time,ingestion-time,processing-time",
-                            timeType);
-                    break;
-            }
-        }
-    }
-
-    private void setCheckpoint() {
-        if (jobMode == JobMode.BATCH) {
-            log.warn(
-                    "Disabled Checkpointing. In flink execution environment, 
checkpointing is not supported and not needed when executing jobs in BATCH 
mode");
-        }
-        long interval = 0;
-        if (config.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
-            interval = 
config.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key());
-        } else if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) {
-            log.warn(
-                    "the parameter 'execution.checkpoint.interval' will be 
deprecated, please use common parameter 'checkpoint.interval' to set it");
-            interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL);
-        }
-
-        if (interval > 0) {
-            CheckpointConfig checkpointConfig = 
environment.getCheckpointConfig();
-            environment.enableCheckpointing(interval);
-
-            if (EnvironmentUtil.hasPathAndWaring(config, 
ConfigKeyName.CHECKPOINT_MODE)) {
-                String mode = config.getString(ConfigKeyName.CHECKPOINT_MODE);
-                switch (mode.toLowerCase()) {
-                    case "exactly-once":
-                        
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
-                        break;
-                    case "at-least-once":
-                        
checkpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
-                        break;
-                    default:
-                        log.warn(
-                                "set checkpoint.mode failed, unknown 
checkpoint.mode [{}],only support exactly-once,at-least-once",
-                                mode);
-                        break;
-                }
-            }
-
-            if (config.hasPath(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) {
-                long timeout = 
config.getLong(EnvCommonOptions.CHECKPOINT_TIMEOUT.key());
-                checkpointConfig.setCheckpointTimeout(timeout);
-            } else if (EnvironmentUtil.hasPathAndWaring(config, 
ConfigKeyName.CHECKPOINT_TIMEOUT)) {
-                long timeout = 
config.getLong(ConfigKeyName.CHECKPOINT_TIMEOUT);
-                checkpointConfig.setCheckpointTimeout(timeout);
-            }
-
-            if (EnvironmentUtil.hasPathAndWaring(config, 
ConfigKeyName.CHECKPOINT_DATA_URI)) {
-                String uri = 
config.getString(ConfigKeyName.CHECKPOINT_DATA_URI);
-                StateBackend fsStateBackend = new FsStateBackend(uri);
-                if (EnvironmentUtil.hasPathAndWaring(config, 
ConfigKeyName.STATE_BACKEND)) {
-                    String stateBackend = 
config.getString(ConfigKeyName.STATE_BACKEND);
-                    if ("rocksdb".equalsIgnoreCase(stateBackend)) {
-                        StateBackend rocksDBStateBackend =
-                                new RocksDBStateBackend(fsStateBackend, 
TernaryBoolean.TRUE);
-                        environment.setStateBackend(rocksDBStateBackend);
-                    }
-                } else {
-                    environment.setStateBackend(fsStateBackend);
-                }
-            }
-
-            if (EnvironmentUtil.hasPathAndWaring(
-                    config, ConfigKeyName.MAX_CONCURRENT_CHECKPOINTS)) {
-                int max = 
config.getInt(ConfigKeyName.MAX_CONCURRENT_CHECKPOINTS);
-                checkpointConfig.setMaxConcurrentCheckpoints(max);
-            }
-
-            if (EnvironmentUtil.hasPathAndWaring(config, 
ConfigKeyName.CHECKPOINT_CLEANUP_MODE)) {
-                boolean cleanup = 
config.getBoolean(ConfigKeyName.CHECKPOINT_CLEANUP_MODE);
-                if (cleanup) {
-                    checkpointConfig.enableExternalizedCheckpoints(
-                            
CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
-                } else {
-                    checkpointConfig.enableExternalizedCheckpoints(
-                            
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
-                }
-            }
-
-            if (EnvironmentUtil.hasPathAndWaring(
-                    config, ConfigKeyName.MIN_PAUSE_BETWEEN_CHECKPOINTS)) {
-                long minPause = 
config.getLong(ConfigKeyName.MIN_PAUSE_BETWEEN_CHECKPOINTS);
-                checkpointConfig.setMinPauseBetweenCheckpoints(minPause);
-            }
-
-            if (EnvironmentUtil.hasPathAndWaring(
-                    config, ConfigKeyName.FAIL_ON_CHECKPOINTING_ERRORS)) {
-                int failNum = 
config.getInt(ConfigKeyName.FAIL_ON_CHECKPOINTING_ERRORS);
-                checkpointConfig.setTolerableCheckpointFailureNumber(failNum);
-            }
-        }
-    }
-
-    public void registerResultTable(Config config, DataStream<Row> dataStream, 
String name) {
-        StreamTableEnvironment tableEnvironment = 
this.getStreamTableEnvironment();
-        if (!TableUtil.tableExists(tableEnvironment, name)) {
-            tableEnvironment.createTemporaryView(
-                    name, tableEnvironment.fromChangelogStream(dataStream));
-        }
-    }
-
     public static FlinkRuntimeEnvironment getInstance(Config config) {
         if (INSTANCE == null) {
             synchronized (FlinkRuntimeEnvironment.class) {
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java
similarity index 59%
copy from 
seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
copy to 
seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java
index 964d8825e1..d805c286f8 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java
@@ -30,7 +30,6 @@ import 
org.apache.seatunnel.core.starter.flink.utils.EnvironmentUtil;
 import org.apache.seatunnel.core.starter.flink.utils.TableUtil;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
@@ -41,8 +40,6 @@ import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.TernaryBoolean;
@@ -56,27 +53,20 @@ import java.util.Objects;
 import java.util.stream.Collectors;
 
 @Slf4j
-public class FlinkRuntimeEnvironment implements RuntimeEnvironment {
-    private static final String RESULT_TABLE_NAME = "result_table_name";
-    private static volatile FlinkRuntimeEnvironment INSTANCE = null;
-    private Config config;
+public abstract class AbstractFlinkRuntimeEnvironment implements 
RuntimeEnvironment {
 
-    private StreamExecutionEnvironment environment;
+    protected static final String RESULT_TABLE_NAME = "result_table_name";
+    protected Config config;
+    protected StreamExecutionEnvironment environment;
+    protected StreamTableEnvironment tableEnvironment;
+    protected JobMode jobMode;
+    protected String jobName = Constants.LOGO;
 
-    private StreamTableEnvironment tableEnvironment;
-    private JobMode jobMode;
-
-    private String jobName = Constants.LOGO;
-
-    private FlinkRuntimeEnvironment(Config config) {
+    protected AbstractFlinkRuntimeEnvironment(Config config) {
         this.initialize(config);
     }
 
-    @Override
-    public FlinkRuntimeEnvironment setConfig(Config config) {
-        this.config = config;
-        return this;
-    }
+    public abstract AbstractFlinkRuntimeEnvironment setConfig(Config config);
 
     @Override
     public Config getConfig() {
@@ -88,109 +78,105 @@ public class FlinkRuntimeEnvironment implements 
RuntimeEnvironment {
         return EnvironmentUtil.checkRestartStrategy(config);
     }
 
-    @Override
-    public FlinkRuntimeEnvironment prepare() {
-        createStreamEnvironment();
-        createStreamTableEnvironment();
-        if (config.hasPath("job.name")) {
-            jobName = config.getString("job.name");
-        }
-        return this;
+    public StreamTableEnvironment getStreamTableEnvironment() {
+        return tableEnvironment;
     }
 
-    public String getJobName() {
-        return jobName;
+    public StreamExecutionEnvironment getStreamExecutionEnvironment() {
+        return environment;
     }
 
-    public boolean isStreaming() {
-        return JobMode.STREAMING.equals(jobMode);
-    }
+    protected void setCheckpoint() {
+        if (jobMode == JobMode.BATCH) {
+            log.warn(
+                    "Disabled Checkpointing. In flink execution environment, 
checkpointing is not supported and not needed when executing jobs in BATCH 
mode");
+        }
+        long interval;
+        if (config.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
+            interval = 
config.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key());
+        } else if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) {
+            log.warn(
+                    "the parameter 'execution.checkpoint.interval' will be 
deprecated, please use common parameter 'checkpoint.interval' to set it");
+            interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL);
+        } else {
+            interval = 10000L;
+        }
 
-    @Override
-    public FlinkRuntimeEnvironment setJobMode(JobMode jobMode) {
-        this.jobMode = jobMode;
-        return this;
-    }
+        CheckpointConfig checkpointConfig = environment.getCheckpointConfig();
+        environment.enableCheckpointing(interval);
 
-    @Override
-    public JobMode getJobMode() {
-        return jobMode;
-    }
+        if (config.hasPath(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) {
+            long timeout = 
config.getLong(EnvCommonOptions.CHECKPOINT_TIMEOUT.key());
+            checkpointConfig.setCheckpointTimeout(timeout);
+        } else if (EnvironmentUtil.hasPathAndWaring(config, 
ConfigKeyName.CHECKPOINT_TIMEOUT)) {
+            long timeout = config.getLong(ConfigKeyName.CHECKPOINT_TIMEOUT);
+            checkpointConfig.setCheckpointTimeout(timeout);
+        }
 
-    @Override
-    public void registerPlugin(List<URL> pluginPaths) {
-        pluginPaths.forEach(url -> log.info("register plugins : {}", url));
-        List<Configuration> configurations = new ArrayList<>();
-        try {
-            configurations.add(
-                    (Configuration)
-                            Objects.requireNonNull(
-                                            ReflectionUtils.getDeclaredMethod(
-                                                    
StreamExecutionEnvironment.class,
-                                                    "getConfiguration"))
-                                    .orElseThrow(
-                                            () ->
-                                                    new RuntimeException(
-                                                            "can't find "
-                                                                    + "method: 
getConfiguration"))
-                                    .invoke(this.environment));
-        } catch (Exception e) {
-            throw new RuntimeException(e);
+        if (EnvironmentUtil.hasPathAndWaring(config, 
ConfigKeyName.CHECKPOINT_MODE)) {
+            String mode = config.getString(ConfigKeyName.CHECKPOINT_MODE);
+            switch (mode.toLowerCase()) {
+                case "exactly-once":
+                    
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+                    break;
+                case "at-least-once":
+                    
checkpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
+                    break;
+                default:
+                    log.warn(
+                            "set checkpoint.mode failed, unknown 
checkpoint.mode [{}],only support exactly-once,at-least-once",
+                            mode);
+                    break;
+            }
         }
-        configurations.forEach(
-                configuration -> {
-                    List<String> jars = 
configuration.get(PipelineOptions.JARS);
-                    if (jars == null) {
-                        jars = new ArrayList<>();
-                    }
-                    jars.addAll(
-                            
pluginPaths.stream().map(URL::toString).collect(Collectors.toList()));
-                    configuration.set(
-                            PipelineOptions.JARS,
-                            
jars.stream().distinct().collect(Collectors.toList()));
-                    List<String> classpath = 
configuration.get(PipelineOptions.CLASSPATHS);
-                    if (classpath == null) {
-                        classpath = new ArrayList<>();
-                    }
-                    classpath.addAll(
-                            
pluginPaths.stream().map(URL::toString).collect(Collectors.toList()));
-                    configuration.set(
-                            PipelineOptions.CLASSPATHS,
-                            
classpath.stream().distinct().collect(Collectors.toList()));
-                });
-    }
 
-    public StreamExecutionEnvironment getStreamExecutionEnvironment() {
-        return environment;
-    }
+        if (EnvironmentUtil.hasPathAndWaring(config, 
ConfigKeyName.CHECKPOINT_DATA_URI)) {
+            String uri = config.getString(ConfigKeyName.CHECKPOINT_DATA_URI);
+            StateBackend fsStateBackend = new FsStateBackend(uri);
+            if (EnvironmentUtil.hasPathAndWaring(config, 
ConfigKeyName.STATE_BACKEND)) {
+                String stateBackend = 
config.getString(ConfigKeyName.STATE_BACKEND);
+                if ("rocksdb".equalsIgnoreCase(stateBackend)) {
+                    StateBackend rocksDBStateBackend =
+                            new RocksDBStateBackend(fsStateBackend, 
TernaryBoolean.TRUE);
+                    environment.setStateBackend(rocksDBStateBackend);
+                }
+            } else {
+                environment.setStateBackend(fsStateBackend);
+            }
+        }
 
-    public StreamTableEnvironment getStreamTableEnvironment() {
-        return tableEnvironment;
-    }
+        if (EnvironmentUtil.hasPathAndWaring(config, 
ConfigKeyName.MAX_CONCURRENT_CHECKPOINTS)) {
+            int max = config.getInt(ConfigKeyName.MAX_CONCURRENT_CHECKPOINTS);
+            checkpointConfig.setMaxConcurrentCheckpoints(max);
+        }
 
-    private void createStreamTableEnvironment() {
-        EnvironmentSettings environmentSettings =
-                
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
-        tableEnvironment =
-                StreamTableEnvironment.create(getStreamExecutionEnvironment(), 
environmentSettings);
-        TableConfig config = tableEnvironment.getConfig();
-        if (EnvironmentUtil.hasPathAndWaring(this.config, 
ConfigKeyName.MAX_STATE_RETENTION_TIME)
-                && EnvironmentUtil.hasPathAndWaring(
-                        this.config, ConfigKeyName.MIN_STATE_RETENTION_TIME)) {
-            long max = 
this.config.getLong(ConfigKeyName.MAX_STATE_RETENTION_TIME);
-            long min = 
this.config.getLong(ConfigKeyName.MIN_STATE_RETENTION_TIME);
-            config.setIdleStateRetentionTime(Time.seconds(min), 
Time.seconds(max));
+        if (EnvironmentUtil.hasPathAndWaring(config, 
ConfigKeyName.CHECKPOINT_CLEANUP_MODE)) {
+            boolean cleanup = 
config.getBoolean(ConfigKeyName.CHECKPOINT_CLEANUP_MODE);
+            if (cleanup) {
+                checkpointConfig.enableExternalizedCheckpoints(
+                        
CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
+            } else {
+                checkpointConfig.enableExternalizedCheckpoints(
+                        
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+            }
+        }
+
+        if (EnvironmentUtil.hasPathAndWaring(config, 
ConfigKeyName.MIN_PAUSE_BETWEEN_CHECKPOINTS)) {
+            long minPause = 
config.getLong(ConfigKeyName.MIN_PAUSE_BETWEEN_CHECKPOINTS);
+            checkpointConfig.setMinPauseBetweenCheckpoints(minPause);
+        }
+
+        if (EnvironmentUtil.hasPathAndWaring(config, 
ConfigKeyName.FAIL_ON_CHECKPOINTING_ERRORS)) {
+            int failNum = 
config.getInt(ConfigKeyName.FAIL_ON_CHECKPOINTING_ERRORS);
+            checkpointConfig.setTolerableCheckpointFailureNumber(failNum);
         }
-        // init flink table env config
-        EnvironmentUtil.initTableEnvironmentConfiguration(this.config, 
config.getConfiguration());
     }
 
-    private void createStreamEnvironment() {
+    protected void createStreamEnvironment() {
         Configuration configuration = new Configuration();
         EnvironmentUtil.initConfiguration(config, configuration);
         environment = 
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
         setTimeCharacteristic();
-
         setCheckpoint();
 
         EnvironmentUtil.setRestartStrategy(config, environment.getConfig());
@@ -242,95 +228,6 @@ public class FlinkRuntimeEnvironment implements 
RuntimeEnvironment {
         }
     }
 
-    private void setCheckpoint() {
-        if (jobMode == JobMode.BATCH) {
-            log.warn(
-                    "Disabled Checkpointing. In flink execution environment, 
checkpointing is not supported and not needed when executing jobs in BATCH 
mode");
-        }
-        long interval = 0;
-        if (config.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
-            interval = 
config.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key());
-        } else if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) {
-            log.warn(
-                    "the parameter 'execution.checkpoint.interval' will be 
deprecated, please use common parameter 'checkpoint.interval' to set it");
-            interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL);
-        }
-
-        if (interval > 0) {
-            CheckpointConfig checkpointConfig = 
environment.getCheckpointConfig();
-            environment.enableCheckpointing(interval);
-
-            if (EnvironmentUtil.hasPathAndWaring(config, 
ConfigKeyName.CHECKPOINT_MODE)) {
-                String mode = config.getString(ConfigKeyName.CHECKPOINT_MODE);
-                switch (mode.toLowerCase()) {
-                    case "exactly-once":
-                        
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
-                        break;
-                    case "at-least-once":
-                        
checkpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
-                        break;
-                    default:
-                        log.warn(
-                                "set checkpoint.mode failed, unknown 
checkpoint.mode [{}],only support exactly-once,at-least-once",
-                                mode);
-                        break;
-                }
-            }
-
-            if (config.hasPath(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) {
-                long timeout = 
config.getLong(EnvCommonOptions.CHECKPOINT_TIMEOUT.key());
-                checkpointConfig.setCheckpointTimeout(timeout);
-            } else if (EnvironmentUtil.hasPathAndWaring(config, 
ConfigKeyName.CHECKPOINT_TIMEOUT)) {
-                long timeout = 
config.getLong(ConfigKeyName.CHECKPOINT_TIMEOUT);
-                checkpointConfig.setCheckpointTimeout(timeout);
-            }
-
-            if (EnvironmentUtil.hasPathAndWaring(config, 
ConfigKeyName.CHECKPOINT_DATA_URI)) {
-                String uri = 
config.getString(ConfigKeyName.CHECKPOINT_DATA_URI);
-                StateBackend fsStateBackend = new FsStateBackend(uri);
-                if (EnvironmentUtil.hasPathAndWaring(config, 
ConfigKeyName.STATE_BACKEND)) {
-                    String stateBackend = 
config.getString(ConfigKeyName.STATE_BACKEND);
-                    if ("rocksdb".equalsIgnoreCase(stateBackend)) {
-                        StateBackend rocksDBStateBackend =
-                                new RocksDBStateBackend(fsStateBackend, 
TernaryBoolean.TRUE);
-                        environment.setStateBackend(rocksDBStateBackend);
-                    }
-                } else {
-                    environment.setStateBackend(fsStateBackend);
-                }
-            }
-
-            if (EnvironmentUtil.hasPathAndWaring(
-                    config, ConfigKeyName.MAX_CONCURRENT_CHECKPOINTS)) {
-                int max = 
config.getInt(ConfigKeyName.MAX_CONCURRENT_CHECKPOINTS);
-                checkpointConfig.setMaxConcurrentCheckpoints(max);
-            }
-
-            if (EnvironmentUtil.hasPathAndWaring(config, 
ConfigKeyName.CHECKPOINT_CLEANUP_MODE)) {
-                boolean cleanup = 
config.getBoolean(ConfigKeyName.CHECKPOINT_CLEANUP_MODE);
-                if (cleanup) {
-                    checkpointConfig.enableExternalizedCheckpoints(
-                            
CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
-                } else {
-                    checkpointConfig.enableExternalizedCheckpoints(
-                            
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
-                }
-            }
-
-            if (EnvironmentUtil.hasPathAndWaring(
-                    config, ConfigKeyName.MIN_PAUSE_BETWEEN_CHECKPOINTS)) {
-                long minPause = 
config.getLong(ConfigKeyName.MIN_PAUSE_BETWEEN_CHECKPOINTS);
-                checkpointConfig.setMinPauseBetweenCheckpoints(minPause);
-            }
-
-            if (EnvironmentUtil.hasPathAndWaring(
-                    config, ConfigKeyName.FAIL_ON_CHECKPOINTING_ERRORS)) {
-                int failNum = 
config.getInt(ConfigKeyName.FAIL_ON_CHECKPOINTING_ERRORS);
-                checkpointConfig.setTolerableCheckpointFailureNumber(failNum);
-            }
-        }
-    }
-
     public void registerResultTable(Config config, DataStream<Row> dataStream, 
String name) {
         StreamTableEnvironment tableEnvironment = 
this.getStreamTableEnvironment();
         if (!TableUtil.tableExists(tableEnvironment, name)) {
@@ -339,14 +236,59 @@ public class FlinkRuntimeEnvironment implements 
RuntimeEnvironment {
         }
     }
 
-    public static FlinkRuntimeEnvironment getInstance(Config config) {
-        if (INSTANCE == null) {
-            synchronized (FlinkRuntimeEnvironment.class) {
-                if (INSTANCE == null) {
-                    INSTANCE = new FlinkRuntimeEnvironment(config);
-                }
-            }
+    public boolean isStreaming() {
+        return JobMode.STREAMING.equals(jobMode);
+    }
+
+    public String getJobName() {
+        return jobName;
+    }
+
+    @Override
+    public JobMode getJobMode() {
+        return jobMode;
+    }
+
+    @Override
+    public void registerPlugin(List<URL> pluginPaths) {
+        pluginPaths.forEach(url -> log.info("register plugins : {}", url));
+        List<Configuration> configurations = new ArrayList<>();
+        try {
+            configurations.add(
+                    (Configuration)
+                            Objects.requireNonNull(
+                                            ReflectionUtils.getDeclaredMethod(
+                                                    
StreamExecutionEnvironment.class,
+                                                    "getConfiguration"))
+                                    .orElseThrow(
+                                            () ->
+                                                    new RuntimeException(
+                                                            "can't find "
+                                                                    + "method: 
getConfiguration"))
+                                    .invoke(this.environment));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
         }
-        return INSTANCE;
+        configurations.forEach(
+                configuration -> {
+                    List<String> jars = 
configuration.get(PipelineOptions.JARS);
+                    if (jars == null) {
+                        jars = new ArrayList<>();
+                    }
+                    jars.addAll(
+                            
pluginPaths.stream().map(URL::toString).collect(Collectors.toList()));
+                    configuration.set(
+                            PipelineOptions.JARS,
+                            
jars.stream().distinct().collect(Collectors.toList()));
+                    List<String> classpath = 
configuration.get(PipelineOptions.CLASSPATHS);
+                    if (classpath == null) {
+                        classpath = new ArrayList<>();
+                    }
+                    classpath.addAll(
+                            
pluginPaths.stream().map(URL::toString).collect(Collectors.toList()));
+                    configuration.set(
+                            PipelineOptions.CLASSPATHS,
+                            
classpath.stream().distinct().collect(Collectors.toList()));
+                });
     }
 }
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index 09640d222e..c1de8ff4f7 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -19,58 +19,26 @@ package org.apache.seatunnel.core.starter.flink.execution;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import org.apache.seatunnel.api.env.EnvCommonOptions;
-import org.apache.seatunnel.common.Constants;
-import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.common.utils.ReflectionUtils;
 import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
 import org.apache.seatunnel.core.starter.flink.utils.ConfigKeyName;
 import org.apache.seatunnel.core.starter.flink.utils.EnvironmentUtil;
-import org.apache.seatunnel.core.starter.flink.utils.TableUtil;
 
-import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.PipelineOptions;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.CheckpointConfig;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.TernaryBoolean;
 
 import lombok.extern.slf4j.Slf4j;
 
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
 @Slf4j
-public class FlinkRuntimeEnvironment implements RuntimeEnvironment {
-    private static final String RESULT_TABLE_NAME = "result_table_name";
-    private static volatile FlinkRuntimeEnvironment INSTANCE = null;
-    private Config config;
-
-    private StreamExecutionEnvironment environment;
-
-    private StreamTableEnvironment tableEnvironment;
-
-    private JobMode jobMode;
+public class FlinkRuntimeEnvironment extends AbstractFlinkRuntimeEnvironment
+        implements RuntimeEnvironment {
 
-    private String jobName = Constants.LOGO;
+    private static volatile FlinkRuntimeEnvironment INSTANCE = null;
 
     private FlinkRuntimeEnvironment(Config config) {
-        this.initialize(config);
+        super(config);
     }
 
     @Override
@@ -79,16 +47,6 @@ public class FlinkRuntimeEnvironment implements 
RuntimeEnvironment {
         return this;
     }
 
-    @Override
-    public Config getConfig() {
-        return config;
-    }
-
-    @Override
-    public CheckResult checkConfig() {
-        return EnvironmentUtil.checkRestartStrategy(config);
-    }
-
     @Override
     public FlinkRuntimeEnvironment prepare() {
         createStreamEnvironment();
@@ -99,76 +57,12 @@ public class FlinkRuntimeEnvironment implements 
RuntimeEnvironment {
         return this;
     }
 
-    public String getJobName() {
-        return jobName;
-    }
-
-    public boolean isStreaming() {
-        return JobMode.STREAMING.equals(jobMode);
-    }
-
     @Override
     public FlinkRuntimeEnvironment setJobMode(JobMode jobMode) {
         this.jobMode = jobMode;
         return this;
     }
 
-    @Override
-    public JobMode getJobMode() {
-        return jobMode;
-    }
-
-    @Override
-    public void registerPlugin(List<URL> pluginPaths) {
-        pluginPaths.forEach(url -> log.info("register plugins : {}", url));
-        List<Configuration> configurations = new ArrayList<>();
-        try {
-            configurations.add(
-                    (Configuration)
-                            Objects.requireNonNull(
-                                            ReflectionUtils.getDeclaredMethod(
-                                                    
StreamExecutionEnvironment.class,
-                                                    "getConfiguration"))
-                                    .orElseThrow(
-                                            () ->
-                                                    new RuntimeException(
-                                                            "can't find "
-                                                                    + "method: 
getConfiguration"))
-                                    .invoke(this.environment));
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-        configurations.forEach(
-                configuration -> {
-                    List<String> jars = 
configuration.get(PipelineOptions.JARS);
-                    if (jars == null) {
-                        jars = new ArrayList<>();
-                    }
-                    jars.addAll(
-                            
pluginPaths.stream().map(URL::toString).collect(Collectors.toList()));
-                    configuration.set(
-                            PipelineOptions.JARS,
-                            
jars.stream().distinct().collect(Collectors.toList()));
-                    List<String> classpath = 
configuration.get(PipelineOptions.CLASSPATHS);
-                    if (classpath == null) {
-                        classpath = new ArrayList<>();
-                    }
-                    classpath.addAll(
-                            
pluginPaths.stream().map(URL::toString).collect(Collectors.toList()));
-                    configuration.set(
-                            PipelineOptions.CLASSPATHS,
-                            
classpath.stream().distinct().collect(Collectors.toList()));
-                });
-    }
-
-    public StreamExecutionEnvironment getStreamExecutionEnvironment() {
-        return environment;
-    }
-
-    public StreamTableEnvironment getStreamTableEnvironment() {
-        return tableEnvironment;
-    }
-
     private void createStreamTableEnvironment() {
         EnvironmentSettings environmentSettings =
                 EnvironmentSettings.newInstance().inStreamingMode().build();
@@ -186,161 +80,6 @@ public class FlinkRuntimeEnvironment implements 
RuntimeEnvironment {
         EnvironmentUtil.initTableEnvironmentConfiguration(this.config, 
config.getConfiguration());
     }
 
-    private void createStreamEnvironment() {
-        Configuration configuration = new Configuration();
-        EnvironmentUtil.initConfiguration(config, configuration);
-        environment = 
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
-        setTimeCharacteristic();
-
-        setCheckpoint();
-
-        EnvironmentUtil.setRestartStrategy(config, environment.getConfig());
-
-        if (EnvironmentUtil.hasPathAndWaring(config, 
ConfigKeyName.BUFFER_TIMEOUT_MILLIS)) {
-            long timeout = config.getLong(ConfigKeyName.BUFFER_TIMEOUT_MILLIS);
-            environment.setBufferTimeout(timeout);
-        }
-
-        if (config.hasPath(EnvCommonOptions.PARALLELISM.key())) {
-            int parallelism = 
config.getInt(EnvCommonOptions.PARALLELISM.key());
-            environment.setParallelism(parallelism);
-        } else if (config.hasPath(ConfigKeyName.PARALLELISM)) {
-            log.warn(
-                    "the parameter 'execution.parallelism' will be deprecated, 
please use common parameter 'parallelism' to set it");
-            int parallelism = config.getInt(ConfigKeyName.PARALLELISM);
-            environment.setParallelism(parallelism);
-        }
-
-        if (EnvironmentUtil.hasPathAndWaring(config, 
ConfigKeyName.MAX_PARALLELISM)) {
-            int max = config.getInt(ConfigKeyName.MAX_PARALLELISM);
-            environment.setMaxParallelism(max);
-        }
-
-        if (this.jobMode.equals(JobMode.BATCH)) {
-            environment.setRuntimeMode(RuntimeExecutionMode.BATCH);
-        }
-    }
-
-    private void setTimeCharacteristic() {
-        if (EnvironmentUtil.hasPathAndWaring(config, 
ConfigKeyName.TIME_CHARACTERISTIC)) {
-            String timeType = 
config.getString(ConfigKeyName.TIME_CHARACTERISTIC);
-            switch (timeType.toLowerCase()) {
-                case "event-time":
-                    
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-                    break;
-                case "ingestion-time":
-                    
environment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
-                    break;
-                case "processing-time":
-                    
environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-                    break;
-                default:
-                    log.warn(
-                            "set time-characteristic failed, unknown 
time-characteristic [{}],only support 
event-time,ingestion-time,processing-time",
-                            timeType);
-                    break;
-            }
-        }
-    }
-
-    private void setCheckpoint() {
-        if (jobMode == JobMode.BATCH) {
-            log.warn(
-                    "Disabled Checkpointing. In flink execution environment, 
checkpointing is not supported and not needed when executing jobs in BATCH 
mode");
-        }
-        long interval = 0;
-        if (config.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
-            interval = 
config.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key());
-        } else if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) {
-            log.warn(
-                    "the parameter 'execution.checkpoint.interval' will be 
deprecated, please use common parameter 'checkpoint.interval' to set it");
-            interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL);
-        }
-
-        if (interval > 0) {
-            CheckpointConfig checkpointConfig = 
environment.getCheckpointConfig();
-            environment.enableCheckpointing(interval);
-
-            if (EnvironmentUtil.hasPathAndWaring(config, 
ConfigKeyName.CHECKPOINT_MODE)) {
-                String mode = config.getString(ConfigKeyName.CHECKPOINT_MODE);
-                switch (mode.toLowerCase()) {
-                    case "exactly-once":
-                        
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
-                        break;
-                    case "at-least-once":
-                        
checkpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
-                        break;
-                    default:
-                        log.warn(
-                                "set checkpoint.mode failed, unknown 
checkpoint.mode [{}],only support exactly-once,at-least-once",
-                                mode);
-                        break;
-                }
-            }
-
-            if (EnvironmentUtil.hasPathAndWaring(
-                    config, EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) {
-                long timeout = 
config.getLong(EnvCommonOptions.CHECKPOINT_TIMEOUT.key());
-                checkpointConfig.setCheckpointTimeout(timeout);
-            } else if (EnvironmentUtil.hasPathAndWaring(config, 
ConfigKeyName.CHECKPOINT_TIMEOUT)) {
-                long timeout = 
config.getLong(ConfigKeyName.CHECKPOINT_TIMEOUT);
-                checkpointConfig.setCheckpointTimeout(timeout);
-            }
-
-            if (EnvironmentUtil.hasPathAndWaring(config, 
ConfigKeyName.CHECKPOINT_DATA_URI)) {
-                String uri = 
config.getString(ConfigKeyName.CHECKPOINT_DATA_URI);
-                StateBackend fsStateBackend = new FsStateBackend(uri);
-                if (EnvironmentUtil.hasPathAndWaring(config, 
ConfigKeyName.STATE_BACKEND)) {
-                    String stateBackend = 
config.getString(ConfigKeyName.STATE_BACKEND);
-                    if ("rocksdb".equalsIgnoreCase(stateBackend)) {
-                        StateBackend rocksDBStateBackend =
-                                new RocksDBStateBackend(fsStateBackend, 
TernaryBoolean.TRUE);
-                        environment.setStateBackend(rocksDBStateBackend);
-                    }
-                } else {
-                    environment.setStateBackend(fsStateBackend);
-                }
-            }
-
-            if (EnvironmentUtil.hasPathAndWaring(
-                    config, ConfigKeyName.MAX_CONCURRENT_CHECKPOINTS)) {
-                int max = 
config.getInt(ConfigKeyName.MAX_CONCURRENT_CHECKPOINTS);
-                checkpointConfig.setMaxConcurrentCheckpoints(max);
-            }
-
-            if (EnvironmentUtil.hasPathAndWaring(config, 
ConfigKeyName.CHECKPOINT_CLEANUP_MODE)) {
-                boolean cleanup = 
config.getBoolean(ConfigKeyName.CHECKPOINT_CLEANUP_MODE);
-                if (cleanup) {
-                    checkpointConfig.enableExternalizedCheckpoints(
-                            
CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
-                } else {
-                    checkpointConfig.enableExternalizedCheckpoints(
-                            
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
-                }
-            }
-
-            if (EnvironmentUtil.hasPathAndWaring(
-                    config, ConfigKeyName.MIN_PAUSE_BETWEEN_CHECKPOINTS)) {
-                long minPause = 
config.getLong(ConfigKeyName.MIN_PAUSE_BETWEEN_CHECKPOINTS);
-                checkpointConfig.setMinPauseBetweenCheckpoints(minPause);
-            }
-
-            if (EnvironmentUtil.hasPathAndWaring(
-                    config, ConfigKeyName.FAIL_ON_CHECKPOINTING_ERRORS)) {
-                int failNum = 
config.getInt(ConfigKeyName.FAIL_ON_CHECKPOINTING_ERRORS);
-                checkpointConfig.setTolerableCheckpointFailureNumber(failNum);
-            }
-        }
-    }
-
-    public void registerResultTable(Config config, DataStream<Row> dataStream, 
String name) {
-        StreamTableEnvironment tableEnvironment = 
this.getStreamTableEnvironment();
-        if (!TableUtil.tableExists(tableEnvironment, name)) {
-            tableEnvironment.createTemporaryView(
-                    name, tableEnvironment.fromChangelogStream(dataStream));
-        }
-    }
-
     public static FlinkRuntimeEnvironment getInstance(Config config) {
         if (INSTANCE == null) {
             synchronized (FlinkRuntimeEnvironment.class) {

Reply via email to