This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ca6fc049 [flink] Set default recover strategy for TableEnvironment and StreamExecutionEnvironment in tests (#3133)
3ca6fc049 is described below

commit 3ca6fc0494428e490e05d196df96f05247b35d57
Author: tsreaper <[email protected]>
AuthorDate: Wed Apr 10 14:45:58 2024 +0800

    [flink] Set default recover strategy for TableEnvironment and StreamExecutionEnvironment in tests (#3133)
---
 .../flink/action/cdc/CdcActionITCaseBase.java      |  11 +-
 .../paimon/flink/kafka/KafkaTableTestBase.java     |   4 +-
 .../sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java   |  13 +-
 .../sink/cdc/FlinkCdcSyncTableSinkITCase.java      |  13 +-
 .../flink/lookup/AsyncLookupFunctionWrapper.java   |   5 +
 .../org/apache/paimon/flink/CatalogITCaseBase.java |   9 +-
 .../org/apache/paimon/flink/FileStoreITCase.java   |  20 +-
 .../paimon/flink/FileSystemCatalogITCase.java      |  20 +-
 .../org/apache/paimon/flink/FlinkTestBase.java     |  16 +-
 .../org/apache/paimon/flink/LookupJoinITCase.java  |   2 +-
 .../apache/paimon/flink/MappingTableITCase.java    |   3 +-
 .../flink/PrimaryKeyFileStoreTableITCase.java      | 129 +++++++------
 .../paimon/flink/action/ActionITCaseBase.java      |  42 ++--
 .../paimon/flink/action/CompactActionITCase.java   |   7 +-
 .../flink/action/CompactDatabaseActionITCase.java  |  18 +-
 .../paimon/flink/sink/CompactorSinkITCase.java     |   7 +-
 .../paimon/flink/sink/SinkSavepointITCase.java     |  55 ++----
 .../paimon/flink/source/CompactorSourceITCase.java |   9 +-
 .../MultiTablesCompactorSourceBuilderITCase.java   |  15 +-
 .../apache/paimon/flink/util/AbstractTestBase.java | 214 ++++++++++++++++++++-
 .../procedure/MigrateDatabaseProcedureITCase.java  |  18 +-
 .../hive/procedure/MigrateFileProcedureITCase.java |   8 +-
 .../procedure/MigrateTableProcedureITCase.java     |  18 +-
 23 files changed, 401 insertions(+), 255 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
index 7a6210302..63b42e627 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
@@ -36,7 +36,6 @@ import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.junit.jupiter.api.AfterEach;
@@ -66,10 +65,12 @@ public class CdcActionITCaseBase extends ActionITCaseBase {
 
     @BeforeEach
     public void setEnv() {
-        env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(2);
-        env.enableCheckpointing(1000);
-        env.setRestartStrategy(RestartStrategies.noRestart());
+        env =
+                streamExecutionEnvironmentBuilder()
+                        .streamingMode()
+                        .parallelism(2)
+                        .checkpointIntervalMs(1000)
+                        .build();
     }
 
     @AfterEach
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java
index fdcb99f33..e0724d13a 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.kafka;
 
 import org.apache.paimon.flink.util.AbstractTestBase;
 
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -103,9 +102,8 @@ public abstract class KafkaTableTestBase extends 
AbstractTestBase {
 
     @BeforeEach
     public void setup() {
-        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env = streamExecutionEnvironmentBuilder().streamingMode().build();
         tEnv = StreamTableEnvironment.create(env);
-        env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
         tEnv.getConfig()
                 .getConfiguration()
                 .set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false);
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
index d3369cef3..a7c6b2cb6 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
@@ -42,7 +42,6 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FailingFileIO;
 import org.apache.paimon.utils.TraceableFileIO;
 
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.junit.jupiter.api.Test;
@@ -148,12 +147,12 @@ public class FlinkCdcSyncDatabaseSinkITCase extends 
AbstractTestBase {
         }
 
         List<TestCdcEvent> events = mergeTestTableEvents(testTables);
-
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.getCheckpointConfig().setCheckpointInterval(100);
-        if (!enableFailure) {
-            env.setRestartStrategy(RestartStrategies.noRestart());
-        }
+        StreamExecutionEnvironment env =
+                streamExecutionEnvironmentBuilder()
+                        .streamingMode()
+                        .checkpointIntervalMs(100)
+                        .allowRestart(enableFailure)
+                        .build();
 
         TestCdcSourceFunction sourceFunction = new 
TestCdcSourceFunction(events);
         DataStreamSource<TestCdcEvent> source = env.addSource(sourceFunction);
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
index 57a7604c0..081bd7d07 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
@@ -43,7 +43,6 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FailingFileIO;
 import org.apache.paimon.utils.TraceableFileIO;
 
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.junit.jupiter.api.Disabled;
@@ -145,12 +144,12 @@ public class FlinkCdcSyncTableSinkITCase extends 
AbstractTestBase {
                         Collections.singletonList("pt"),
                         primaryKeys,
                         numBucket);
-
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.getCheckpointConfig().setCheckpointInterval(100);
-        if (!enableFailure) {
-            env.setRestartStrategy(RestartStrategies.noRestart());
-        }
+        StreamExecutionEnvironment env =
+                streamExecutionEnvironmentBuilder()
+                        .streamingMode()
+                        .checkpointIntervalMs(100)
+                        .allowRestart(enableFailure)
+                        .build();
 
         TestCdcSourceFunction sourceFunction = new 
TestCdcSourceFunction(testTable.events());
         DataStreamSource<TestCdcEvent> source = env.addSource(sourceFunction);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/AsyncLookupFunctionWrapper.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/AsyncLookupFunctionWrapper.java
index f8b41d141..99f3d4643 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/AsyncLookupFunctionWrapper.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/AsyncLookupFunctionWrapper.java
@@ -50,12 +50,17 @@ public class AsyncLookupFunctionWrapper extends 
AsyncLookupFunction {
     }
 
     private Collection<RowData> lookup(RowData keyRow) {
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+        Thread.currentThread()
+                
.setContextClassLoader(AsyncLookupFunctionWrapper.class.getClassLoader());
         try {
             synchronized (function) {
                 return function.lookup(keyRow);
             }
         } catch (IOException e) {
             throw new UncheckedIOException(e);
+        } finally {
+            Thread.currentThread().setContextClassLoader(cl);
         }
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
index 31b3ffdff..b5ff3b304 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
@@ -29,7 +29,6 @@ import org.apache.paimon.utils.SnapshotManager;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 
-import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
@@ -51,7 +50,6 @@ import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.IOException;
-import java.time.Duration;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -60,8 +58,6 @@ import java.util.Map;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
-import static 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
-
 /** ITCase for catalog. */
 public abstract class CatalogITCaseBase extends AbstractTestBase {
 
@@ -71,7 +67,7 @@ public abstract class CatalogITCaseBase extends 
AbstractTestBase {
 
     @BeforeEach
     public void before() throws IOException {
-        tEnv = 
TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+        tEnv = tableEnvironmentBuilder().batchMode().build();
         String catalog = "PAIMON";
         path = getTempDirPath();
         String inferScan =
@@ -89,8 +85,7 @@ public abstract class CatalogITCaseBase extends 
AbstractTestBase {
                                 .collect(Collectors.joining(","))));
         tEnv.useCatalog(catalog);
 
-        sEnv = 
TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
-        sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, 
Duration.ofMillis(100));
+        sEnv = 
tableEnvironmentBuilder().streamingMode().checkpointIntervalMs(100).build();
         sEnv.registerCatalog(catalog, tEnv.getCatalog(catalog).get());
         sEnv.useCatalog(catalog);
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
index b7c739e96..6b68532a2 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
@@ -35,7 +35,6 @@ import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.utils.BlockingIterator;
 import org.apache.paimon.utils.FailingFileIO;
 
-import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -396,19 +395,16 @@ public class FileStoreITCase extends AbstractTestBase {
         return wrap(GenericRowData.ofKind(kind, v, StringData.fromString(p), 
k));
     }
 
-    public static StreamExecutionEnvironment buildStreamEnv() {
-        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
-        env.enableCheckpointing(100);
-        env.setParallelism(2);
-        return env;
+    public StreamExecutionEnvironment buildStreamEnv() {
+        return streamExecutionEnvironmentBuilder()
+                .streamingMode()
+                .checkpointIntervalMs(100)
+                .parallelism(2)
+                .build();
     }
 
-    public static StreamExecutionEnvironment buildBatchEnv() {
-        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
-        env.setParallelism(2);
-        return env;
+    public StreamExecutionEnvironment buildBatchEnv() {
+        return 
streamExecutionEnvironmentBuilder().batchMode().parallelism(2).build();
     }
 
     public static FileStoreTable buildFileStoreTable(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
index 50d28cd11..451e4c78b 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
@@ -28,10 +28,8 @@ import org.apache.paimon.flink.util.AbstractTestBase;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.utils.BlockingIterator;
 
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 import org.junit.jupiter.api.BeforeEach;
@@ -55,18 +53,16 @@ public class FileSystemCatalogITCase extends 
AbstractTestBase {
     private static final String DB_NAME = "default";
 
     private String path;
-    private StreamTableEnvironment tEnv;
+    private TableEnvironment tEnv;
 
     @BeforeEach
     public void setup() {
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-        env.setParallelism(1);
-
-        tEnv = StreamTableEnvironment.create(env);
-        tEnv.getConfig()
-                .getConfiguration()
-                .set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false);
+        tEnv =
+                tableEnvironmentBuilder()
+                        .streamingMode()
+                        .parallelism(1)
+                        
.setConf(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false)
+                        .build();
         path = getTempDirPath();
         tEnv.executeSql(
                 String.format("CREATE CATALOG fs WITH ('type'='paimon', 
'warehouse'='%s')", path));
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkTestBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkTestBase.java
index c6cc77bfb..b7da6cd95 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkTestBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkTestBase.java
@@ -22,11 +22,8 @@ import org.apache.paimon.flink.util.AbstractTestBase;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -59,7 +56,7 @@ public abstract class FlinkTestBase extends AbstractTestBase {
     protected ExpectedResult expectedResult;
     protected boolean ignoreException;
 
-    protected StreamTableEnvironment tEnv;
+    protected TableEnvironment tEnv;
     protected String rootPath;
 
     protected ResolvedCatalogTable resolvedTable =
@@ -79,14 +76,11 @@ public abstract class FlinkTestBase extends 
AbstractTestBase {
         this.ignoreException = ignoreException;
         this.expectedResult = expectedResult;
 
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-        EnvironmentSettings.Builder builder = 
EnvironmentSettings.newInstance().inBatchMode();
         if (executionMode == RuntimeExecutionMode.STREAMING) {
-            env.enableCheckpointing(100);
-            builder.inStreamingMode();
+            tEnv = 
tableEnvironmentBuilder().streamingMode().checkpointIntervalMs(100).build();
+        } else {
+            tEnv = tableEnvironmentBuilder().batchMode().build();
         }
-        tEnv = StreamTableEnvironment.create(env, builder.build());
         rootPath = getTempDirPath();
 
         tEnv.executeSql(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index a11e7887c..26982e44a 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -596,7 +596,7 @@ public class LookupJoinITCase extends CatalogITCaseBase {
 
         String query =
                 "SELECT /*+ LOOKUP('table'='D', 
'retry-predicate'='lookup_miss',"
-                        + " 'retry-strategy'='fixed_delay', 
'output-mode'='allow_unordered', 'fixed-delay'='3s','max-attempts'='60') */"
+                        + " 'retry-strategy'='fixed_delay', 
'output-mode'='allow_unordered', 'fixed-delay'='3s','max-attempts'='30') */"
                         + " T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM /*+ 
OPTIONS('lookup.async'='true') */ for system_time as of T.proctime AS D ON T.i 
= D.i";
         BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(sEnv.executeSql(query).collect());
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/MappingTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/MappingTableITCase.java
index 0d3c5f052..fb011bb11 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/MappingTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/MappingTableITCase.java
@@ -22,7 +22,6 @@ import org.apache.paimon.flink.util.AbstractTestBase;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 
-import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.types.Row;
@@ -44,7 +43,7 @@ public class MappingTableITCase extends AbstractTestBase {
 
     @BeforeEach
     public void before() throws IOException {
-        tEnv = 
TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+        tEnv = tableEnvironmentBuilder().batchMode().build();
         path = getTempDirPath();
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index 8872cc333..e495ad3da 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -26,11 +26,8 @@ import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.utils.FailingFileIO;
 
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
@@ -42,7 +39,6 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
 import java.io.IOException;
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -53,7 +49,6 @@ import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 
-import static 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for changelog table with primary keys. */
@@ -76,32 +71,6 @@ public class PrimaryKeyFileStoreTableITCase extends 
AbstractTestBase {
         }
     }
 
-    private TableEnvironment createBatchTableEnvironment() {
-        return 
TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
-    }
-
-    private TableEnvironment createStreamingTableEnvironment(int 
checkpointIntervalMs) {
-        TableEnvironment sEnv =
-                TableEnvironment.create(
-                        
EnvironmentSettings.newInstance().inStreamingMode().build());
-        // set checkpoint interval to a random number to emulate different 
speed of commit
-        sEnv.getConfig()
-                .getConfiguration()
-                .set(CHECKPOINTING_INTERVAL, 
Duration.ofMillis(checkpointIntervalMs));
-        sEnv.getConfig()
-                .set(
-                        
ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
-                        ExecutionConfigOptions.UpsertMaterialize.NONE);
-        return sEnv;
-    }
-
-    private StreamExecutionEnvironment createStreamExecutionEnvironment(int 
checkpointIntervalMs) {
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
-        env.getCheckpointConfig().setCheckpointInterval(checkpointIntervalMs);
-        return env;
-    }
-
     private String createCatalogSql(String catalogName, String warehouse) {
         String defaultPropertyString = "";
         if (tableDefaultProperties.size() > 0) {
@@ -138,10 +107,7 @@ public class PrimaryKeyFileStoreTableITCase extends 
AbstractTestBase {
     @Timeout(1200)
     public void testFullCompactionWithLongCheckpointInterval() throws 
Exception {
         // create table
-        TableEnvironment bEnv = createBatchTableEnvironment();
-        bEnv.getConfig()
-                .getConfiguration()
-                
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        TableEnvironment bEnv = 
tableEnvironmentBuilder().batchMode().parallelism(1).build();
         bEnv.executeSql(createCatalogSql("testCatalog", path));
         bEnv.executeSql("USE CATALOG testCatalog");
         bEnv.executeSql(
@@ -156,18 +122,23 @@ public class PrimaryKeyFileStoreTableITCase extends 
AbstractTestBase {
                         + ")");
 
         // run select job
-        TableEnvironment sEnv = createStreamingTableEnvironment(100);
-        sEnv.getConfig()
-                .getConfiguration()
-                
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        TableEnvironment sEnv =
+                tableEnvironmentBuilder()
+                        .streamingMode()
+                        .checkpointIntervalMs(100)
+                        .parallelism(1)
+                        .build();
         sEnv.executeSql(createCatalogSql("testCatalog", path));
         sEnv.executeSql("USE CATALOG testCatalog");
         CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM 
T").collect();
 
         // run compact job
-        StreamExecutionEnvironment env = 
createStreamExecutionEnvironment(2000);
+        StreamExecutionEnvironment env =
+                streamExecutionEnvironmentBuilder()
+                        .streamingMode()
+                        .checkpointIntervalMs(2000)
+                        .build();
         env.setParallelism(1);
-        env.setRestartStrategy(RestartStrategies.noRestart());
         new CompactAction(path, "default", 
"T").withStreamExecutionEnvironment(env).build();
         JobClient client = env.executeAsync();
 
@@ -199,10 +170,11 @@ public class PrimaryKeyFileStoreTableITCase extends 
AbstractTestBase {
 
     private void innerTestChangelogProducing(List<String> options) throws 
Exception {
         TableEnvironment sEnv =
-                
createStreamingTableEnvironment(ThreadLocalRandom.current().nextInt(900) + 100);
-        sEnv.getConfig()
-                .getConfiguration()
-                
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+                tableEnvironmentBuilder()
+                        .streamingMode()
+                        
.checkpointIntervalMs(ThreadLocalRandom.current().nextInt(900) + 100)
+                        .parallelism(1)
+                        .build();
 
         sEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
         sEnv.executeSql("USE CATALOG testCatalog");
@@ -273,7 +245,7 @@ public class PrimaryKeyFileStoreTableITCase extends 
AbstractTestBase {
     @Test
     @Timeout(1200)
     public void testNoChangelogProducerBatchRandom() throws Exception {
-        TableEnvironment bEnv = createBatchTableEnvironment();
+        TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build();
         testNoChangelogProducerRandom(bEnv, 1, false);
     }
 
@@ -281,14 +253,19 @@ public class PrimaryKeyFileStoreTableITCase extends 
AbstractTestBase {
     @Timeout(1200)
     public void testNoChangelogProducerStreamingRandom() throws Exception {
         ThreadLocalRandom random = ThreadLocalRandom.current();
-        TableEnvironment sEnv = 
createStreamingTableEnvironment(random.nextInt(900) + 100);
+        TableEnvironment sEnv =
+                tableEnvironmentBuilder()
+                        .streamingMode()
+                        .checkpointIntervalMs(random.nextInt(900) + 100)
+                        .allowRestart()
+                        .build();
         testNoChangelogProducerRandom(sEnv, random.nextInt(1, 3), 
random.nextBoolean());
     }
 
     @Test
     @Timeout(1200)
     public void testFullCompactionChangelogProducerBatchRandom() throws 
Exception {
-        TableEnvironment bEnv = createBatchTableEnvironment();
+        TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build();
         testFullCompactionChangelogProducerRandom(bEnv, 1, false);
     }
 
@@ -296,7 +273,12 @@ public class PrimaryKeyFileStoreTableITCase extends 
AbstractTestBase {
     @Timeout(1200)
     public void testFullCompactionChangelogProducerStreamingRandom() throws 
Exception {
         ThreadLocalRandom random = ThreadLocalRandom.current();
-        TableEnvironment sEnv = 
createStreamingTableEnvironment(random.nextInt(900) + 100);
+        TableEnvironment sEnv =
+                tableEnvironmentBuilder()
+                        .streamingMode()
+                        .checkpointIntervalMs(random.nextInt(900) + 100)
+                        .allowRestart()
+                        .build();
         testFullCompactionChangelogProducerRandom(sEnv, random.nextInt(1, 3), 
random.nextBoolean());
     }
 
@@ -304,14 +286,19 @@ public class PrimaryKeyFileStoreTableITCase extends 
AbstractTestBase {
     @Timeout(1200)
     public void testStandAloneFullCompactJobRandom() throws Exception {
         ThreadLocalRandom random = ThreadLocalRandom.current();
-        TableEnvironment sEnv = 
createStreamingTableEnvironment(random.nextInt(900) + 100);
+        TableEnvironment sEnv =
+                tableEnvironmentBuilder()
+                        .streamingMode()
+                        .checkpointIntervalMs(random.nextInt(900) + 100)
+                        .allowRestart()
+                        .build();
         testStandAloneFullCompactJobRandom(sEnv, random.nextInt(1, 3), 
random.nextBoolean());
     }
 
     @Test
     @Timeout(1200)
     public void testLookupChangelogProducerBatchRandom() throws Exception {
-        TableEnvironment bEnv = createBatchTableEnvironment();
+        TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build();
         testLookupChangelogProducerRandom(bEnv, 1, false);
     }
 
@@ -319,7 +306,12 @@ public class PrimaryKeyFileStoreTableITCase extends 
AbstractTestBase {
     @Timeout(1200)
     public void testLookupChangelogProducerStreamingRandom() throws Exception {
         ThreadLocalRandom random = ThreadLocalRandom.current();
-        TableEnvironment sEnv = 
createStreamingTableEnvironment(random.nextInt(900) + 100);
+        TableEnvironment sEnv =
+                tableEnvironmentBuilder()
+                        .streamingMode()
+                        .checkpointIntervalMs(random.nextInt(900) + 100)
+                        .allowRestart()
+                        .build();
         testLookupChangelogProducerRandom(sEnv, random.nextInt(1, 3), 
random.nextBoolean());
     }
 
@@ -327,7 +319,12 @@ public class PrimaryKeyFileStoreTableITCase extends 
AbstractTestBase {
     @Timeout(1200)
     public void testStandAloneLookupJobRandom() throws Exception {
         ThreadLocalRandom random = ThreadLocalRandom.current();
-        TableEnvironment sEnv = 
createStreamingTableEnvironment(random.nextInt(900) + 100);
+        TableEnvironment sEnv =
+                tableEnvironmentBuilder()
+                        .streamingMode()
+                        .checkpointIntervalMs(random.nextInt(900) + 100)
+                        .allowRestart()
+                        .build();
         testStandAloneLookupJobRandom(sEnv, random.nextInt(1, 3), 
random.nextBoolean());
     }
 
@@ -433,8 +430,12 @@ public class PrimaryKeyFileStoreTableITCase extends 
AbstractTestBase {
 
         for (int i = enableConflicts ? 2 : 1; i > 0; i--) {
             StreamExecutionEnvironment env =
-                    createStreamExecutionEnvironment(random.nextInt(1900) + 
100);
-            env.setParallelism(2);
+                    streamExecutionEnvironmentBuilder()
+                            .streamingMode()
+                            .checkpointIntervalMs(random.nextInt(1900) + 100)
+                            .parallelism(2)
+                            .allowRestart()
+                            .build();
             new CompactAction(path, "default", 
"T").withStreamExecutionEnvironment(env).build();
             env.executeAsync();
         }
@@ -469,7 +470,11 @@ public class PrimaryKeyFileStoreTableITCase extends 
AbstractTestBase {
 
         for (int i = enableConflicts ? 2 : 1; i > 0; i--) {
             StreamExecutionEnvironment env =
-                    createStreamExecutionEnvironment(random.nextInt(1900) + 
100);
+                    streamExecutionEnvironmentBuilder()
+                            .streamingMode()
+                            .checkpointIntervalMs(random.nextInt(1900) + 100)
+                            .allowRestart()
+                            .build();
             env.setParallelism(2);
             new CompactAction(path, "default", 
"T").withStreamExecutionEnvironment(env).build();
             env.executeAsync();
@@ -483,10 +488,12 @@ public class PrimaryKeyFileStoreTableITCase extends 
AbstractTestBase {
     }
 
     private void checkChangelogTestResult(int numProducers) throws Exception {
-        TableEnvironment sEnv = createStreamingTableEnvironment(100);
-        sEnv.getConfig()
-                .getConfiguration()
-                
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        TableEnvironment sEnv =
+                tableEnvironmentBuilder()
+                        .streamingMode()
+                        .checkpointIntervalMs(100)
+                        .parallelism(1)
+                        .build();
         sEnv.executeSql(createCatalogSql("testCatalog", path));
         sEnv.executeSql("USE CATALOG testCatalog");
 
@@ -601,7 +608,7 @@ public class PrimaryKeyFileStoreTableITCase extends 
AbstractTestBase {
     }
 
     private void checkBatchResult(int numProducers) throws Exception {
-        TableEnvironment bEnv = createBatchTableEnvironment();
+        TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build();
         bEnv.executeSql(createCatalogSql("testCatalog", path));
         bEnv.executeSql("USE CATALOG testCatalog");
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
index 80cf2cc76..579237756 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
@@ -36,20 +36,12 @@ import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.RowType;
 
-import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 
 import java.io.IOException;
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -148,20 +140,18 @@ public abstract class ActionITCaseBase extends 
AbstractTestBase {
         }
     }
 
-    protected StreamExecutionEnvironment buildDefaultEnv(boolean isStreaming) {
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-        env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
-
-        if (isStreaming) {
-            env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
-            
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
-            env.getCheckpointConfig().setCheckpointInterval(500);
-        } else {
-            env.setRuntimeMode(RuntimeExecutionMode.BATCH);
-        }
+    @Override
+    protected TableEnvironmentBuilder tableEnvironmentBuilder() {
+        return super.tableEnvironmentBuilder()
+                .checkpointIntervalMs(500)
+                .parallelism(ThreadLocalRandom.current().nextInt(2) + 1);
+    }
 
-        return env;
+    @Override
+    protected StreamExecutionEnvironmentBuilder 
streamExecutionEnvironmentBuilder() {
+        return super.streamExecutionEnvironmentBuilder()
+                .checkpointIntervalMs(500)
+                .parallelism(ThreadLocalRandom.current().nextInt(2) + 1);
     }
 
     protected <T extends ActionBase> T createAction(Class<T> clazz, 
List<String> args) {
@@ -195,17 +185,11 @@ public abstract class ActionITCaseBase extends 
AbstractTestBase {
     }
 
     protected void callProcedure(String procedureStatement, boolean 
isStreaming, boolean dmlSync) {
-        StreamExecutionEnvironment env = buildDefaultEnv(isStreaming);
-
         TableEnvironment tEnv;
         if (isStreaming) {
-            tEnv = StreamTableEnvironment.create(env, 
EnvironmentSettings.inStreamingMode());
-            tEnv.getConfig()
-                    .set(
-                            
ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
-                            Duration.ofMillis(500));
+            tEnv = 
tableEnvironmentBuilder().streamingMode().checkpointIntervalMs(500).build();
         } else {
-            tEnv = StreamTableEnvironment.create(env, 
EnvironmentSettings.inBatchMode());
+            tEnv = tableEnvironmentBuilder().batchMode().build();
         }
 
         tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, dmlSync);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index d0f10766e..06c29456c 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -272,7 +272,12 @@ public class CompactActionITCase extends 
CompactActionITCaseBase {
     }
 
     private void runAction(boolean isStreaming) throws Exception {
-        StreamExecutionEnvironment env = buildDefaultEnv(isStreaming);
+        StreamExecutionEnvironment env;
+        if (isStreaming) {
+            env = streamExecutionEnvironmentBuilder().streamingMode().build();
+        } else {
+            env = streamExecutionEnvironmentBuilder().batchMode().build();
+        }
 
         CompactAction action =
                 createAction(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
index d7104f591..4fa1b3c53 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
@@ -133,7 +133,8 @@ public class CompactDatabaseActionITCase extends 
CompactActionITCaseBase {
         }
 
         if (ThreadLocalRandom.current().nextBoolean()) {
-            StreamExecutionEnvironment env = buildDefaultEnv(false);
+            StreamExecutionEnvironment env =
+                    streamExecutionEnvironmentBuilder().batchMode().build();
             createAction(
                             CompactDatabaseAction.class,
                             "compact_database",
@@ -237,7 +238,8 @@ public class CompactDatabaseActionITCase extends 
CompactActionITCaseBase {
                                 "--table_conf",
                                 
CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key() + "=1s");
             }
-            StreamExecutionEnvironment env = buildDefaultEnv(true);
+            StreamExecutionEnvironment env =
+                    
streamExecutionEnvironmentBuilder().streamingMode().build();
             action.withStreamExecutionEnvironment(env).build();
             env.executeAsync();
         } else {
@@ -513,7 +515,8 @@ public class CompactDatabaseActionITCase extends 
CompactActionITCaseBase {
                 args.add(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key() + 
"=1s");
             }
 
-            StreamExecutionEnvironment env = buildDefaultEnv(false);
+            StreamExecutionEnvironment env =
+                    streamExecutionEnvironmentBuilder().batchMode().build();
             createAction(CompactDatabaseAction.class, args)
                     .withStreamExecutionEnvironment(env)
                     .build();
@@ -615,7 +618,8 @@ public class CompactDatabaseActionITCase extends 
CompactActionITCaseBase {
         }
 
         if (ThreadLocalRandom.current().nextBoolean()) {
-            StreamExecutionEnvironment env = buildDefaultEnv(true);
+            StreamExecutionEnvironment env =
+                    
streamExecutionEnvironmentBuilder().streamingMode().build();
             createAction(CompactDatabaseAction.class, "compact_database", 
"--warehouse", warehouse)
                     .withStreamExecutionEnvironment(env)
                     .build();
@@ -688,7 +692,8 @@ public class CompactDatabaseActionITCase extends 
CompactActionITCaseBase {
         }
 
         if (ThreadLocalRandom.current().nextBoolean()) {
-            StreamExecutionEnvironment env = buildDefaultEnv(false);
+            StreamExecutionEnvironment env =
+                    streamExecutionEnvironmentBuilder().batchMode().build();
             createAction(CompactDatabaseAction.class, "compact_database", 
"--warehouse", warehouse)
                     .withStreamExecutionEnvironment(env)
                     .build();
@@ -747,7 +752,8 @@ public class CompactDatabaseActionITCase extends 
CompactActionITCaseBase {
                         "--table_conf",
                         CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key() + "=3");
 
-        StreamExecutionEnvironment env = buildDefaultEnv(true);
+        StreamExecutionEnvironment env =
+                streamExecutionEnvironmentBuilder().streamingMode().build();
         action.withStreamExecutionEnvironment(env).build();
         JobClient jobClient = env.executeAsync();
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
index 68c3c42b0..f81c253cc 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
@@ -48,7 +48,6 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.SnapshotManager;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
@@ -117,8 +116,7 @@ public class CompactorSinkITCase extends AbstractTestBase {
         write.close();
         commit.close();
 
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        StreamExecutionEnvironment env = 
streamExecutionEnvironmentBuilder().batchMode().build();
         CompactorSourceBuilder sourceBuilder =
                 new CompactorSourceBuilder(tablePath.toString(), table);
         DataStreamSource<RowData> source =
@@ -152,7 +150,8 @@ public class CompactorSinkITCase extends AbstractTestBase {
     public void testCompactParallelism() throws Exception {
         FileStoreTable table = createFileStoreTable();
 
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamExecutionEnvironment env =
+                streamExecutionEnvironmentBuilder().streamingMode().build();
         CompactorSourceBuilder sourceBuilder =
                 new CompactorSourceBuilder(tablePath.toString(), table);
         DataStreamSource<RowData> source =
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java
index 6d35351d8..fe56eae64 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java
@@ -24,18 +24,12 @@ import org.apache.paimon.utils.FailingFileIO;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import 
org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException;
-import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.ExceptionUtils;
@@ -43,7 +37,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
-import java.time.Duration;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -136,33 +129,19 @@ public class SinkSavepointITCase extends AbstractTestBase 
{
             SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, 
conf);
         }
 
-        EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inStreamingMode().build();
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(conf);
-        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
settings);
-        tEnv.getConfig()
-                .getConfiguration()
-                .set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, 
Duration.ofMillis(500));
-        
tEnv.getConfig().getConfiguration().set(StateBackendOptions.STATE_BACKEND, 
"filesystem");
-        tEnv.getConfig()
-                .getConfiguration()
-                .set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file://" + 
path + "/checkpoint");
-        // input data must be strictly ordered for us to check changelog 
results
-        tEnv.getConfig()
-                .getConfiguration()
-                
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
-        tEnv.getConfig()
-                .getConfiguration()
-                .set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
-        tEnv.getConfig()
-                .getConfiguration()
-                .set(
-                        
RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS,
-                        Integer.MAX_VALUE);
-        tEnv.getConfig()
-                .getConfiguration()
-                .set(
-                        
RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY,
-                        Duration.ofSeconds(1));
+        TableEnvironment tEnv =
+                tableEnvironmentBuilder()
+                        .streamingMode()
+                        .checkpointIntervalMs(500)
+                        // input data must be strictly ordered for us to check 
changelog results
+                        .parallelism(1)
+                        .allowRestart()
+                        .setConf(conf)
+                        .setConf(StateBackendOptions.STATE_BACKEND, 
"filesystem")
+                        .setConf(
+                                CheckpointingOptions.CHECKPOINTS_DIRECTORY,
+                                "file://" + path + "/checkpoint")
+                        .build();
 
         String createCatalogSql =
                 String.join(
@@ -219,9 +198,7 @@ public class SinkSavepointITCase extends AbstractTestBase {
     }
 
     private void checkRecoverFromSavepointBatchResult() throws Exception {
-        EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inBatchMode().build();
-        TableEnvironment tEnv = TableEnvironment.create(settings);
-
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
         tEnv.executeSql(
                 String.join(
                         "\n",
@@ -248,9 +225,7 @@ public class SinkSavepointITCase extends AbstractTestBase {
     }
 
     private void checkRecoverFromSavepointStreamingResult() throws Exception {
-        EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inStreamingMode().build();
-        TableEnvironment tEnv = TableEnvironment.create(settings);
-
+        TableEnvironment tEnv = 
tableEnvironmentBuilder().streamingMode().build();
         tEnv.executeSql(
                 String.join(
                         "\n",
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
index 2a95b5d58..3164dd412 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
@@ -103,7 +103,8 @@ public class CompactorSourceITCase extends AbstractTestBase 
{
         write.write(rowData(1, 1510, BinaryString.fromString("20221209"), 15));
         commit.commit(1, write.prepareCommit(true, 1));
 
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamExecutionEnvironment env =
+                streamExecutionEnvironmentBuilder().streamingMode().build();
         DataStreamSource<RowData> compactorSource =
                 new CompactorSourceBuilder("test", table)
                         .withContinuousMode(false)
@@ -164,7 +165,8 @@ public class CompactorSourceITCase extends AbstractTestBase 
{
         write.write(rowData(2, 1620, BinaryString.fromString("20221209"), 16));
         commit.commit(3, write.prepareCommit(true, 3));
 
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamExecutionEnvironment env =
+                streamExecutionEnvironmentBuilder().streamingMode().build();
         DataStreamSource<RowData> compactorSource =
                 new CompactorSourceBuilder("test", table)
                         .withContinuousMode(true)
@@ -257,7 +259,8 @@ public class CompactorSourceITCase extends AbstractTestBase 
{
         write.write(rowData(1, 1510, BinaryString.fromString("20221209"), 15));
         commit.commit(2, write.prepareCommit(true, 2));
 
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamExecutionEnvironment env =
+                streamExecutionEnvironmentBuilder().streamingMode().build();
         DataStreamSource<RowData> compactorSource =
                 new CompactorSourceBuilder("test", table)
                         .withContinuousMode(isStreaming)
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java
index 56e9374d5..555585603 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java
@@ -41,7 +41,6 @@ import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.SnapshotManager;
 
-import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.data.RowData;
@@ -156,9 +155,11 @@ public class MultiTablesCompactorSourceBuilderITCase 
extends AbstractTestBase
             }
         }
 
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
-        env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
+        StreamExecutionEnvironment env =
+                streamExecutionEnvironmentBuilder()
+                        .batchMode()
+                        .parallelism(ThreadLocalRandom.current().nextInt(2) + 
1)
+                        .build();
         DataStream<RowData> source =
                 new MultiTablesCompactorSourceBuilder(
                                 catalogLoader(),
@@ -254,7 +255,8 @@ public class MultiTablesCompactorSourceBuilderITCase 
extends AbstractTestBase
             }
         }
 
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamExecutionEnvironment env =
+                streamExecutionEnvironmentBuilder().streamingMode().build();
         DataStream<RowData> compactorSource =
                 new MultiTablesCompactorSourceBuilder(
                                 catalogLoader(),
@@ -423,7 +425,8 @@ public class MultiTablesCompactorSourceBuilderITCase 
extends AbstractTestBase
             }
         }
 
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamExecutionEnvironment env =
+                streamExecutionEnvironmentBuilder().streamingMode().build();
         DataStream<RowData> compactorSource =
                 new MultiTablesCompactorSourceBuilder(
                                 catalogLoader(),
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
index 0850d6197..d3efddecc 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
@@ -20,9 +20,19 @@ package org.apache.paimon.flink.util;
 
 import org.apache.paimon.utils.FileIOUtils;
 
+import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.api.io.TempDir;
@@ -30,12 +40,13 @@ import org.junit.jupiter.api.io.TempDir;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
+import java.time.Duration;
 import java.util.UUID;
 
 /** Similar to Flink's AbstractTestBase but using Junit5. */
 public class AbstractTestBase {
 
-    private static final int DEFAULT_PARALLELISM = 8;
+    private static final int DEFAULT_PARALLELISM = 16;
 
     @RegisterExtension
     protected static final MiniClusterWithClientExtension 
MINI_CLUSTER_EXTENSION =
@@ -92,4 +103,205 @@ public class AbstractTestBase {
         return new File(
                 temporaryFolder.toFile(), String.format("%s/%s", 
UUID.randomUUID(), fileName));
     }
+
+    // 
----------------------------------------------------------------------------------------------------------------
+    //  Table Environment Utilities
+    // 
----------------------------------------------------------------------------------------------------------------
+
+    protected TableEnvironmentBuilder tableEnvironmentBuilder() {
+        return new TableEnvironmentBuilder();
+    }
+
+    /** Builder for {@link TableEnvironment} in tests. */
+    protected static class TableEnvironmentBuilder {
+
+        private boolean streamingMode = true;
+        private Integer parallelism = null;
+        private Integer checkpointIntervalMs = null;
+        private boolean allowRestart = false;
+        private Configuration conf = new Configuration();
+
+        public TableEnvironmentBuilder batchMode() {
+            this.streamingMode = false;
+            return this;
+        }
+
+        public TableEnvironmentBuilder streamingMode() {
+            this.streamingMode = true;
+            return this;
+        }
+
+        public TableEnvironmentBuilder parallelism(int parallelism) {
+            this.parallelism = parallelism;
+            return this;
+        }
+
+        public TableEnvironmentBuilder checkpointIntervalMs(int 
checkpointIntervalMs) {
+            this.checkpointIntervalMs = checkpointIntervalMs;
+            return this;
+        }
+
+        public TableEnvironmentBuilder allowRestart() {
+            this.allowRestart = true;
+            return this;
+        }
+
+        public TableEnvironmentBuilder allowRestart(boolean allowRestart) {
+            this.allowRestart = allowRestart;
+            return this;
+        }
+
+        public <T> TableEnvironmentBuilder setConf(ConfigOption<T> option, T 
value) {
+            conf.set(option, value);
+            return this;
+        }
+
+        public TableEnvironmentBuilder setConf(Configuration conf) {
+            this.conf.addAll(conf);
+            return this;
+        }
+
+        public TableEnvironment build() {
+            TableEnvironment tEnv;
+            if (streamingMode) {
+                tEnv =
+                        TableEnvironment.create(
+                                
EnvironmentSettings.newInstance().inStreamingMode().build());
+                tEnv.getConfig()
+                        .set(
+                                
ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
+                                ExecutionConfigOptions.UpsertMaterialize.NONE);
+                if (checkpointIntervalMs != null) {
+                    tEnv.getConfig()
+                            .getConfiguration()
+                            .set(
+                                    
ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
+                                    Duration.ofMillis(checkpointIntervalMs));
+                }
+            } else {
+                tEnv =
+                        TableEnvironment.create(
+                                
EnvironmentSettings.newInstance().inBatchMode().build());
+            }
+
+            if (parallelism != null) {
+                tEnv.getConfig()
+                        .getConfiguration()
+                        .set(
+                                
ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
+                                parallelism);
+            }
+
+            if (allowRestart) {
+                tEnv.getConfig()
+                        .getConfiguration()
+                        .set(RestartStrategyOptions.RESTART_STRATEGY, 
"fixed-delay");
+                tEnv.getConfig()
+                        .getConfiguration()
+                        .set(
+                                
RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS,
+                                Integer.MAX_VALUE);
+                tEnv.getConfig()
+                        .getConfiguration()
+                        .set(
+                                
RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY,
+                                Duration.ofSeconds(1));
+            } else {
+                tEnv.getConfig()
+                        .getConfiguration()
+                        .set(RestartStrategyOptions.RESTART_STRATEGY, 
"disable");
+            }
+
+            tEnv.getConfig().getConfiguration().addAll(conf);
+
+            return tEnv;
+        }
+    }
+
+    // 
----------------------------------------------------------------------------------------------------------------
+    //  Stream Execution Environment Utilities
+    // 
----------------------------------------------------------------------------------------------------------------
+
+    protected StreamExecutionEnvironmentBuilder 
streamExecutionEnvironmentBuilder() {
+        return new StreamExecutionEnvironmentBuilder();
+    }
+
+    /** Builder for {@link StreamExecutionEnvironment} in tests. */
+    protected static class StreamExecutionEnvironmentBuilder {
+
+        private boolean streamingMode = true;
+        private Integer parallelism = null;
+        private Integer checkpointIntervalMs = null;
+        private boolean allowRestart = false;
+        private Configuration conf = new Configuration();
+
+        public StreamExecutionEnvironmentBuilder batchMode() {
+            this.streamingMode = false;
+            return this;
+        }
+
+        public StreamExecutionEnvironmentBuilder streamingMode() {
+            this.streamingMode = true;
+            return this;
+        }
+
+        public StreamExecutionEnvironmentBuilder parallelism(int parallelism) {
+            this.parallelism = parallelism;
+            return this;
+        }
+
+        public StreamExecutionEnvironmentBuilder checkpointIntervalMs(int 
checkpointIntervalMs) {
+            this.checkpointIntervalMs = checkpointIntervalMs;
+            return this;
+        }
+
+        public StreamExecutionEnvironmentBuilder allowRestart() {
+            this.allowRestart = true;
+            return this;
+        }
+
+        public StreamExecutionEnvironmentBuilder allowRestart(boolean 
allowRestart) {
+            this.allowRestart = allowRestart;
+            return this;
+        }
+
+        public <T> StreamExecutionEnvironmentBuilder setConf(ConfigOption<T> 
option, T value) {
+            conf.set(option, value);
+            return this;
+        }
+
+        public StreamExecutionEnvironment build() {
+            StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+            if (streamingMode) {
+                env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+                if (checkpointIntervalMs != null) {
+                    
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+                    
env.getCheckpointConfig().setCheckpointInterval(checkpointIntervalMs);
+                }
+            } else {
+                env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+            }
+
+            if (parallelism != null) {
+                env.setParallelism(parallelism);
+            }
+
+            Configuration conf = new Configuration();
+            if (allowRestart) {
+                conf.set(RestartStrategyOptions.RESTART_STRATEGY, 
"fixed-delay");
+                conf.set(
+                        
RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS,
+                        Integer.MAX_VALUE);
+                conf.set(
+                        
RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY,
+                        Duration.ofSeconds(1));
+            } else {
+                conf.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
+            }
+            conf.addAll(this.conf);
+            env.configure(conf);
+
+            return env;
+        }
+    }
 }
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateDatabaseProcedureITCase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateDatabaseProcedureITCase.java
index 70345e4f8..ff02a352e 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateDatabaseProcedureITCase.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateDatabaseProcedureITCase.java
@@ -25,11 +25,8 @@ import org.apache.paimon.hive.TestHiveMetastore;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.assertj.core.api.Assertions;
@@ -90,10 +87,7 @@ public class MigrateDatabaseProcedureITCase extends 
ActionITCaseBase {
     }
 
     public void testUpgradePartitionTable(String format) throws Exception {
-        StreamExecutionEnvironment env = buildDefaultEnv(false);
-
-        TableEnvironment tEnv =
-                StreamTableEnvironment.create(env, 
EnvironmentSettings.inBatchMode());
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
         tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
         tEnv.useCatalog("HIVE");
         tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
@@ -149,10 +143,7 @@ public class MigrateDatabaseProcedureITCase extends 
ActionITCaseBase {
     }
 
     public void testUpgradeNonPartitionTable(String format) throws Exception {
-        StreamExecutionEnvironment env = buildDefaultEnv(false);
-
-        TableEnvironment tEnv =
-                StreamTableEnvironment.create(env, 
EnvironmentSettings.inBatchMode());
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
         tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
         tEnv.useCatalog("HIVE");
         tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
@@ -208,10 +199,7 @@ public class MigrateDatabaseProcedureITCase extends 
ActionITCaseBase {
     @ParameterizedTest
     @ValueSource(strings = {"orc", "parquet", "avro"})
     public void testMigrateDatabaseAction(String format) throws Exception {
-        StreamExecutionEnvironment env = buildDefaultEnv(false);
-
-        TableEnvironment tEnv =
-                StreamTableEnvironment.create(env, 
EnvironmentSettings.inBatchMode());
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
         tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
         tEnv.useCatalog("HIVE");
         tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
index b41a187d7..b71a38c59 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
@@ -24,11 +24,8 @@ import org.apache.paimon.hive.TestHiveMetastore;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.assertj.core.api.Assertions;
@@ -72,10 +69,7 @@ public class MigrateFileProcedureITCase extends 
ActionITCaseBase {
     }
 
     public void test(String format) throws Exception {
-        StreamExecutionEnvironment env = buildDefaultEnv(false);
-
-        TableEnvironment tEnv =
-                StreamTableEnvironment.create(env, 
EnvironmentSettings.inBatchMode());
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
         tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
         tEnv.useCatalog("HIVE");
         tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
index 9c727ba1e..634f10a60 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
@@ -25,11 +25,8 @@ import org.apache.paimon.hive.TestHiveMetastore;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.assertj.core.api.Assertions;
@@ -89,10 +86,7 @@ public class MigrateTableProcedureITCase extends 
ActionITCaseBase {
     }
 
     public void testUpgradePartitionTable(String format) throws Exception {
-        StreamExecutionEnvironment env = buildDefaultEnv(false);
-
-        TableEnvironment tEnv =
-                StreamTableEnvironment.create(env, 
EnvironmentSettings.inBatchMode());
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
         tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
         tEnv.useCatalog("HIVE");
         tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
@@ -125,10 +119,7 @@ public class MigrateTableProcedureITCase extends 
ActionITCaseBase {
     }
 
     public void testUpgradeNonPartitionTable(String format) throws Exception {
-        StreamExecutionEnvironment env = buildDefaultEnv(false);
-
-        TableEnvironment tEnv =
-                StreamTableEnvironment.create(env, 
EnvironmentSettings.inBatchMode());
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
         tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
         tEnv.useCatalog("HIVE");
         tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
@@ -161,10 +152,7 @@ public class MigrateTableProcedureITCase extends 
ActionITCaseBase {
     @ParameterizedTest
     @ValueSource(strings = {"orc", "parquet", "avro"})
     public void testMigrateAction(String format) throws Exception {
-        StreamExecutionEnvironment env = buildDefaultEnv(false);
-
-        TableEnvironment tEnv =
-                StreamTableEnvironment.create(env, 
EnvironmentSettings.inBatchMode());
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
         tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
         tEnv.useCatalog("HIVE");
         tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

Reply via email to