This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 327274d48 [flink] Remove Redundant Test Methods (#1921)
327274d48 is described below

commit 327274d488db07a621af37a5837170511971922e
Author: monster <60029759+monsterchenz...@users.noreply.github.com>
AuthorDate: Thu Aug 31 17:17:02 2023 +0800

    [flink] Remove Redundant Test Methods (#1921)
---
 .../org/apache/paimon/flink/action/Action.java     | 17 +++++++++++++
 .../apache/paimon/flink/action/CompactAction.java  |  1 +
 .../paimon/flink/action/CompactDatabaseAction.java |  1 +
 .../paimon/flink/action/SortCompactAction.java     |  1 +
 .../action/cdc/kafka/KafkaSyncDatabaseAction.java  |  1 +
 .../action/cdc/kafka/KafkaSyncTableAction.java     |  1 +
 .../cdc/mongodb/MongoDBSyncDatabaseAction.java     |  1 +
 .../action/cdc/mongodb/MongoDBSyncTableAction.java |  1 +
 .../action/cdc/mysql/MySqlSyncDatabaseAction.java  |  1 +
 .../action/cdc/mysql/MySqlSyncTableAction.java     |  1 +
 .../flink/action/cdc/CdcActionITCaseBase.java      | 28 ++++++++++++++--------
 .../action/cdc/kafka/KafkaActionITCaseBase.java    | 15 ------------
 .../cdc/mongodb/MongoDBActionITCaseBase.java       | 13 ----------
 .../action/cdc/mysql/MySqlActionITCaseBase.java    | 14 -----------
 14 files changed, 44 insertions(+), 52 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java
index 483c6c80a..9c4d24211 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java
@@ -18,9 +18,26 @@
 
 package org.apache.paimon.flink.action;
 
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
 /** Abstract class for Flink actions. */
 public interface Action {
 
     /** The execution method of the action. */
     void run() throws Exception;
+
+    /**
+     * Builds the action within the given Flink Stream Execution Environment.
+     *
+     * <p>This method is responsible for setting up any necessary configurations or resources needed
+     * for the action to run. It is called before the `run` method to prepare the environment for
+     * execution.
+     *
+     * <p>By default, this method is empty and can be overridden by subclasses to provide custom
+     * setup logic.
+     *
+     * @param env The Flink Stream Execution Environment where the action will be executed.
+     * @throws Exception If an error occurs during the build process.
+     */
+    default void build(StreamExecutionEnvironment env) throws Exception {}
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 9d0b99484..68cfc2ed8 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -70,6 +70,7 @@ public class CompactAction extends TableActionBase {
         return this;
     }
 
+    @Override
     public void build(StreamExecutionEnvironment env) {
         ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env);
         boolean isStreaming =
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
index 1be9e7b15..abbeb5e33 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
@@ -80,6 +80,7 @@ public class CompactDatabaseAction extends ActionBase {
         return shouldCompaction;
     }
 
+    @Override
     public void build(StreamExecutionEnvironment env) {
         try {
             Pattern databasePattern = Pattern.compile(database);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
index 9b488476d..16557a508 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
@@ -72,6 +72,7 @@ public class SortCompactAction extends CompactAction {
         execute(env, "Sort Compact Job");
     }
 
+    @Override
     public void build(StreamExecutionEnvironment env) {
         // only support batch sort yet
         if (env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
index c6a3d2be2..9065137b1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
@@ -140,6 +140,7 @@ public class KafkaSyncDatabaseAction extends ActionBase {
         return this;
     }
 
+    @Override
     public void build(StreamExecutionEnvironment env) throws Exception {
         boolean caseSensitive = catalog.caseSensitive();
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
index 2dffcfcab..ef3dee8da 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
@@ -140,6 +140,7 @@ public class KafkaSyncTableAction extends ActionBase {
         return this;
     }
 
+    @Override
     public void build(StreamExecutionEnvironment env) throws Exception {
         KafkaSource<String> source = KafkaActionUtils.buildKafkaSource(kafkaConfig);
         String topic = kafkaConfig.get(KafkaConnectorOptions.TOPIC).get(0);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
index 5f2f8db4d..75a687387 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
@@ -97,6 +97,7 @@ public class MongoDBSyncDatabaseAction extends ActionBase {
         this.tableConfig = tableConfig;
     }
 
+    @Override
     public void build(StreamExecutionEnvironment env) throws Exception {
         boolean caseSensitive = catalog.caseSensitive();
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
index 57f12b37b..26940ac83 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
@@ -83,6 +83,7 @@ public class MongoDBSyncTableAction extends ActionBase {
         this.tableConfig = tableConfig;
     }
 
+    @Override
     public void build(StreamExecutionEnvironment env) throws Exception {
         checkArgument(
                 mongodbConfig.contains(MongoDBSourceOptions.COLLECTION),
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index 5382e79b9..d18755dd3 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -178,6 +178,7 @@ public class MySqlSyncDatabaseAction extends ActionBase {
         return this;
     }
 
+    @Override
     public void build(StreamExecutionEnvironment env) throws Exception {
         checkArgument(
                 !mySqlConfig.contains(MySqlSourceOptions.TABLE_NAME),
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index 9124a6b5d..aef0597ec 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -142,6 +142,7 @@ public class MySqlSyncTableAction extends ActionBase {
         return this;
     }
 
+    @Override
     public void build(StreamExecutionEnvironment env) throws Exception {
         checkArgument(
                 mySqlConfig.contains(MySqlSourceOptions.TABLE_NAME),
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
index cc584c43e..b4f6dcf48 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.action.cdc;
 
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.ActionBase;
 import org.apache.paimon.flink.action.ActionITCaseBase;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.ReadBuilder;
@@ -139,16 +140,6 @@ public class CdcActionITCaseBase extends ActionITCaseBase {
         return config;
     }
 
-    protected void waitJobRunning(JobClient client) throws Exception {
-        while (true) {
-            JobStatus status = client.getJobStatus().get();
-            if (status == JobStatus.RUNNING) {
-                break;
-            }
-            Thread.sleep(1000);
-        }
-    }
-
     protected List<String> mapToArgs(String argKey, Map<String, String> map) {
         List<String> args = new ArrayList<>();
         for (Map.Entry<String, String> entry : map.entrySet()) {
@@ -181,6 +172,23 @@ public class CdcActionITCaseBase extends ActionITCaseBase {
         return Arrays.asList(argKey, nullable.toString());
     }
 
+    public <T extends ActionBase> JobClient runActionWithDefaultEnv(T action) throws Exception {
+        action.build(env);
+        JobClient client = env.executeAsync();
+        waitJobRunning(client);
+        return client;
+    }
+
+    protected void waitJobRunning(JobClient client) throws Exception {
+        while (true) {
+            JobStatus status = client.getJobStatus().get();
+            if (status == JobStatus.RUNNING) {
+                break;
+            }
+            Thread.sleep(1000);
+        }
+    }
+
     /** Base builder to build table synchronization action from action arguments. */
     protected abstract static class SyncTableActionBuilder<T> {
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
index 1dd3d2059..6dc85ce23 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
@@ -25,7 +25,6 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.apache.flink.api.java.utils.MultipleParameterTool;
-import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.util.DockerImageVersions;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.AdminClient;
@@ -226,20 +225,6 @@ public abstract class KafkaActionITCaseBase extends CdcActionITCaseBase {
         return config;
     }
 
-    protected JobClient runActionWithDefaultEnv(KafkaSyncTableAction action) throws Exception {
-        action.build(env);
-        JobClient client = env.executeAsync();
-        waitJobRunning(client);
-        return client;
-    }
-
-    protected JobClient runActionWithDefaultEnv(KafkaSyncDatabaseAction action) throws Exception {
-        action.build(env);
-        JobClient client = env.executeAsync();
-        waitJobRunning(client);
-        return client;
-    }
-
     protected KafkaSyncTableActionBuilder syncTableActionBuilder(Map<String, String> kafkaConfig) {
         return new KafkaSyncTableActionBuilder(kafkaConfig);
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java
index c51accf06..8976d6aa4 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java
@@ -25,7 +25,6 @@ import com.mongodb.MongoClientSettings;
 import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoClients;
 import org.apache.flink.api.java.utils.MultipleParameterTool;
-import org.apache.flink.core.execution.JobClient;
 import org.junit.jupiter.api.BeforeAll;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -77,18 +76,6 @@ public abstract class MongoDBActionITCaseBase extends CdcActionITCaseBase {
         return MONGODB_CONTAINER.executeCommandFileInSeparateDatabase(fileName, dbName, content);
     }
 
-    protected void runActionWithDefaultEnv(MongoDBSyncTableAction action) throws Exception {
-        action.build(env);
-        JobClient client = env.executeAsync();
-        waitJobRunning(client);
-    }
-
-    protected void runActionWithDefaultEnv(MongoDBSyncDatabaseAction action) throws Exception {
-        action.build(env);
-        JobClient client = env.executeAsync();
-        waitJobRunning(client);
-    }
-
     protected MongoDBSyncTableActionBuilder syncTableActionBuilder(
             Map<String, String> mongodbConfig) {
         return new MongoDBSyncTableActionBuilder(mongodbConfig);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
index 5dbe58771..e93b0e03e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
@@ -21,7 +21,6 @@ package org.apache.paimon.flink.action.cdc.mysql;
 import org.apache.paimon.flink.action.cdc.CdcActionITCaseBase;
 
 import org.apache.flink.api.java.utils.MultipleParameterTool;
-import org.apache.flink.core.execution.JobClient;
 import org.junit.jupiter.api.AfterAll;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -92,19 +91,6 @@ public class MySqlActionITCaseBase extends CdcActionITCaseBase {
         return config;
     }
 
-    protected JobClient runActionWithDefaultEnv(MySqlSyncTableAction action) throws Exception {
-        action.build(env);
-        JobClient client = env.executeAsync();
-        waitJobRunning(client);
-        return client;
-    }
-
-    protected void runActionWithDefaultEnv(MySqlSyncDatabaseAction action) throws Exception {
-        action.build(env);
-        JobClient client = env.executeAsync();
-        waitJobRunning(client);
-    }
-
     protected MySqlSyncTableActionBuilder syncTableActionBuilder(Map<String, String> mySqlConfig) {
         return new MySqlSyncTableActionBuilder(mySqlConfig);
     }

Reply via email to