This is an automated email from the ASF dual-hosted git repository.
fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new e626faaa59 [#6216] flink-connector: Add integration Tests for
GravitinoPaimonCatalog with Catalog-backend as HiveMetaStore (#6805)
e626faaa59 is described below
commit e626faaa59527ab9ee087f4c38c500dba7fd25e1
Author: yangyang zhong <[email protected]>
AuthorDate: Mon Apr 7 10:12:21 2025 +0800
[#6216] flink-connector: Add integration Tests for GravitinoPaimonCatalog
with Catalog-backend as HiveMetaStore (#6805)
### Why are the changes needed?
Fix: #6216
### How was this patch tested?
FlinkPaimonHiveCatalogIT
---
.../test/paimon/FlinkPaimonCatalogIT.java | 76 +++++++++-------------
.../test/paimon/FlinkPaimonHiveBackendIT.java | 59 +++++++++++++++++
.../FlinkPaimonLocalFileSystemBackendIT.java | 63 ++++++++++++++++++
3 files changed, 152 insertions(+), 46 deletions(-)
diff --git
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java
index 948e02dd81..caa480c756 100644
---
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java
+++
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java
@@ -19,28 +19,17 @@
package org.apache.gravitino.flink.connector.integration.test.paimon;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import java.nio.file.Path;
import java.util.Map;
import org.apache.gravitino.Catalog;
-import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants;
import org.apache.gravitino.flink.connector.integration.test.FlinkCommonIT;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-@Tag("gravitino-docker-test")
-public class FlinkPaimonCatalogIT extends FlinkCommonIT {
+public abstract class FlinkPaimonCatalogIT extends FlinkCommonIT {
- @TempDir private static Path warehouseDir;
-
- private static final String DEFAULT_PAIMON_CATALOG =
- "test_flink_paimon_filesystem_schema_catalog";
-
- private static org.apache.gravitino.Catalog catalog;
+ protected org.apache.gravitino.Catalog catalog;
@Override
protected boolean supportSchemaOperationWithCommentAndOptions() {
@@ -61,56 +50,51 @@ public class FlinkPaimonCatalogIT extends FlinkCommonIT {
return catalog;
}
- @BeforeAll
- void setup() {
- initPaimonCatalog();
- }
-
- @AfterAll
- void stop() {
- Preconditions.checkNotNull(metalake);
- metalake.dropCatalog(DEFAULT_PAIMON_CATALOG, true);
- }
-
private void initPaimonCatalog() {
Preconditions.checkNotNull(metalake);
catalog =
metalake.createCatalog(
- DEFAULT_PAIMON_CATALOG,
+ getPaimonCatalogName(),
org.apache.gravitino.Catalog.Type.RELATIONAL,
getProvider(),
null,
- ImmutableMap.of(
- PaimonConstants.CATALOG_BACKEND,
- "filesystem",
- "warehouse",
- warehouseDir.toString()));
+ getPaimonCatalogOptions());
}
+ protected abstract void createGravitinoCatalogByFlinkSql(String catalogName);
+
+ protected abstract String getPaimonCatalogName();
+
+ protected abstract Map<String, String> getPaimonCatalogOptions();
+
+ @BeforeAll
+ void paimonSetup() {
+ initPaimonCatalog();
+ }
+
+ @AfterAll
+ void paimonStop() {
+ Preconditions.checkNotNull(metalake);
+ metalake.dropCatalog(getPaimonCatalogName(), true);
+ }
+
+ protected abstract String getWarehouse();
+
@Test
public void testCreateGravitinoPaimonCatalogUsingSQL() {
tableEnv.useCatalog(DEFAULT_CATALOG);
int numCatalogs = tableEnv.listCatalogs().length;
- String catalogName = "gravitino_hive_sql";
- String warehouse = warehouseDir.toString();
- tableEnv.executeSql(
- String.format(
- "create catalog %s with ("
- + "'type'='gravitino-paimon', "
- + "'warehouse'='%s',"
- + "'metastore'='filesystem'"
- + ")",
- catalogName, warehouse));
+ String catalogName = "gravitino_paimon_catalog";
+ createGravitinoCatalogByFlinkSql(catalogName);
String[] catalogs = tableEnv.listCatalogs();
Assertions.assertEquals(numCatalogs + 1, catalogs.length, "Should create a
new catalog");
Assertions.assertTrue(metalake.catalogExists(catalogName));
org.apache.gravitino.Catalog gravitinoCatalog =
metalake.loadCatalog(catalogName);
Map<String, String> properties = gravitinoCatalog.properties();
- Assertions.assertEquals(warehouse, properties.get("warehouse"));
- }
-
- @Override
- protected Map<String, String> getCreateSchemaProps(String schemaName) {
- return null;
+ Assertions.assertEquals(getWarehouse(), properties.get("warehouse"));
+ tableEnv.executeSql("drop catalog " + catalogName);
+ Assertions.assertFalse(metalake.catalogExists(catalogName));
+ Assertions.assertEquals(
+ numCatalogs, tableEnv.listCatalogs().length, "The created catalog
should be dropped.");
}
}
diff --git
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonHiveBackendIT.java
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonHiveBackendIT.java
new file mode 100644
index 0000000000..9025baeb5f
--- /dev/null
+++
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonHiveBackendIT.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.flink.connector.integration.test.paimon;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants;
+import org.junit.jupiter.api.Tag;
+
+@Tag("gravitino-docker-test")
+public class FlinkPaimonHiveBackendIT extends FlinkPaimonCatalogIT {
+
+ private static final String DEFAULT_PAIMON_CATALOG =
"test_flink_paimon_hive_catalog";
+
+ @Override
+ protected void createGravitinoCatalogByFlinkSql(String catalogName) {
+ tableEnv.executeSql(
+ String.format(
+ "create catalog %s with ("
+ + "'type'='gravitino-paimon', "
+ + "'warehouse'='%s',"
+ + "'metastore'='hive',"
+ + "'uri'='%s'"
+ + ")",
+ catalogName, warehouse, hiveMetastoreUri));
+ }
+
+ @Override
+ protected String getPaimonCatalogName() {
+ return DEFAULT_PAIMON_CATALOG;
+ }
+
+ @Override
+ protected Map<String, String> getPaimonCatalogOptions() {
+ return ImmutableMap.of(
+ PaimonConstants.CATALOG_BACKEND, "hive", "warehouse", warehouse,
"uri", hiveMetastoreUri);
+ }
+
+ @Override
+ protected String getWarehouse() {
+ return warehouse;
+ }
+}
diff --git
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonLocalFileSystemBackendIT.java
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonLocalFileSystemBackendIT.java
new file mode 100644
index 0000000000..a84a240336
--- /dev/null
+++
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonLocalFileSystemBackendIT.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.flink.connector.integration.test.paimon;
+
+import com.google.common.collect.ImmutableMap;
+import java.nio.file.Path;
+import java.util.Map;
+import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.io.TempDir;
+
+@Tag("gravitino-docker-test")
+public class FlinkPaimonLocalFileSystemBackendIT extends FlinkPaimonCatalogIT {
+
+ @TempDir private static Path warehouseDir;
+
+ private static final String DEFAULT_PAIMON_CATALOG =
+ "test_flink_paimon_filesystem_schema_catalog";
+
+ @Override
+ protected void createGravitinoCatalogByFlinkSql(String catalogName) {
+ tableEnv.executeSql(
+ String.format(
+ "create catalog %s with ("
+ + "'type'='gravitino-paimon', "
+ + "'warehouse'='%s',"
+ + "'metastore'='filesystem'"
+ + ")",
+ catalogName, warehouseDir));
+ }
+
+ @Override
+ protected String getPaimonCatalogName() {
+ return DEFAULT_PAIMON_CATALOG;
+ }
+
+ @Override
+ protected Map<String, String> getPaimonCatalogOptions() {
+ return ImmutableMap.of(
+ PaimonConstants.CATALOG_BACKEND, "filesystem", "warehouse",
warehouseDir.toString());
+ }
+
+ @Override
+ protected String getWarehouse() {
+ return warehouseDir.toString();
+ }
+}