This is an automated email from the ASF dual-hosted git repository. yuzelin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push: new 327274d48 [flink] Remove Redundant Test Methods (#1921) 327274d48 is described below commit 327274d488db07a621af37a5837170511971922e Author: monster <60029759+monsterchenz...@users.noreply.github.com> AuthorDate: Thu Aug 31 17:17:02 2023 +0800 [flink] Remove Redundant Test Methods (#1921) --- .../org/apache/paimon/flink/action/Action.java | 17 +++++++++++++ .../apache/paimon/flink/action/CompactAction.java | 1 + .../paimon/flink/action/CompactDatabaseAction.java | 1 + .../paimon/flink/action/SortCompactAction.java | 1 + .../action/cdc/kafka/KafkaSyncDatabaseAction.java | 1 + .../action/cdc/kafka/KafkaSyncTableAction.java | 1 + .../cdc/mongodb/MongoDBSyncDatabaseAction.java | 1 + .../action/cdc/mongodb/MongoDBSyncTableAction.java | 1 + .../action/cdc/mysql/MySqlSyncDatabaseAction.java | 1 + .../action/cdc/mysql/MySqlSyncTableAction.java | 1 + .../flink/action/cdc/CdcActionITCaseBase.java | 28 ++++++++++++++-------- .../action/cdc/kafka/KafkaActionITCaseBase.java | 15 ------------ .../cdc/mongodb/MongoDBActionITCaseBase.java | 13 ---------- .../action/cdc/mysql/MySqlActionITCaseBase.java | 14 ----------- 14 files changed, 44 insertions(+), 52 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java index 483c6c80a..9c4d24211 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java @@ -18,9 +18,26 @@ package org.apache.paimon.flink.action; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + /** Abstract class for Flink actions. */ public interface Action { /** The execution method of the action. */ void run() throws Exception; + + /** + * Builds the action within the given Flink Stream Execution Environment. + * + * <p>This method is responsible for setting up any necessary configurations or resources needed + * for the action to run. It is called before the `run` method to prepare the environment for + * execution. + * + * <p>By default, this method is empty and can be overridden by subclasses to provide custom + * setup logic. + * + * @param env The Flink Stream Execution Environment where the action will be executed. + * @throws Exception If an error occurs during the build process. + */ + default void build(StreamExecutionEnvironment env) throws Exception {} } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java index 9d0b99484..68cfc2ed8 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java @@ -70,6 +70,7 @@ public class CompactAction extends TableActionBase { return this; } + @Override public void build(StreamExecutionEnvironment env) { ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env); boolean isStreaming = diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java index 1be9e7b15..abbeb5e33 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java @@ -80,6 +80,7 @@ public class CompactDatabaseAction extends ActionBase { return shouldCompaction; } + @Override public void build(StreamExecutionEnvironment env) { try { Pattern databasePattern = Pattern.compile(database); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java index 9b488476d..16557a508 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java @@ -72,6 +72,7 @@ public class SortCompactAction extends CompactAction { execute(env, "Sort Compact Job"); } + @Override public void build(StreamExecutionEnvironment env) { // only support batch sort yet if (env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java index c6a3d2be2..9065137b1 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java @@ -140,6 +140,7 @@ public class KafkaSyncDatabaseAction extends ActionBase { return this; } + @Override public void build(StreamExecutionEnvironment env) throws Exception { boolean caseSensitive = catalog.caseSensitive(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java index 2dffcfcab..ef3dee8da 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java @@ -140,6 +140,7 @@ public class KafkaSyncTableAction extends ActionBase { return this; } + @Override public void build(StreamExecutionEnvironment env) throws Exception { KafkaSource<String> source = KafkaActionUtils.buildKafkaSource(kafkaConfig); String topic = kafkaConfig.get(KafkaConnectorOptions.TOPIC).get(0); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java index 5f2f8db4d..75a687387 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java @@ -97,6 +97,7 @@ public class MongoDBSyncDatabaseAction extends ActionBase { this.tableConfig = tableConfig; } + @Override public void build(StreamExecutionEnvironment env) throws Exception { boolean caseSensitive = catalog.caseSensitive(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java index 57f12b37b..26940ac83 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java @@ -83,6 +83,7 @@ public class MongoDBSyncTableAction extends ActionBase { this.tableConfig = tableConfig; } + @Override public void build(StreamExecutionEnvironment env) throws Exception { checkArgument( mongodbConfig.contains(MongoDBSourceOptions.COLLECTION), diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java index 5382e79b9..d18755dd3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java @@ -178,6 +178,7 @@ public class MySqlSyncDatabaseAction extends ActionBase { return this; } + @Override public void build(StreamExecutionEnvironment env) throws Exception { checkArgument( !mySqlConfig.contains(MySqlSourceOptions.TABLE_NAME), diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java index 9124a6b5d..aef0597ec 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java @@ -142,6 +142,7 @@ public class MySqlSyncTableAction extends ActionBase { return this; } + @Override public void build(StreamExecutionEnvironment env) throws Exception { checkArgument( mySqlConfig.contains(MySqlSourceOptions.TABLE_NAME), diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java index cc584c43e..b4f6dcf48 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.action.cdc; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.action.ActionBase; import org.apache.paimon.flink.action.ActionITCaseBase; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.source.ReadBuilder; @@ -139,16 +140,6 @@ public class CdcActionITCaseBase extends ActionITCaseBase { return config; } - protected void waitJobRunning(JobClient client) throws Exception { - while (true) { - JobStatus status = client.getJobStatus().get(); - if (status == JobStatus.RUNNING) { - break; - } - Thread.sleep(1000); - } - } - protected List<String> mapToArgs(String argKey, Map<String, String> map) { List<String> args = new ArrayList<>(); for (Map.Entry<String, String> entry : map.entrySet()) { @@ -181,6 +172,23 @@ public class CdcActionITCaseBase extends ActionITCaseBase { return Arrays.asList(argKey, nullable.toString()); } + public <T extends ActionBase> JobClient runActionWithDefaultEnv(T action) throws Exception { + action.build(env); + JobClient client = env.executeAsync(); + waitJobRunning(client); + return client; + } + + protected void waitJobRunning(JobClient client) throws Exception { + while (true) { + JobStatus status = client.getJobStatus().get(); + if (status == JobStatus.RUNNING) { + break; + } + Thread.sleep(1000); + } + } + /** Base builder to build table synchronization action from action arguments. */ protected abstract static class SyncTableActionBuilder<T> { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java index 1dd3d2059..6dc85ce23 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java @@ -25,7 +25,6 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.api.java.utils.MultipleParameterTool; -import org.apache.flink.core.execution.JobClient; import org.apache.flink.util.DockerImageVersions; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.AdminClient; @@ -226,20 +225,6 @@ public abstract class KafkaActionITCaseBase extends CdcActionITCaseBase { return config; } - protected JobClient runActionWithDefaultEnv(KafkaSyncTableAction action) throws Exception { - action.build(env); - JobClient client = env.executeAsync(); - waitJobRunning(client); - return client; - } - - protected JobClient runActionWithDefaultEnv(KafkaSyncDatabaseAction action) throws Exception { - action.build(env); - JobClient client = env.executeAsync(); - waitJobRunning(client); - return client; - } - protected KafkaSyncTableActionBuilder syncTableActionBuilder(Map<String, String> kafkaConfig) { return new KafkaSyncTableActionBuilder(kafkaConfig); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java index c51accf06..8976d6aa4 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java @@ -25,7 +25,6 @@ import com.mongodb.MongoClientSettings; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import org.apache.flink.api.java.utils.MultipleParameterTool; -import org.apache.flink.core.execution.JobClient; import org.junit.jupiter.api.BeforeAll; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,18 +76,6 @@ public abstract class MongoDBActionITCaseBase extends CdcActionITCaseBase { return MONGODB_CONTAINER.executeCommandFileInSeparateDatabase(fileName, dbName, content); } - protected void runActionWithDefaultEnv(MongoDBSyncTableAction action) throws Exception { - action.build(env); - JobClient client = env.executeAsync(); - waitJobRunning(client); - } - - protected void runActionWithDefaultEnv(MongoDBSyncDatabaseAction action) throws Exception { - action.build(env); - JobClient client = env.executeAsync(); - waitJobRunning(client); - } - protected MongoDBSyncTableActionBuilder syncTableActionBuilder( Map<String, String> mongodbConfig) { return new MongoDBSyncTableActionBuilder(mongodbConfig); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java index 5dbe58771..e93b0e03e 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java @@ -21,7 +21,6 @@ package org.apache.paimon.flink.action.cdc.mysql; import org.apache.paimon.flink.action.cdc.CdcActionITCaseBase; import org.apache.flink.api.java.utils.MultipleParameterTool; -import org.apache.flink.core.execution.JobClient; import org.junit.jupiter.api.AfterAll; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,19 +91,6 @@ public class MySqlActionITCaseBase extends CdcActionITCaseBase { return config; } - protected JobClient runActionWithDefaultEnv(MySqlSyncTableAction action) throws Exception { - action.build(env); - JobClient client = env.executeAsync(); - waitJobRunning(client); - return client; - } - - protected void runActionWithDefaultEnv(MySqlSyncDatabaseAction action) throws Exception { - action.build(env); - JobClient client = env.executeAsync(); - waitJobRunning(client); - } - protected MySqlSyncTableActionBuilder syncTableActionBuilder(Map<String, String> mySqlConfig) { return new MySqlSyncTableActionBuilder(mySqlConfig); }