This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3ca6fc049 [flink] Set default recover strategy for TableEnvironment
and StreamExecutionEnvironment in tests (#3133)
3ca6fc049 is described below
commit 3ca6fc0494428e490e05d196df96f05247b35d57
Author: tsreaper <[email protected]>
AuthorDate: Wed Apr 10 14:45:58 2024 +0800
[flink] Set default recover strategy for TableEnvironment and
StreamExecutionEnvironment in tests (#3133)
---
.../flink/action/cdc/CdcActionITCaseBase.java | 11 +-
.../paimon/flink/kafka/KafkaTableTestBase.java | 4 +-
.../sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java | 13 +-
.../sink/cdc/FlinkCdcSyncTableSinkITCase.java | 13 +-
.../flink/lookup/AsyncLookupFunctionWrapper.java | 5 +
.../org/apache/paimon/flink/CatalogITCaseBase.java | 9 +-
.../org/apache/paimon/flink/FileStoreITCase.java | 20 +-
.../paimon/flink/FileSystemCatalogITCase.java | 20 +-
.../org/apache/paimon/flink/FlinkTestBase.java | 16 +-
.../org/apache/paimon/flink/LookupJoinITCase.java | 2 +-
.../apache/paimon/flink/MappingTableITCase.java | 3 +-
.../flink/PrimaryKeyFileStoreTableITCase.java | 129 +++++++------
.../paimon/flink/action/ActionITCaseBase.java | 42 ++--
.../paimon/flink/action/CompactActionITCase.java | 7 +-
.../flink/action/CompactDatabaseActionITCase.java | 18 +-
.../paimon/flink/sink/CompactorSinkITCase.java | 7 +-
.../paimon/flink/sink/SinkSavepointITCase.java | 55 ++----
.../paimon/flink/source/CompactorSourceITCase.java | 9 +-
.../MultiTablesCompactorSourceBuilderITCase.java | 15 +-
.../apache/paimon/flink/util/AbstractTestBase.java | 214 ++++++++++++++++++++-
.../procedure/MigrateDatabaseProcedureITCase.java | 18 +-
.../hive/procedure/MigrateFileProcedureITCase.java | 8 +-
.../procedure/MigrateTableProcedureITCase.java | 18 +-
23 files changed, 401 insertions(+), 255 deletions(-)
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
index 7a6210302..63b42e627 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
@@ -36,7 +36,6 @@ import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.AfterEach;
@@ -66,10 +65,12 @@ public class CdcActionITCaseBase extends ActionITCaseBase {
@BeforeEach
public void setEnv() {
- env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
- env.enableCheckpointing(1000);
- env.setRestartStrategy(RestartStrategies.noRestart());
+ env =
+ streamExecutionEnvironmentBuilder()
+ .streamingMode()
+ .parallelism(2)
+ .checkpointIntervalMs(1000)
+ .build();
}
@AfterEach
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java
index fdcb99f33..e0724d13a 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.kafka;
import org.apache.paimon.flink.util.AbstractTestBase;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -103,9 +102,8 @@ public abstract class KafkaTableTestBase extends
AbstractTestBase {
@BeforeEach
public void setup() {
- env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env = streamExecutionEnvironmentBuilder().streamingMode().build();
tEnv = StreamTableEnvironment.create(env);
- env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
tEnv.getConfig()
.getConfiguration()
.set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false);
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
index d3369cef3..a7c6b2cb6 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
@@ -42,7 +42,6 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FailingFileIO;
import org.apache.paimon.utils.TraceableFileIO;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.Test;
@@ -148,12 +147,12 @@ public class FlinkCdcSyncDatabaseSinkITCase extends
AbstractTestBase {
}
List<TestCdcEvent> events = mergeTestTableEvents(testTables);
-
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.getCheckpointConfig().setCheckpointInterval(100);
- if (!enableFailure) {
- env.setRestartStrategy(RestartStrategies.noRestart());
- }
+ StreamExecutionEnvironment env =
+ streamExecutionEnvironmentBuilder()
+ .streamingMode()
+ .checkpointIntervalMs(100)
+ .allowRestart(enableFailure)
+ .build();
TestCdcSourceFunction sourceFunction = new
TestCdcSourceFunction(events);
DataStreamSource<TestCdcEvent> source = env.addSource(sourceFunction);
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
index 57a7604c0..081bd7d07 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
@@ -43,7 +43,6 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FailingFileIO;
import org.apache.paimon.utils.TraceableFileIO;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.Disabled;
@@ -145,12 +144,12 @@ public class FlinkCdcSyncTableSinkITCase extends
AbstractTestBase {
Collections.singletonList("pt"),
primaryKeys,
numBucket);
-
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.getCheckpointConfig().setCheckpointInterval(100);
- if (!enableFailure) {
- env.setRestartStrategy(RestartStrategies.noRestart());
- }
+ StreamExecutionEnvironment env =
+ streamExecutionEnvironmentBuilder()
+ .streamingMode()
+ .checkpointIntervalMs(100)
+ .allowRestart(enableFailure)
+ .build();
TestCdcSourceFunction sourceFunction = new
TestCdcSourceFunction(testTable.events());
DataStreamSource<TestCdcEvent> source = env.addSource(sourceFunction);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/AsyncLookupFunctionWrapper.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/AsyncLookupFunctionWrapper.java
index f8b41d141..99f3d4643 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/AsyncLookupFunctionWrapper.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/AsyncLookupFunctionWrapper.java
@@ -50,12 +50,17 @@ public class AsyncLookupFunctionWrapper extends
AsyncLookupFunction {
}
private Collection<RowData> lookup(RowData keyRow) {
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread()
+
.setContextClassLoader(AsyncLookupFunctionWrapper.class.getClassLoader());
try {
synchronized (function) {
return function.lookup(keyRow);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(cl);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
index 31b3ffdff..b5ff3b304 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
@@ -29,7 +29,6 @@ import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
-import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
@@ -51,7 +50,6 @@ import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
-import java.time.Duration;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -60,8 +58,6 @@ import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
-import static
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
-
/** ITCase for catalog. */
public abstract class CatalogITCaseBase extends AbstractTestBase {
@@ -71,7 +67,7 @@ public abstract class CatalogITCaseBase extends
AbstractTestBase {
@BeforeEach
public void before() throws IOException {
- tEnv =
TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+ tEnv = tableEnvironmentBuilder().batchMode().build();
String catalog = "PAIMON";
path = getTempDirPath();
String inferScan =
@@ -89,8 +85,7 @@ public abstract class CatalogITCaseBase extends
AbstractTestBase {
.collect(Collectors.joining(","))));
tEnv.useCatalog(catalog);
- sEnv =
TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
- sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL,
Duration.ofMillis(100));
+ sEnv =
tableEnvironmentBuilder().streamingMode().checkpointIntervalMs(100).build();
sEnv.registerCatalog(catalog, tEnv.getCatalog(catalog).get());
sEnv.useCatalog(catalog);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
index b7c739e96..6b68532a2 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
@@ -35,7 +35,6 @@ import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.FailingFileIO;
-import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -396,19 +395,16 @@ public class FileStoreITCase extends AbstractTestBase {
return wrap(GenericRowData.ofKind(kind, v, StringData.fromString(p),
k));
}
- public static StreamExecutionEnvironment buildStreamEnv() {
- final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
- env.enableCheckpointing(100);
- env.setParallelism(2);
- return env;
+ public StreamExecutionEnvironment buildStreamEnv() {
+ return streamExecutionEnvironmentBuilder()
+ .streamingMode()
+ .checkpointIntervalMs(100)
+ .parallelism(2)
+ .build();
}
- public static StreamExecutionEnvironment buildBatchEnv() {
- final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setRuntimeMode(RuntimeExecutionMode.BATCH);
- env.setParallelism(2);
- return env;
+ public StreamExecutionEnvironment buildBatchEnv() {
+ return
streamExecutionEnvironmentBuilder().batchMode().parallelism(2).build();
}
public static FileStoreTable buildFileStoreTable(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
index 50d28cd11..451e4c78b 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
@@ -28,10 +28,8 @@ import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.Path;
import org.apache.paimon.utils.BlockingIterator;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.BeforeEach;
@@ -55,18 +53,16 @@ public class FileSystemCatalogITCase extends
AbstractTestBase {
private static final String DB_NAME = "default";
private String path;
- private StreamTableEnvironment tEnv;
+ private TableEnvironment tEnv;
@BeforeEach
public void setup() {
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
- env.setParallelism(1);
-
- tEnv = StreamTableEnvironment.create(env);
- tEnv.getConfig()
- .getConfiguration()
- .set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false);
+ tEnv =
+ tableEnvironmentBuilder()
+ .streamingMode()
+ .parallelism(1)
+
.setConf(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false)
+ .build();
path = getTempDirPath();
tEnv.executeSql(
String.format("CREATE CATALOG fs WITH ('type'='paimon',
'warehouse'='%s')", path));
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkTestBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkTestBase.java
index c6cc77bfb..b7da6cd95 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkTestBase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkTestBase.java
@@ -22,11 +22,8 @@ import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -59,7 +56,7 @@ public abstract class FlinkTestBase extends AbstractTestBase {
protected ExpectedResult expectedResult;
protected boolean ignoreException;
- protected StreamTableEnvironment tEnv;
+ protected TableEnvironment tEnv;
protected String rootPath;
protected ResolvedCatalogTable resolvedTable =
@@ -79,14 +76,11 @@ public abstract class FlinkTestBase extends
AbstractTestBase {
this.ignoreException = ignoreException;
this.expectedResult = expectedResult;
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
- EnvironmentSettings.Builder builder =
EnvironmentSettings.newInstance().inBatchMode();
if (executionMode == RuntimeExecutionMode.STREAMING) {
- env.enableCheckpointing(100);
- builder.inStreamingMode();
+ tEnv =
tableEnvironmentBuilder().streamingMode().checkpointIntervalMs(100).build();
+ } else {
+ tEnv = tableEnvironmentBuilder().batchMode().build();
}
- tEnv = StreamTableEnvironment.create(env, builder.build());
rootPath = getTempDirPath();
tEnv.executeSql(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index a11e7887c..26982e44a 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -596,7 +596,7 @@ public class LookupJoinITCase extends CatalogITCaseBase {
String query =
"SELECT /*+ LOOKUP('table'='D',
'retry-predicate'='lookup_miss',"
- + " 'retry-strategy'='fixed_delay',
'output-mode'='allow_unordered', 'fixed-delay'='3s','max-attempts'='60') */"
+ + " 'retry-strategy'='fixed_delay',
'output-mode'='allow_unordered', 'fixed-delay'='3s','max-attempts'='30') */"
+ " T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM /*+
OPTIONS('lookup.async'='true') */ for system_time as of T.proctime AS D ON T.i
= D.i";
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(query).collect());
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/MappingTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/MappingTableITCase.java
index 0d3c5f052..fb011bb11 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/MappingTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/MappingTableITCase.java
@@ -22,7 +22,6 @@ import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
-import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.types.Row;
@@ -44,7 +43,7 @@ public class MappingTableITCase extends AbstractTestBase {
@BeforeEach
public void before() throws IOException {
- tEnv =
TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+ tEnv = tableEnvironmentBuilder().batchMode().build();
path = getTempDirPath();
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index 8872cc333..e495ad3da 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -26,11 +26,8 @@ import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.utils.FailingFileIO;
import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
@@ -42,7 +39,6 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.io.IOException;
-import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -53,7 +49,6 @@ import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
-import static
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for changelog table with primary keys. */
@@ -76,32 +71,6 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
}
}
- private TableEnvironment createBatchTableEnvironment() {
- return
TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
- }
-
- private TableEnvironment createStreamingTableEnvironment(int
checkpointIntervalMs) {
- TableEnvironment sEnv =
- TableEnvironment.create(
-
EnvironmentSettings.newInstance().inStreamingMode().build());
- // set checkpoint interval to a random number to emulate different
speed of commit
- sEnv.getConfig()
- .getConfiguration()
- .set(CHECKPOINTING_INTERVAL,
Duration.ofMillis(checkpointIntervalMs));
- sEnv.getConfig()
- .set(
-
ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
- ExecutionConfigOptions.UpsertMaterialize.NONE);
- return sEnv;
- }
-
- private StreamExecutionEnvironment createStreamExecutionEnvironment(int
checkpointIntervalMs) {
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
-
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
- env.getCheckpointConfig().setCheckpointInterval(checkpointIntervalMs);
- return env;
- }
-
private String createCatalogSql(String catalogName, String warehouse) {
String defaultPropertyString = "";
if (tableDefaultProperties.size() > 0) {
@@ -138,10 +107,7 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
@Timeout(1200)
public void testFullCompactionWithLongCheckpointInterval() throws
Exception {
// create table
- TableEnvironment bEnv = createBatchTableEnvironment();
- bEnv.getConfig()
- .getConfiguration()
-
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+ TableEnvironment bEnv =
tableEnvironmentBuilder().batchMode().parallelism(1).build();
bEnv.executeSql(createCatalogSql("testCatalog", path));
bEnv.executeSql("USE CATALOG testCatalog");
bEnv.executeSql(
@@ -156,18 +122,23 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
+ ")");
// run select job
- TableEnvironment sEnv = createStreamingTableEnvironment(100);
- sEnv.getConfig()
- .getConfiguration()
-
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+ TableEnvironment sEnv =
+ tableEnvironmentBuilder()
+ .streamingMode()
+ .checkpointIntervalMs(100)
+ .parallelism(1)
+ .build();
sEnv.executeSql(createCatalogSql("testCatalog", path));
sEnv.executeSql("USE CATALOG testCatalog");
CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM
T").collect();
// run compact job
- StreamExecutionEnvironment env =
createStreamExecutionEnvironment(2000);
+ StreamExecutionEnvironment env =
+ streamExecutionEnvironmentBuilder()
+ .streamingMode()
+ .checkpointIntervalMs(2000)
+ .build();
env.setParallelism(1);
- env.setRestartStrategy(RestartStrategies.noRestart());
new CompactAction(path, "default",
"T").withStreamExecutionEnvironment(env).build();
JobClient client = env.executeAsync();
@@ -199,10 +170,11 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
private void innerTestChangelogProducing(List<String> options) throws
Exception {
TableEnvironment sEnv =
-
createStreamingTableEnvironment(ThreadLocalRandom.current().nextInt(900) + 100);
- sEnv.getConfig()
- .getConfiguration()
-
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+ tableEnvironmentBuilder()
+ .streamingMode()
+
.checkpointIntervalMs(ThreadLocalRandom.current().nextInt(900) + 100)
+ .parallelism(1)
+ .build();
sEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
sEnv.executeSql("USE CATALOG testCatalog");
@@ -273,7 +245,7 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
@Test
@Timeout(1200)
public void testNoChangelogProducerBatchRandom() throws Exception {
- TableEnvironment bEnv = createBatchTableEnvironment();
+ TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build();
testNoChangelogProducerRandom(bEnv, 1, false);
}
@@ -281,14 +253,19 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
@Timeout(1200)
public void testNoChangelogProducerStreamingRandom() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
- TableEnvironment sEnv =
createStreamingTableEnvironment(random.nextInt(900) + 100);
+ TableEnvironment sEnv =
+ tableEnvironmentBuilder()
+ .streamingMode()
+ .checkpointIntervalMs(random.nextInt(900) + 100)
+ .allowRestart()
+ .build();
testNoChangelogProducerRandom(sEnv, random.nextInt(1, 3),
random.nextBoolean());
}
@Test
@Timeout(1200)
public void testFullCompactionChangelogProducerBatchRandom() throws
Exception {
- TableEnvironment bEnv = createBatchTableEnvironment();
+ TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build();
testFullCompactionChangelogProducerRandom(bEnv, 1, false);
}
@@ -296,7 +273,12 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
@Timeout(1200)
public void testFullCompactionChangelogProducerStreamingRandom() throws
Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
- TableEnvironment sEnv =
createStreamingTableEnvironment(random.nextInt(900) + 100);
+ TableEnvironment sEnv =
+ tableEnvironmentBuilder()
+ .streamingMode()
+ .checkpointIntervalMs(random.nextInt(900) + 100)
+ .allowRestart()
+ .build();
testFullCompactionChangelogProducerRandom(sEnv, random.nextInt(1, 3),
random.nextBoolean());
}
@@ -304,14 +286,19 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
@Timeout(1200)
public void testStandAloneFullCompactJobRandom() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
- TableEnvironment sEnv =
createStreamingTableEnvironment(random.nextInt(900) + 100);
+ TableEnvironment sEnv =
+ tableEnvironmentBuilder()
+ .streamingMode()
+ .checkpointIntervalMs(random.nextInt(900) + 100)
+ .allowRestart()
+ .build();
testStandAloneFullCompactJobRandom(sEnv, random.nextInt(1, 3),
random.nextBoolean());
}
@Test
@Timeout(1200)
public void testLookupChangelogProducerBatchRandom() throws Exception {
- TableEnvironment bEnv = createBatchTableEnvironment();
+ TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build();
testLookupChangelogProducerRandom(bEnv, 1, false);
}
@@ -319,7 +306,12 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
@Timeout(1200)
public void testLookupChangelogProducerStreamingRandom() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
- TableEnvironment sEnv =
createStreamingTableEnvironment(random.nextInt(900) + 100);
+ TableEnvironment sEnv =
+ tableEnvironmentBuilder()
+ .streamingMode()
+ .checkpointIntervalMs(random.nextInt(900) + 100)
+ .allowRestart()
+ .build();
testLookupChangelogProducerRandom(sEnv, random.nextInt(1, 3),
random.nextBoolean());
}
@@ -327,7 +319,12 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
@Timeout(1200)
public void testStandAloneLookupJobRandom() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
- TableEnvironment sEnv =
createStreamingTableEnvironment(random.nextInt(900) + 100);
+ TableEnvironment sEnv =
+ tableEnvironmentBuilder()
+ .streamingMode()
+ .checkpointIntervalMs(random.nextInt(900) + 100)
+ .allowRestart()
+ .build();
testStandAloneLookupJobRandom(sEnv, random.nextInt(1, 3),
random.nextBoolean());
}
@@ -433,8 +430,12 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
for (int i = enableConflicts ? 2 : 1; i > 0; i--) {
StreamExecutionEnvironment env =
- createStreamExecutionEnvironment(random.nextInt(1900) +
100);
- env.setParallelism(2);
+ streamExecutionEnvironmentBuilder()
+ .streamingMode()
+ .checkpointIntervalMs(random.nextInt(1900) + 100)
+ .parallelism(2)
+ .allowRestart()
+ .build();
new CompactAction(path, "default",
"T").withStreamExecutionEnvironment(env).build();
env.executeAsync();
}
@@ -469,7 +470,11 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
for (int i = enableConflicts ? 2 : 1; i > 0; i--) {
StreamExecutionEnvironment env =
- createStreamExecutionEnvironment(random.nextInt(1900) +
100);
+ streamExecutionEnvironmentBuilder()
+ .streamingMode()
+ .checkpointIntervalMs(random.nextInt(1900) + 100)
+ .allowRestart()
+ .build();
env.setParallelism(2);
new CompactAction(path, "default",
"T").withStreamExecutionEnvironment(env).build();
env.executeAsync();
@@ -483,10 +488,12 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
}
private void checkChangelogTestResult(int numProducers) throws Exception {
- TableEnvironment sEnv = createStreamingTableEnvironment(100);
- sEnv.getConfig()
- .getConfiguration()
-
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+ TableEnvironment sEnv =
+ tableEnvironmentBuilder()
+ .streamingMode()
+ .checkpointIntervalMs(100)
+ .parallelism(1)
+ .build();
sEnv.executeSql(createCatalogSql("testCatalog", path));
sEnv.executeSql("USE CATALOG testCatalog");
@@ -601,7 +608,7 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
}
private void checkBatchResult(int numProducers) throws Exception {
- TableEnvironment bEnv = createBatchTableEnvironment();
+ TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build();
bEnv.executeSql(createCatalogSql("testCatalog", path));
bEnv.executeSql("USE CATALOG testCatalog");
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
index 80cf2cc76..579237756 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
@@ -36,20 +36,12 @@ import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.RowType;
-import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import java.io.IOException;
-import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -148,20 +140,18 @@ public abstract class ActionITCaseBase extends
AbstractTestBase {
}
}
- protected StreamExecutionEnvironment buildDefaultEnv(boolean isStreaming) {
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
- env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
-
- if (isStreaming) {
- env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
-
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
- env.getCheckpointConfig().setCheckpointInterval(500);
- } else {
- env.setRuntimeMode(RuntimeExecutionMode.BATCH);
- }
+ @Override
+ protected TableEnvironmentBuilder tableEnvironmentBuilder() {
+ return super.tableEnvironmentBuilder()
+ .checkpointIntervalMs(500)
+ .parallelism(ThreadLocalRandom.current().nextInt(2) + 1);
+ }
- return env;
+ @Override
+ protected StreamExecutionEnvironmentBuilder
streamExecutionEnvironmentBuilder() {
+ return super.streamExecutionEnvironmentBuilder()
+ .checkpointIntervalMs(500)
+ .parallelism(ThreadLocalRandom.current().nextInt(2) + 1);
}
protected <T extends ActionBase> T createAction(Class<T> clazz,
List<String> args) {
@@ -195,17 +185,11 @@ public abstract class ActionITCaseBase extends
AbstractTestBase {
}
protected void callProcedure(String procedureStatement, boolean
isStreaming, boolean dmlSync) {
- StreamExecutionEnvironment env = buildDefaultEnv(isStreaming);
-
TableEnvironment tEnv;
if (isStreaming) {
- tEnv = StreamTableEnvironment.create(env,
EnvironmentSettings.inStreamingMode());
- tEnv.getConfig()
- .set(
-
ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
- Duration.ofMillis(500));
+ tEnv =
tableEnvironmentBuilder().streamingMode().checkpointIntervalMs(500).build();
} else {
- tEnv = StreamTableEnvironment.create(env,
EnvironmentSettings.inBatchMode());
+ tEnv = tableEnvironmentBuilder().batchMode().build();
}
tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, dmlSync);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index d0f10766e..06c29456c 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -272,7 +272,12 @@ public class CompactActionITCase extends
CompactActionITCaseBase {
}
private void runAction(boolean isStreaming) throws Exception {
- StreamExecutionEnvironment env = buildDefaultEnv(isStreaming);
+ StreamExecutionEnvironment env;
+ if (isStreaming) {
+ env = streamExecutionEnvironmentBuilder().streamingMode().build();
+ } else {
+ env = streamExecutionEnvironmentBuilder().batchMode().build();
+ }
CompactAction action =
createAction(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
index d7104f591..4fa1b3c53 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
@@ -133,7 +133,8 @@ public class CompactDatabaseActionITCase extends
CompactActionITCaseBase {
}
if (ThreadLocalRandom.current().nextBoolean()) {
- StreamExecutionEnvironment env = buildDefaultEnv(false);
+ StreamExecutionEnvironment env =
+ streamExecutionEnvironmentBuilder().batchMode().build();
createAction(
CompactDatabaseAction.class,
"compact_database",
@@ -237,7 +238,8 @@ public class CompactDatabaseActionITCase extends
CompactActionITCaseBase {
"--table_conf",
CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key() + "=1s");
}
- StreamExecutionEnvironment env = buildDefaultEnv(true);
+ StreamExecutionEnvironment env =
+
streamExecutionEnvironmentBuilder().streamingMode().build();
action.withStreamExecutionEnvironment(env).build();
env.executeAsync();
} else {
@@ -513,7 +515,8 @@ public class CompactDatabaseActionITCase extends
CompactActionITCaseBase {
args.add(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key() +
"=1s");
}
- StreamExecutionEnvironment env = buildDefaultEnv(false);
+ StreamExecutionEnvironment env =
+ streamExecutionEnvironmentBuilder().batchMode().build();
createAction(CompactDatabaseAction.class, args)
.withStreamExecutionEnvironment(env)
.build();
@@ -615,7 +618,8 @@ public class CompactDatabaseActionITCase extends
CompactActionITCaseBase {
}
if (ThreadLocalRandom.current().nextBoolean()) {
- StreamExecutionEnvironment env = buildDefaultEnv(true);
+ StreamExecutionEnvironment env =
+
streamExecutionEnvironmentBuilder().streamingMode().build();
createAction(CompactDatabaseAction.class, "compact_database",
"--warehouse", warehouse)
.withStreamExecutionEnvironment(env)
.build();
@@ -688,7 +692,8 @@ public class CompactDatabaseActionITCase extends
CompactActionITCaseBase {
}
if (ThreadLocalRandom.current().nextBoolean()) {
- StreamExecutionEnvironment env = buildDefaultEnv(false);
+ StreamExecutionEnvironment env =
+ streamExecutionEnvironmentBuilder().batchMode().build();
createAction(CompactDatabaseAction.class, "compact_database",
"--warehouse", warehouse)
.withStreamExecutionEnvironment(env)
.build();
@@ -747,7 +752,8 @@ public class CompactDatabaseActionITCase extends
CompactActionITCaseBase {
"--table_conf",
CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key() + "=3");
- StreamExecutionEnvironment env = buildDefaultEnv(true);
+ StreamExecutionEnvironment env =
+ streamExecutionEnvironmentBuilder().streamingMode().build();
action.withStreamExecutionEnvironment(env).build();
JobClient jobClient = env.executeAsync();
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
index 68c3c42b0..f81c253cc 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
@@ -48,7 +48,6 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
@@ -117,8 +116,7 @@ public class CompactorSinkITCase extends AbstractTestBase {
write.close();
commit.close();
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ StreamExecutionEnvironment env =
streamExecutionEnvironmentBuilder().batchMode().build();
CompactorSourceBuilder sourceBuilder =
new CompactorSourceBuilder(tablePath.toString(), table);
DataStreamSource<RowData> source =
@@ -152,7 +150,8 @@ public class CompactorSinkITCase extends AbstractTestBase {
public void testCompactParallelism() throws Exception {
FileStoreTable table = createFileStoreTable();
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamExecutionEnvironment env =
+ streamExecutionEnvironmentBuilder().streamingMode().build();
CompactorSourceBuilder sourceBuilder =
new CompactorSourceBuilder(tablePath.toString(), table);
DataStreamSource<RowData> source =
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java
index 6d35351d8..fe56eae64 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java
@@ -24,18 +24,12 @@ import org.apache.paimon.utils.FailingFileIO;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import
org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException;
-import
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
@@ -43,7 +37,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
-import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -136,33 +129,19 @@ public class SinkSavepointITCase extends AbstractTestBase
{
SavepointRestoreSettings.toConfiguration(savepointRestoreSettings,
conf);
}
- EnvironmentSettings settings =
EnvironmentSettings.newInstance().inStreamingMode().build();
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(conf);
- StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
settings);
- tEnv.getConfig()
- .getConfiguration()
- .set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
Duration.ofMillis(500));
-
tEnv.getConfig().getConfiguration().set(StateBackendOptions.STATE_BACKEND,
"filesystem");
- tEnv.getConfig()
- .getConfiguration()
- .set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file://" +
path + "/checkpoint");
- // input data must be strictly ordered for us to check changelog
results
- tEnv.getConfig()
- .getConfiguration()
-
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
- tEnv.getConfig()
- .getConfiguration()
- .set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
- tEnv.getConfig()
- .getConfiguration()
- .set(
-
RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS,
- Integer.MAX_VALUE);
- tEnv.getConfig()
- .getConfiguration()
- .set(
-
RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY,
- Duration.ofSeconds(1));
+ TableEnvironment tEnv =
+ tableEnvironmentBuilder()
+ .streamingMode()
+ .checkpointIntervalMs(500)
+ // input data must be strictly ordered for us to check
changelog results
+ .parallelism(1)
+ .allowRestart()
+ .setConf(conf)
+ .setConf(StateBackendOptions.STATE_BACKEND,
"filesystem")
+ .setConf(
+ CheckpointingOptions.CHECKPOINTS_DIRECTORY,
+ "file://" + path + "/checkpoint")
+ .build();
String createCatalogSql =
String.join(
@@ -219,9 +198,7 @@ public class SinkSavepointITCase extends AbstractTestBase {
}
private void checkRecoverFromSavepointBatchResult() throws Exception {
- EnvironmentSettings settings =
EnvironmentSettings.newInstance().inBatchMode().build();
- TableEnvironment tEnv = TableEnvironment.create(settings);
-
+ TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
tEnv.executeSql(
String.join(
"\n",
@@ -248,9 +225,7 @@ public class SinkSavepointITCase extends AbstractTestBase {
}
private void checkRecoverFromSavepointStreamingResult() throws Exception {
- EnvironmentSettings settings =
EnvironmentSettings.newInstance().inStreamingMode().build();
- TableEnvironment tEnv = TableEnvironment.create(settings);
-
+ TableEnvironment tEnv =
tableEnvironmentBuilder().streamingMode().build();
tEnv.executeSql(
String.join(
"\n",
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
index 2a95b5d58..3164dd412 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
@@ -103,7 +103,8 @@ public class CompactorSourceITCase extends AbstractTestBase
{
write.write(rowData(1, 1510, BinaryString.fromString("20221209"), 15));
commit.commit(1, write.prepareCommit(true, 1));
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamExecutionEnvironment env =
+ streamExecutionEnvironmentBuilder().streamingMode().build();
DataStreamSource<RowData> compactorSource =
new CompactorSourceBuilder("test", table)
.withContinuousMode(false)
@@ -164,7 +165,8 @@ public class CompactorSourceITCase extends AbstractTestBase
{
write.write(rowData(2, 1620, BinaryString.fromString("20221209"), 16));
commit.commit(3, write.prepareCommit(true, 3));
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamExecutionEnvironment env =
+ streamExecutionEnvironmentBuilder().streamingMode().build();
DataStreamSource<RowData> compactorSource =
new CompactorSourceBuilder("test", table)
.withContinuousMode(true)
@@ -257,7 +259,8 @@ public class CompactorSourceITCase extends AbstractTestBase
{
write.write(rowData(1, 1510, BinaryString.fromString("20221209"), 15));
commit.commit(2, write.prepareCommit(true, 2));
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamExecutionEnvironment env =
+ streamExecutionEnvironmentBuilder().streamingMode().build();
DataStreamSource<RowData> compactorSource =
new CompactorSourceBuilder("test", table)
.withContinuousMode(isStreaming)
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java
index 56e9374d5..555585603 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java
@@ -41,7 +41,6 @@ import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SnapshotManager;
-import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
@@ -156,9 +155,11 @@ public class MultiTablesCompactorSourceBuilderITCase
extends AbstractTestBase
}
}
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setRuntimeMode(RuntimeExecutionMode.BATCH);
- env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
+ StreamExecutionEnvironment env =
+ streamExecutionEnvironmentBuilder()
+ .batchMode()
+ .parallelism(ThreadLocalRandom.current().nextInt(2) +
1)
+ .build();
DataStream<RowData> source =
new MultiTablesCompactorSourceBuilder(
catalogLoader(),
@@ -254,7 +255,8 @@ public class MultiTablesCompactorSourceBuilderITCase
extends AbstractTestBase
}
}
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamExecutionEnvironment env =
+ streamExecutionEnvironmentBuilder().streamingMode().build();
DataStream<RowData> compactorSource =
new MultiTablesCompactorSourceBuilder(
catalogLoader(),
@@ -423,7 +425,8 @@ public class MultiTablesCompactorSourceBuilderITCase
extends AbstractTestBase
}
}
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamExecutionEnvironment env =
+ streamExecutionEnvironmentBuilder().streamingMode().build();
DataStream<RowData> compactorSource =
new MultiTablesCompactorSourceBuilder(
catalogLoader(),
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
index 0850d6197..d3efddecc 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
@@ -20,9 +20,19 @@ package org.apache.paimon.flink.util;
import org.apache.paimon.utils.FileIOUtils;
+import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
@@ -30,12 +40,13 @@ import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
+import java.time.Duration;
import java.util.UUID;
/** Similar to Flink's AbstractTestBase but using Junit5. */
public class AbstractTestBase {
- private static final int DEFAULT_PARALLELISM = 8;
+ private static final int DEFAULT_PARALLELISM = 16;
@RegisterExtension
protected static final MiniClusterWithClientExtension
MINI_CLUSTER_EXTENSION =
@@ -92,4 +103,205 @@ public class AbstractTestBase {
return new File(
temporaryFolder.toFile(), String.format("%s/%s",
UUID.randomUUID(), fileName));
}
+
+ //
----------------------------------------------------------------------------------------------------------------
+ // Table Environment Utilities
+ //
----------------------------------------------------------------------------------------------------------------
+
+ protected TableEnvironmentBuilder tableEnvironmentBuilder() {
+ return new TableEnvironmentBuilder();
+ }
+
+ /** Builder for {@link TableEnvironmentBuilder} in tests. */
+ protected static class TableEnvironmentBuilder {
+
+ private boolean streamingMode = true;
+ private Integer parallelism = null;
+ private Integer checkpointIntervalMs = null;
+ private boolean allowRestart = false;
+ private Configuration conf = new Configuration();
+
+ public TableEnvironmentBuilder batchMode() {
+ this.streamingMode = false;
+ return this;
+ }
+
+ public TableEnvironmentBuilder streamingMode() {
+ this.streamingMode = true;
+ return this;
+ }
+
+ public TableEnvironmentBuilder parallelism(int parallelism) {
+ this.parallelism = parallelism;
+ return this;
+ }
+
+ public TableEnvironmentBuilder checkpointIntervalMs(int
checkpointIntervalMs) {
+ this.checkpointIntervalMs = checkpointIntervalMs;
+ return this;
+ }
+
+ public TableEnvironmentBuilder allowRestart() {
+ this.allowRestart = true;
+ return this;
+ }
+
+ public TableEnvironmentBuilder allowRestart(boolean allowRestart) {
+ this.allowRestart = allowRestart;
+ return this;
+ }
+
+ public <T> TableEnvironmentBuilder setConf(ConfigOption<T> option, T
value) {
+ conf.set(option, value);
+ return this;
+ }
+
+ public TableEnvironmentBuilder setConf(Configuration conf) {
+ this.conf.addAll(conf);
+ return this;
+ }
+
+ public TableEnvironment build() {
+ TableEnvironment tEnv;
+ if (streamingMode) {
+ tEnv =
+ TableEnvironment.create(
+
EnvironmentSettings.newInstance().inStreamingMode().build());
+ tEnv.getConfig()
+ .set(
+
ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
+ ExecutionConfigOptions.UpsertMaterialize.NONE);
+ if (checkpointIntervalMs != null) {
+ tEnv.getConfig()
+ .getConfiguration()
+ .set(
+
ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
+ Duration.ofMillis(checkpointIntervalMs));
+ }
+ } else {
+ tEnv =
+ TableEnvironment.create(
+
EnvironmentSettings.newInstance().inBatchMode().build());
+ }
+
+ if (parallelism != null) {
+ tEnv.getConfig()
+ .getConfiguration()
+ .set(
+
ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
+ parallelism);
+ }
+
+ if (allowRestart) {
+ tEnv.getConfig()
+ .getConfiguration()
+ .set(RestartStrategyOptions.RESTART_STRATEGY,
"fixed-delay");
+ tEnv.getConfig()
+ .getConfiguration()
+ .set(
+
RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS,
+ Integer.MAX_VALUE);
+ tEnv.getConfig()
+ .getConfiguration()
+ .set(
+
RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY,
+ Duration.ofSeconds(1));
+ } else {
+ tEnv.getConfig()
+ .getConfiguration()
+ .set(RestartStrategyOptions.RESTART_STRATEGY,
"disable");
+ }
+
+ tEnv.getConfig().getConfiguration().addAll(conf);
+
+ return tEnv;
+ }
+ }
+
+ //
----------------------------------------------------------------------------------------------------------------
+ // Stream Execution Environment Utilities
+ //
----------------------------------------------------------------------------------------------------------------
+
+ protected StreamExecutionEnvironmentBuilder
streamExecutionEnvironmentBuilder() {
+ return new StreamExecutionEnvironmentBuilder();
+ }
+
+ /** Builder for {@link StreamExecutionEnvironment} in tests. */
+ protected static class StreamExecutionEnvironmentBuilder {
+
+ private boolean streamingMode = true;
+ private Integer parallelism = null;
+ private Integer checkpointIntervalMs = null;
+ private boolean allowRestart = false;
+ private Configuration conf = new Configuration();
+
+ public StreamExecutionEnvironmentBuilder batchMode() {
+ this.streamingMode = false;
+ return this;
+ }
+
+ public StreamExecutionEnvironmentBuilder streamingMode() {
+ this.streamingMode = true;
+ return this;
+ }
+
+ public StreamExecutionEnvironmentBuilder parallelism(int parallelism) {
+ this.parallelism = parallelism;
+ return this;
+ }
+
+ public StreamExecutionEnvironmentBuilder checkpointIntervalMs(int
checkpointIntervalMs) {
+ this.checkpointIntervalMs = checkpointIntervalMs;
+ return this;
+ }
+
+ public StreamExecutionEnvironmentBuilder allowRestart() {
+ this.allowRestart = true;
+ return this;
+ }
+
+ public StreamExecutionEnvironmentBuilder allowRestart(boolean
allowRestart) {
+ this.allowRestart = allowRestart;
+ return this;
+ }
+
+ public <T> StreamExecutionEnvironmentBuilder setConf(ConfigOption<T>
option, T value) {
+ conf.set(option, value);
+ return this;
+ }
+
+ public StreamExecutionEnvironment build() {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ if (streamingMode) {
+ env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+ if (checkpointIntervalMs != null) {
+
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+
env.getCheckpointConfig().setCheckpointInterval(checkpointIntervalMs);
+ }
+ } else {
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ }
+
+ if (parallelism != null) {
+ env.setParallelism(parallelism);
+ }
+
+ Configuration conf = new Configuration();
+ if (allowRestart) {
+ conf.set(RestartStrategyOptions.RESTART_STRATEGY,
"fixed-delay");
+ conf.set(
+
RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS,
+ Integer.MAX_VALUE);
+ conf.set(
+
RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY,
+ Duration.ofSeconds(1));
+ } else {
+ conf.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
+ }
+ conf.addAll(this.conf);
+ env.configure(conf);
+
+ return env;
+ }
+ }
}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateDatabaseProcedureITCase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateDatabaseProcedureITCase.java
index 70345e4f8..ff02a352e 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateDatabaseProcedureITCase.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateDatabaseProcedureITCase.java
@@ -25,11 +25,8 @@ import org.apache.paimon.hive.TestHiveMetastore;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.hadoop.hive.conf.HiveConf;
import org.assertj.core.api.Assertions;
@@ -90,10 +87,7 @@ public class MigrateDatabaseProcedureITCase extends
ActionITCaseBase {
}
public void testUpgradePartitionTable(String format) throws Exception {
- StreamExecutionEnvironment env = buildDefaultEnv(false);
-
- TableEnvironment tEnv =
- StreamTableEnvironment.create(env,
EnvironmentSettings.inBatchMode());
+ TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
tEnv.useCatalog("HIVE");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
@@ -149,10 +143,7 @@ public class MigrateDatabaseProcedureITCase extends
ActionITCaseBase {
}
public void testUpgradeNonPartitionTable(String format) throws Exception {
- StreamExecutionEnvironment env = buildDefaultEnv(false);
-
- TableEnvironment tEnv =
- StreamTableEnvironment.create(env,
EnvironmentSettings.inBatchMode());
+ TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
tEnv.useCatalog("HIVE");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
@@ -208,10 +199,7 @@ public class MigrateDatabaseProcedureITCase extends
ActionITCaseBase {
@ParameterizedTest
@ValueSource(strings = {"orc", "parquet", "avro"})
public void testMigrateDatabaseAction(String format) throws Exception {
- StreamExecutionEnvironment env = buildDefaultEnv(false);
-
- TableEnvironment tEnv =
- StreamTableEnvironment.create(env,
EnvironmentSettings.inBatchMode());
+ TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
tEnv.useCatalog("HIVE");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
index b41a187d7..b71a38c59 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
@@ -24,11 +24,8 @@ import org.apache.paimon.hive.TestHiveMetastore;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.hadoop.hive.conf.HiveConf;
import org.assertj.core.api.Assertions;
@@ -72,10 +69,7 @@ public class MigrateFileProcedureITCase extends
ActionITCaseBase {
}
public void test(String format) throws Exception {
- StreamExecutionEnvironment env = buildDefaultEnv(false);
-
- TableEnvironment tEnv =
- StreamTableEnvironment.create(env,
EnvironmentSettings.inBatchMode());
+ TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
tEnv.useCatalog("HIVE");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
index 9c727ba1e..634f10a60 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
@@ -25,11 +25,8 @@ import org.apache.paimon.hive.TestHiveMetastore;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.hadoop.hive.conf.HiveConf;
import org.assertj.core.api.Assertions;
@@ -89,10 +86,7 @@ public class MigrateTableProcedureITCase extends
ActionITCaseBase {
}
public void testUpgradePartitionTable(String format) throws Exception {
- StreamExecutionEnvironment env = buildDefaultEnv(false);
-
- TableEnvironment tEnv =
- StreamTableEnvironment.create(env,
EnvironmentSettings.inBatchMode());
+ TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
tEnv.useCatalog("HIVE");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
@@ -125,10 +119,7 @@ public class MigrateTableProcedureITCase extends
ActionITCaseBase {
}
public void testUpgradeNonPartitionTable(String format) throws Exception {
- StreamExecutionEnvironment env = buildDefaultEnv(false);
-
- TableEnvironment tEnv =
- StreamTableEnvironment.create(env,
EnvironmentSettings.inBatchMode());
+ TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
tEnv.useCatalog("HIVE");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
@@ -161,10 +152,7 @@ public class MigrateTableProcedureITCase extends
ActionITCaseBase {
@ParameterizedTest
@ValueSource(strings = {"orc", "parquet", "avro"})
public void testMigrateAction(String format) throws Exception {
- StreamExecutionEnvironment env = buildDefaultEnv(false);
-
- TableEnvironment tEnv =
- StreamTableEnvironment.create(env,
EnvironmentSettings.inBatchMode());
+ TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
tEnv.useCatalog("HIVE");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);