This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4092545ba [test] unify get catalog method for flink action (#1626)
4092545ba is described below
commit 4092545bae77395567f738ab9db1274fcdb16f22
Author: JunZhang <[email protected]>
AuthorDate: Mon Jul 24 15:03:26 2023 +0800
[test] unify get catalog method for flink action (#1626)
---
.../java/org/apache/paimon/flink/action/ActionITCaseBase.java | 6 +++++-
.../paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java | 8 --------
.../action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java | 7 ++-----
.../action/cdc/kafka/KafkaCanalSyncTableActionITCase.java | 5 +----
.../flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java | 11 ++---------
.../flink/action/cdc/mysql/MySqlSyncTableActionITCase.java | 5 +----
6 files changed, 11 insertions(+), 31 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
index 74b8bc374..8f6f0eb42 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
@@ -85,7 +85,7 @@ public abstract class ActionITCaseBase extends
AbstractTestBase {
List<String> primaryKeys,
Map<String, String> options)
throws Exception {
- Catalog catalog =
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
+ Catalog catalog = catalog();
Identifier identifier = Identifier.create(database, tableName);
catalog.createDatabase(database, true);
catalog.createTable(
@@ -115,4 +115,8 @@ public abstract class ActionITCaseBase extends
AbstractTestBase {
row -> result.add(DataFormatTestUtil.internalRowToString(row,
rowType)));
return result;
}
+
+ protected Catalog catalog() {
+ return CatalogFactory.createCatalog(CatalogContext.create(new
Path(warehouse)));
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
index 2582cd5c0..84a901a68 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
@@ -18,12 +18,8 @@
package org.apache.paimon.flink.action.cdc.kafka;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.ActionITCaseBase;
-import org.apache.paimon.fs.Path;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.TableScan;
@@ -369,10 +365,6 @@ public abstract class KafkaActionITCaseBase extends
ActionITCaseBase {
}
}
- protected Catalog catalog() {
- return CatalogFactory.createCatalog(CatalogContext.create(new
Path(warehouse)));
- }
-
protected FileStoreTable getFileStoreTable(String tableName) throws
Exception {
Identifier identifier = Identifier.create(database, tableName);
return (FileStoreTable) catalog().getTable(identifier);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
index 8a06b2df8..6dd34f280 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
@@ -19,10 +19,7 @@
package org.apache.paimon.flink.action.cdc.kafka;
import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
@@ -305,7 +302,7 @@ public class KafkaCanalSyncDatabaseActionITCase extends
KafkaActionITCaseBase {
@Timeout(60)
public void testTableAffixMultiTopic() throws Exception {
// create table t1
- Catalog catalog =
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
+ Catalog catalog = catalog();
catalog.createDatabase(database, true);
Identifier identifier = Identifier.create(database,
"test_prefix_t1_test_suffix");
Schema schema =
@@ -377,7 +374,7 @@ public class KafkaCanalSyncDatabaseActionITCase extends
KafkaActionITCaseBase {
@Timeout(60)
public void testTableAffixOneTopic() throws Exception {
// create table t1
- Catalog catalog =
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
+ Catalog catalog = catalog();
catalog.createDatabase(database, true);
Identifier identifier = Identifier.create(database,
"test_prefix_t1_test_suffix");
Schema schema =
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index bfbd89fe6..02607cc63 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -19,10 +19,7 @@
package org.apache.paimon.flink.action.cdc.kafka;
import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
@@ -741,7 +738,7 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaActionITCaseBase {
kafkaConfig.put("topic", topic);
// create an incompatible table
- Catalog catalog =
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
+ Catalog catalog = catalog();
catalog.createDatabase(database, true);
Identifier identifier = Identifier.create(database, tableName);
Schema schema =
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index 0595d6fa0..2134cef68 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -19,10 +19,7 @@
package org.apache.paimon.flink.action.cdc.mysql;
import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.fs.Path;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
@@ -378,7 +375,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
@Timeout(60)
public void testIgnoreIncompatibleTables() throws Exception {
// create an incompatible table
- Catalog catalog =
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
+ Catalog catalog = catalog();
catalog.createDatabase(database, true);
Identifier identifier = Identifier.create(database, "incompatible");
Schema schema =
@@ -443,7 +440,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
@Timeout(60)
public void testTableAffix() throws Exception {
// create table t1
- Catalog catalog =
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
+ Catalog catalog = catalog();
catalog.createDatabase(database, true);
Identifier identifier = Identifier.create(database,
"test_prefix_t1_test_suffix");
Schema schema =
@@ -1179,10 +1176,6 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
waitForResult(expected, table, rowType, Arrays.asList("pk"));
}
- private Catalog catalog() {
- return CatalogFactory.createCatalog(CatalogContext.create(new
Path(warehouse)));
- }
-
private FileStoreTable getFileStoreTable(String tableName) throws
Exception {
Identifier identifier = Identifier.create(database, tableName);
return (FileStoreTable) catalog().getTable(identifier);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 94cda55eb..a6e679a2d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -19,10 +19,7 @@
package org.apache.paimon.flink.action.cdc.mysql;
import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.fs.Path;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
@@ -1032,7 +1029,7 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
}
private FileStoreTable getFileStoreTable() throws Exception {
- Catalog catalog =
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
+ Catalog catalog = catalog();
Identifier identifier = Identifier.create(database, tableName);
return (FileStoreTable) catalog.getTable(identifier);
}