This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4092545ba [test] unify get catalog method for flink action (#1626)
4092545ba is described below

commit 4092545bae77395567f738ab9db1274fcdb16f22
Author: JunZhang <[email protected]>
AuthorDate: Mon Jul 24 15:03:26 2023 +0800

    [test] unify get catalog method for flink action (#1626)
---
 .../java/org/apache/paimon/flink/action/ActionITCaseBase.java |  6 +++++-
 .../paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java  |  8 --------
 .../action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java  |  7 ++-----
 .../action/cdc/kafka/KafkaCanalSyncTableActionITCase.java     |  5 +----
 .../flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java | 11 ++---------
 .../flink/action/cdc/mysql/MySqlSyncTableActionITCase.java    |  5 +----
 6 files changed, 11 insertions(+), 31 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
index 74b8bc374..8f6f0eb42 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
@@ -85,7 +85,7 @@ public abstract class ActionITCaseBase extends 
AbstractTestBase {
             List<String> primaryKeys,
             Map<String, String> options)
             throws Exception {
-        Catalog catalog = 
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
+        Catalog catalog = catalog();
         Identifier identifier = Identifier.create(database, tableName);
         catalog.createDatabase(database, true);
         catalog.createTable(
@@ -115,4 +115,8 @@ public abstract class ActionITCaseBase extends 
AbstractTestBase {
                 row -> result.add(DataFormatTestUtil.internalRowToString(row, 
rowType)));
         return result;
     }
+
+    protected Catalog catalog() {
+        return CatalogFactory.createCatalog(CatalogContext.create(new 
Path(warehouse)));
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
index 2582cd5c0..84a901a68 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
@@ -18,12 +18,8 @@
 
 package org.apache.paimon.flink.action.cdc.kafka;
 
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.action.ActionITCaseBase;
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.TableScan;
@@ -369,10 +365,6 @@ public abstract class KafkaActionITCaseBase extends 
ActionITCaseBase {
         }
     }
 
-    protected Catalog catalog() {
-        return CatalogFactory.createCatalog(CatalogContext.create(new 
Path(warehouse)));
-    }
-
     protected FileStoreTable getFileStoreTable(String tableName) throws 
Exception {
         Identifier identifier = Identifier.create(database, tableName);
         return (FileStoreTable) catalog().getTable(identifier);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
index 8a06b2df8..6dd34f280 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
@@ -19,10 +19,7 @@
 package org.apache.paimon.flink.action.cdc.kafka;
 
 import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataType;
@@ -305,7 +302,7 @@ public class KafkaCanalSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
     @Timeout(60)
     public void testTableAffixMultiTopic() throws Exception {
         // create table t1
-        Catalog catalog = 
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
+        Catalog catalog = catalog();
         catalog.createDatabase(database, true);
         Identifier identifier = Identifier.create(database, 
"test_prefix_t1_test_suffix");
         Schema schema =
@@ -377,7 +374,7 @@ public class KafkaCanalSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
     @Timeout(60)
     public void testTableAffixOneTopic() throws Exception {
         // create table t1
-        Catalog catalog = 
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
+        Catalog catalog = catalog();
         catalog.createDatabase(database, true);
         Identifier identifier = Identifier.create(database, 
"test_prefix_t1_test_suffix");
         Schema schema =
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index bfbd89fe6..02607cc63 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -19,10 +19,7 @@
 package org.apache.paimon.flink.action.cdc.kafka;
 
 import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
@@ -741,7 +738,7 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaActionITCaseBase {
         kafkaConfig.put("topic", topic);
 
         // create an incompatible table
-        Catalog catalog = 
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
+        Catalog catalog = catalog();
         catalog.createDatabase(database, true);
         Identifier identifier = Identifier.create(database, tableName);
         Schema schema =
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index 0595d6fa0..2134cef68 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -19,10 +19,7 @@
 package org.apache.paimon.flink.action.cdc.mysql;
 
 import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
@@ -378,7 +375,7 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
     @Timeout(60)
     public void testIgnoreIncompatibleTables() throws Exception {
         // create an incompatible table
-        Catalog catalog = 
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
+        Catalog catalog = catalog();
         catalog.createDatabase(database, true);
         Identifier identifier = Identifier.create(database, "incompatible");
         Schema schema =
@@ -443,7 +440,7 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
     @Timeout(60)
     public void testTableAffix() throws Exception {
         // create table t1
-        Catalog catalog = 
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
+        Catalog catalog = catalog();
         catalog.createDatabase(database, true);
         Identifier identifier = Identifier.create(database, 
"test_prefix_t1_test_suffix");
         Schema schema =
@@ -1179,10 +1176,6 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
         waitForResult(expected, table, rowType, Arrays.asList("pk"));
     }
 
-    private Catalog catalog() {
-        return CatalogFactory.createCatalog(CatalogContext.create(new 
Path(warehouse)));
-    }
-
     private FileStoreTable getFileStoreTable(String tableName) throws 
Exception {
         Identifier identifier = Identifier.create(database, tableName);
         return (FileStoreTable) catalog().getTable(identifier);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 94cda55eb..a6e679a2d 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -19,10 +19,7 @@
 package org.apache.paimon.flink.action.cdc.mysql;
 
 import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
@@ -1032,7 +1029,7 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
     }
 
     private FileStoreTable getFileStoreTable() throws Exception {
-        Catalog catalog = 
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
+        Catalog catalog = catalog();
         Identifier identifier = Identifier.create(database, tableName);
         return (FileStoreTable) catalog.getTable(identifier);
     }

Reply via email to