This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch branch-0.8
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/branch-0.8 by this push:
new 01ec8c48cd [#3515] feat(flink-connector): Support flink iceberg
catalog (#6295)
01ec8c48cd is described below
commit 01ec8c48cd36d6ae5a766efe390c697463ac081d
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Jan 16 16:45:24 2025 +0800
[#3515] feat(flink-connector): Support flink iceberg catalog (#6295)
### What changes were proposed in this pull request?
Support flink iceberg catalog
### Why are the changes needed?
Fix: [#3515](https://github.com/apache/gravitino/issues/3515)
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
FlinkIcebergCatalogIT
FlinkIcebergHiveCatalogIT
Co-authored-by: Xiaojian Sun <[email protected]>
---
docs/flink-connector/flink-catalog-iceberg.md | 78 +++++
flink-connector/flink/build.gradle.kts | 6 +
.../connector/catalog/GravitinoCatalogManager.java | 11 -
.../connector/iceberg/GravitinoIcebergCatalog.java | 67 +++++
.../iceberg/GravitinoIcebergCatalogFactory.java | 95 ++++++
.../GravitinoIcebergCatalogFactoryOptions.java | 33 +++
.../iceberg/IcebergPropertiesConstants.java | 49 ++++
.../iceberg/IcebergPropertiesConverter.java | 84 ++++++
.../org.apache.flink.table.factories.Factory | 3 +-
.../iceberg/TestIcebergPropertiesConverter.java | 82 ++++++
.../connector/integration/test/FlinkCommonIT.java | 62 ++--
.../connector/integration/test/FlinkEnvIT.java | 37 ++-
.../integration/test/hive/FlinkHiveCatalogIT.java | 42 +--
.../FlinkIcebergCatalogIT.java} | 325 +++++++--------------
.../test/iceberg/FlinkIcebergHiveCatalogIT.java | 46 +++
.../test/paimon/FlinkPaimonCatalogIT.java | 21 +-
.../integration/test/utils/TestUtils.java | 3 +-
17 files changed, 757 insertions(+), 287 deletions(-)
diff --git a/docs/flink-connector/flink-catalog-iceberg.md
b/docs/flink-connector/flink-catalog-iceberg.md
new file mode 100644
index 0000000000..54d7c0879f
--- /dev/null
+++ b/docs/flink-connector/flink-catalog-iceberg.md
@@ -0,0 +1,78 @@
+---
+title: "Flink connector Iceberg catalog"
+slug: /flink-connector/flink-catalog-iceberg
+keyword: flink connector iceberg catalog
+license: "This software is licensed under the Apache License version 2."
+---
+
+The Apache Gravitino Flink connector can be used to read and write Iceberg
tables, with the metadata managed by the Gravitino server.
+To enable the Flink connector, you must download the Iceberg Flink runtime JAR
and place it in the Flink classpath.
+
+## Capabilities
+
+#### Supported DML and DDL operations:
+
+- `CREATE CATALOG`
+- `CREATE DATABASE`
+- `CREATE TABLE`
+- `DROP TABLE`
+- `ALTER TABLE`
+- `INSERT INTO & OVERWRITE`
+- `SELECT`
+
+#### Operations not supported:
+
+- Partition operations
+- View operations
+- Metadata tables, like:
+ - `{iceberg_catalog}.{iceberg_database}.{iceberg_table}$snapshots`
+- Query UDF
+- `UPDATE` clause
+- `DELETE` clause
+- `CREATE TABLE LIKE` clause
+
+## SQL example
+```sql
+
+-- Suppose iceberg_a is the Iceberg catalog name managed by Gravitino
+
+USE iceberg_a;
+
+CREATE DATABASE IF NOT EXISTS mydatabase;
+USE mydatabase;
+
+CREATE TABLE sample (
+ id BIGINT COMMENT 'unique id',
+ data STRING NOT NULL
+) PARTITIONED BY (data)
+WITH ('format-version'='2');
+
+INSERT INTO sample
+VALUES (1, 'A'), (2, 'B');
+
+SELECT * FROM sample WHERE data = 'B';
+
+```
+
+## Catalog properties
+
+The Gravitino Flink connector transforms the following properties in a catalog
to Flink connector configuration.
+
+
+| Gravitino catalog property name | Flink Iceberg connector configuration |
Description
| Since Version |
+|---------------------------------|---------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------|
+| `catalog-backend` | `catalog-type` |
Catalog backend type. Currently, only the `Hive` catalog is supported; `JDBC` and
`Rest` are still being validated.
| 0.8.0-incubating |
+| `uri` | `uri` |
Catalog backend URI
| 0.8.0-incubating |
+| `warehouse` | `warehouse` |
Catalog backend warehouse
| 0.8.0-incubating |
+| `io-impl` | `io-impl` |
The IO implementation for `FileIO` in Iceberg.
| 0.8.0-incubating |
+| `oss-endpoint` | `oss.endpoint` |
The endpoint of Aliyun OSS service.
| 0.8.0-incubating |
+| `oss-access-key-id` | `client.access-key-id` |
The static access key ID used to access OSS data.
| 0.8.0-incubating |
+| `oss-secret-access-key` | `client.access-key-secret` |
The static secret access key used to access OSS data.
| 0.8.0-incubating |
+
+Gravitino catalog properties whose names start with the prefix `flink.bypass.` are passed to
the Flink Iceberg connector with the prefix removed. For example, use `flink.bypass.clients` to pass the
`clients` property to the Flink Iceberg connector.
+
+## Storage
+
+### OSS
+
+Additionally, you need to download the [Aliyun OSS
SDK](https://gosspublic.alicdn.com/sdks/java/aliyun_java_sdk_3.10.2.zip), and
copy `aliyun-sdk-oss-3.10.2.jar`, `hamcrest-core-1.1.jar`, `jdom2-2.0.6.jar` to
the Flink classpath.
diff --git a/flink-connector/flink/build.gradle.kts
b/flink-connector/flink/build.gradle.kts
index f137a3eae1..4c9bd036ae 100644
--- a/flink-connector/flink/build.gradle.kts
+++ b/flink-connector/flink/build.gradle.kts
@@ -30,6 +30,8 @@ var paimonVersion: String = libs.versions.paimon.get()
val flinkVersion: String = libs.versions.flink.get()
val flinkMajorVersion: String = flinkVersion.substringBeforeLast(".")
+val icebergVersion: String = libs.versions.iceberg.get()
+
// The Flink only support scala 2.12, and all scala api will be removed in a
future version.
// You can find more detail at the following issues:
// https://issues.apache.org/jira/browse/FLINK-23986,
@@ -44,6 +46,8 @@ dependencies {
implementation(libs.guava)
compileOnly(project(":clients:client-java-runtime", configuration =
"shadow"))
+
+
compileOnly("org.apache.iceberg:iceberg-flink-runtime-$flinkMajorVersion:$icebergVersion")
compileOnly("org.apache.flink:flink-connector-hive_$scalaVersion:$flinkVersion")
compileOnly("org.apache.flink:flink-table-common:$flinkVersion")
compileOnly("org.apache.flink:flink-table-api-java:$flinkVersion")
@@ -88,7 +92,9 @@ dependencies {
testImplementation(libs.testcontainers)
testImplementation(libs.testcontainers.junit.jupiter)
testImplementation(libs.testcontainers.mysql)
+ testImplementation(libs.metrics.core)
+
testImplementation("org.apache.iceberg:iceberg-flink-runtime-$flinkMajorVersion:$icebergVersion")
testImplementation("org.apache.flink:flink-connector-hive_$scalaVersion:$flinkVersion")
testImplementation("org.apache.flink:flink-table-common:$flinkVersion")
testImplementation("org.apache.flink:flink-table-api-java:$flinkVersion")
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java
index 7693e5d4c9..0b0b89f3a5 100644
---
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java
@@ -35,12 +35,10 @@ public class GravitinoCatalogManager {
private static GravitinoCatalogManager gravitinoCatalogManager;
private volatile boolean isClosed = false;
- private final String metalakeName;
private final GravitinoMetalake metalake;
private final GravitinoAdminClient gravitinoClient;
private GravitinoCatalogManager(String gravitinoUri, String metalakeName) {
- this.metalakeName = metalakeName;
this.gravitinoClient = GravitinoAdminClient.builder(gravitinoUri).build();
this.metalake = gravitinoClient.loadMetalake(metalakeName);
}
@@ -99,15 +97,6 @@ public class GravitinoCatalogManager {
return catalog;
}
- /**
- * Get the metalake.
- *
- * @return the metalake name.
- */
- public String getMetalakeName() {
- return metalakeName;
- }
-
/**
* Create catalog in Gravitino.
*
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalog.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalog.java
new file mode 100644
index 0000000000..30fac96bbc
--- /dev/null
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalog.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.flink.connector.iceberg;
+
+import java.util.Map;
+import java.util.Optional;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.factories.Factory;
+import org.apache.gravitino.flink.connector.PartitionConverter;
+import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.catalog.BaseCatalog;
+import org.apache.iceberg.flink.FlinkCatalog;
+import org.apache.iceberg.flink.FlinkCatalogFactory;
+
+/**
+ * Gravitino catalog for Iceberg: a {@link BaseCatalog} whose real catalog is an Iceberg {@link
+ * FlinkCatalog} built from the supplied catalog options.
+ */
+public class GravitinoIcebergCatalog extends BaseCatalog {
+
+  private final FlinkCatalog icebergCatalog;
+
+  /**
+   * Creates the catalog together with the Iceberg catalog it wraps.
+   *
+   * @param catalogName name of the catalog; also given to the Iceberg catalog factory
+   * @param defaultDatabase name of the default database
+   * @param propertiesConverter properties converter passed to {@link BaseCatalog}
+   * @param partitionConverter partition converter passed to {@link BaseCatalog}
+   * @param properties options from which {@link FlinkCatalogFactory} builds the Iceberg catalog
+   */
+  protected GravitinoIcebergCatalog(
+      String catalogName,
+      String defaultDatabase,
+      PropertiesConverter propertiesConverter,
+      PartitionConverter partitionConverter,
+      Map<String, String> properties) {
+    super(catalogName, defaultDatabase, propertiesConverter, partitionConverter);
+    this.icebergCatalog =
+        (FlinkCatalog) new FlinkCatalogFactory().createCatalog(catalogName, properties);
+  }
+
+  /** Opens the wrapped Iceberg catalog. */
+  @Override
+  public void open() throws CatalogException {
+    icebergCatalog.open();
+  }
+
+  /** Closes the wrapped Iceberg catalog. */
+  @Override
+  public void close() throws CatalogException {
+    icebergCatalog.close();
+  }
+
+  /**
+   * Returns whatever factory the wrapped Iceberg catalog reports.
+   *
+   * @return the factory exposed by the Iceberg catalog, if any
+   */
+  @Override
+  public Optional<Factory> getFactory() {
+    return icebergCatalog.getFactory();
+  }
+
+  /**
+   * Exposes the wrapped Iceberg catalog to {@link BaseCatalog}.
+   *
+   * @return the Iceberg {@link FlinkCatalog} created in the constructor
+   */
+  @Override
+  protected AbstractCatalog realCatalog() {
+    return icebergCatalog;
+  }
+}
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactory.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactory.java
new file mode 100644
index 0000000000..ad0363d986
--- /dev/null
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.flink.connector.iceberg;
+
+import java.util.Collections;
+import java.util.Set;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.gravitino.flink.connector.DefaultPartitionConverter;
+import org.apache.gravitino.flink.connector.PartitionConverter;
+import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory;
+import org.apache.gravitino.flink.connector.utils.FactoryUtils;
+
+/** Catalog factory that creates {@link GravitinoIcebergCatalog} instances for Flink. */
+public class GravitinoIcebergCatalogFactory implements BaseCatalogFactory {
+
+  /**
+   * Creates a {@link GravitinoIcebergCatalog} from the given context.
+   *
+   * @param context supplies the catalog name and the catalog options
+   * @return the newly created catalog
+   */
+  @Override
+  public Catalog createCatalog(Context context) {
+    final FactoryUtil.CatalogFactoryHelper factoryHelper =
+        FactoryUtils.createCatalogFactoryHelper(this, context);
+    // Resolved through the helper so the option's declared default applies when it is unset.
+    final String defaultDatabase =
+        factoryHelper.getOptions().get(GravitinoIcebergCatalogFactoryOptions.DEFAULT_DATABASE);
+    return new GravitinoIcebergCatalog(
+        context.getName(),
+        defaultDatabase,
+        propertiesConverter(),
+        partitionConverter(),
+        context.getOptions());
+  }
+
+  /**
+   * Identifier under which this factory is looked up.
+   *
+   * @return {@link GravitinoIcebergCatalogFactoryOptions#IDENTIFIER}
+   */
+  @Override
+  public String factoryIdentifier() {
+    return GravitinoIcebergCatalogFactoryOptions.IDENTIFIER;
+  }
+
+  /**
+   * This factory declares no required options.
+   *
+   * @return an empty set
+   */
+  @Override
+  public Set<ConfigOption<?>> requiredOptions() {
+    return Collections.emptySet();
+  }
+
+  /**
+   * This factory declares no optional options.
+   *
+   * @return an empty set
+   */
+  @Override
+  public Set<ConfigOption<?>> optionalOptions() {
+    return Collections.emptySet();
+  }
+
+  /**
+   * Defines the Gravitino catalog provider handled by this factory.
+   *
+   * @return the provider name {@code lakehouse-iceberg}
+   */
+  @Override
+  public String gravitinoCatalogProvider() {
+    return "lakehouse-iceberg";
+  }
+
+  /**
+   * Defines the Gravitino catalog type handled by this factory.
+   *
+   * @return {@code RELATIONAL}
+   */
+  @Override
+  public org.apache.gravitino.Catalog.Type gravitinoCatalogType() {
+    return org.apache.gravitino.Catalog.Type.RELATIONAL;
+  }
+
+  /**
+   * Defines the properties converter used for catalogs created by this factory.
+   *
+   * @return the shared {@link IcebergPropertiesConverter} instance
+   */
+  @Override
+  public PropertiesConverter propertiesConverter() {
+    return IcebergPropertiesConverter.INSTANCE;
+  }
+
+  /**
+   * Defines the partition converter used for catalogs created by this factory.
+   *
+   * @return the shared {@link DefaultPartitionConverter} instance
+   */
+  @Override
+  public PartitionConverter partitionConverter() {
+    return DefaultPartitionConverter.INSTANCE;
+  }
+}
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactoryOptions.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactoryOptions.java
new file mode 100644
index 0000000000..95e1a21de8
--- /dev/null
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactoryOptions.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.flink.connector.iceberg;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.iceberg.flink.FlinkCatalogFactory;
+
+/** Option definitions used by the Gravitino Iceberg catalog factory. */
+public class GravitinoIcebergCatalogFactoryOptions {
+
+ /** Factory identifier of the Gravitino Iceberg catalog. */
+ public static final String IDENTIFIER = "gravitino-iceberg";
+ /**
+ * Default database option; both its key and its default value are taken from Iceberg's
+ * FlinkCatalogFactory constants.
+ */
+ public static final ConfigOption<String> DEFAULT_DATABASE =
+ ConfigOptions.key(FlinkCatalogFactory.DEFAULT_DATABASE)
+ .stringType()
+ .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME);
+}
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConstants.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConstants.java
new file mode 100644
index 0000000000..163cfac882
--- /dev/null
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConstants.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.flink.connector.iceberg;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.flink.FlinkCatalogFactory;
+
+/**
+ * Property keys and backend values used when translating Gravitino Iceberg catalog properties for
+ * the Flink Iceberg connector. All fields are immutable constants.
+ */
+public class IcebergPropertiesConstants {
+  /** Gravitino property key that selects the Iceberg catalog backend. */
+  @VisibleForTesting
+  public static final String GRAVITINO_ICEBERG_CATALOG_BACKEND = IcebergConstants.CATALOG_BACKEND;
+
+  /** Flink Iceberg connector key for the catalog type. */
+  public static final String ICEBERG_CATALOG_TYPE = FlinkCatalogFactory.ICEBERG_CATALOG_TYPE;
+
+  /** Gravitino property key for the warehouse location. */
+  public static final String GRAVITINO_ICEBERG_CATALOG_WAREHOUSE = IcebergConstants.WAREHOUSE;
+
+  /** Iceberg catalog property key for the warehouse location. */
+  public static final String ICEBERG_CATALOG_WAREHOUSE = CatalogProperties.WAREHOUSE_LOCATION;
+
+  /** Gravitino property key for the catalog backend URI. */
+  public static final String GRAVITINO_ICEBERG_CATALOG_URI = IcebergConstants.URI;
+
+  /** Iceberg catalog property key for the catalog backend URI. */
+  public static final String ICEBERG_CATALOG_URI = CatalogProperties.URI;
+
+  /** Iceberg-side catalog type value for the Hive backend. */
+  @VisibleForTesting
+  public static final String ICEBERG_CATALOG_BACKEND_HIVE = CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE;
+
+  /** Gravitino-side backend value for the Hive backend. */
+  public static final String GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE = "hive";
+
+  /** Iceberg-side catalog type value for the REST backend. */
+  @VisibleForTesting
+  public static final String ICEBERG_CATALOG_BACKEND_REST = CatalogUtil.ICEBERG_CATALOG_TYPE_REST;
+}
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java
new file mode 100644
index 0000000000..7684d3eadb
--- /dev/null
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.flink.connector.iceberg;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergPropertiesUtils;
+import org.apache.gravitino.flink.connector.PropertiesConverter;
+
+/**
+ * {@link PropertiesConverter} for Iceberg catalogs: turns Gravitino catalog properties into the
+ * option map used to create the Flink Iceberg catalog, and copies table properties unchanged in
+ * both directions.
+ */
+public class IcebergPropertiesConverter implements PropertiesConverter {
+  /** Shared instance; the constructor is private, so this is the only way to obtain one. */
+  public static final IcebergPropertiesConverter INSTANCE = new IcebergPropertiesConverter();
+
+  private IcebergPropertiesConverter() {}
+
+  // Keys that are renamed once the generic Gravitino-to-Iceberg conversion has run.
+  private static final Map<String, String> GRAVITINO_CONFIG_TO_FLINK_ICEBERG =
+      ImmutableMap.of(
+          IcebergConstants.CATALOG_BACKEND, IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE);
+
+  /**
+   * Builds the Flink catalog options for a Gravitino Iceberg catalog.
+   *
+   * <p>The result is assembled in this order, later steps overriding earlier ones on key clashes:
+   *
+   * <ol>
+   *   <li>entries whose key starts with {@code FLINK_PROPERTY_PREFIX}, with the prefix removed;
+   *   <li>the output of {@code IcebergPropertiesUtils.toIcebergCatalogProperties};
+   *   <li>the Flink catalog type, set to {@link GravitinoIcebergCatalogFactoryOptions#IDENTIFIER};
+   *   <li>the renames listed in {@code GRAVITINO_CONFIG_TO_FLINK_ICEBERG}.
+   * </ol>
+   *
+   * @param gravitinoProperties the Gravitino catalog properties; must not be null
+   * @return a new mutable map holding the Flink catalog options
+   * @throws IllegalArgumentException if {@code gravitinoProperties} is null
+   */
+  @Override
+  public Map<String, String> toFlinkCatalogProperties(Map<String, String> gravitinoProperties) {
+    Preconditions.checkArgument(
+        gravitinoProperties != null, "Iceberg Catalog properties should not be null.");
+
+    // The precondition above guarantees a non-null map, so no further null check is needed.
+    Map<String, String> all = new HashMap<>();
+    gravitinoProperties.forEach(
+        (k, v) -> {
+          if (k.startsWith(FLINK_PROPERTY_PREFIX)) {
+            String newKey = k.substring(FLINK_PROPERTY_PREFIX.length());
+            all.put(newKey, v);
+          }
+        });
+
+    Map<String, String> transformedProperties =
+        IcebergPropertiesUtils.toIcebergCatalogProperties(gravitinoProperties);
+    if (transformedProperties != null) {
+      all.putAll(transformedProperties);
+    }
+
+    all.put(
+        CommonCatalogOptions.CATALOG_TYPE.key(), GravitinoIcebergCatalogFactoryOptions.IDENTIFIER);
+    // Map "catalog-backend" to "catalog-type".
+    // TODO If catalog backend is CUSTOM, we need special compatibility logic.
+    GRAVITINO_CONFIG_TO_FLINK_ICEBERG.forEach(
+        (key, value) -> {
+          if (all.containsKey(key)) {
+            String config = all.remove(key);
+            all.put(value, config);
+          }
+        });
+    return all;
+  }
+
+  /**
+   * Copies Flink table properties without changing any key or value.
+   *
+   * @param properties the Flink table properties
+   * @return a new mutable map with the same entries
+   */
+  @Override
+  public Map<String, String> toGravitinoTableProperties(Map<String, String> properties) {
+    return new HashMap<>(properties);
+  }
+
+  /**
+   * Copies Gravitino table properties without changing any key or value.
+   *
+   * @param properties the Gravitino table properties
+   * @return a new mutable map with the same entries
+   */
+  @Override
+  public Map<String, String> toFlinkTableProperties(Map<String, String> properties) {
+    return new HashMap<>(properties);
+  }
+}
diff --git
a/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index a535afb6dc..45ff2512e7 100644
---
a/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++
b/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -19,4 +19,5 @@
org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactory
org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactory
-org.apache.gravitino.flink.connector.paimon.GravitinoPaimonCatalogFactory
\ No newline at end of file
+org.apache.gravitino.flink.connector.paimon.GravitinoPaimonCatalogFactory
+org.apache.gravitino.flink.connector.iceberg.GravitinoIcebergCatalogFactory
diff --git
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/iceberg/TestIcebergPropertiesConverter.java
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/iceberg/TestIcebergPropertiesConverter.java
new file mode 100644
index 0000000000..d6de522f39
--- /dev/null
+++
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/iceberg/TestIcebergPropertiesConverter.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.flink.connector.iceberg;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for the catalog property conversion of {@link IcebergPropertiesConverter}. */
+public class TestIcebergPropertiesConverter {
+  private static final IcebergPropertiesConverter CONVERTER = IcebergPropertiesConverter.INSTANCE;
+
+  /**
+   * Hive backend: backend, URI and warehouse keys are translated, the catalog type is added, and
+   * the unrelated {@code key1} entry is not expected in the result.
+   */
+  @Test
+  void testCatalogPropertiesWithHiveBackend() {
+    Map<String, String> gravitinoProperties =
+        ImmutableMap.of(
+            IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
+            IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE,
+            IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI,
+            "hive-uri",
+            IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE,
+            "hive-warehouse",
+            "key1",
+            "value1");
+    Map<String, String> expected =
+        ImmutableMap.of(
+            CommonCatalogOptions.CATALOG_TYPE.key(),
+            GravitinoIcebergCatalogFactoryOptions.IDENTIFIER,
+            IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE,
+            IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_HIVE,
+            IcebergPropertiesConstants.ICEBERG_CATALOG_URI,
+            "hive-uri",
+            IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE,
+            "hive-warehouse");
+
+    Map<String, String> actual = CONVERTER.toFlinkCatalogProperties(gravitinoProperties);
+
+    Assertions.assertEquals(expected, actual);
+  }
+
+  /** REST backend: same expectations as the Hive case, with the REST backend value. */
+  @Test
+  void testCatalogPropertiesWithRestBackend() {
+    Map<String, String> gravitinoProperties =
+        ImmutableMap.of(
+            IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
+            IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_REST,
+            IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI,
+            "rest-uri",
+            IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE,
+            "rest-warehouse",
+            "key1",
+            "value1");
+    Map<String, String> expected =
+        ImmutableMap.of(
+            CommonCatalogOptions.CATALOG_TYPE.key(),
+            GravitinoIcebergCatalogFactoryOptions.IDENTIFIER,
+            IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE,
+            IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_REST,
+            IcebergPropertiesConstants.ICEBERG_CATALOG_URI,
+            "rest-uri",
+            IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE,
+            "rest-warehouse");
+
+    Map<String, String> actual = CONVERTER.toFlinkCatalogProperties(gravitinoProperties);
+
+    Assertions.assertEquals(expected, actual);
+  }
+}
diff --git
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java
index 5a363e4e51..b45e5f46ec 100644
---
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java
+++
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java
@@ -72,6 +72,14 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
return true;
}
+ protected boolean supportGetSchemaWithoutCommentAndOption() {
+ return true;
+ }
+
+ protected abstract String getProvider();
+
+ protected abstract boolean supportDropCascade();
+
@Test
public void testCreateSchema() {
doWithCatalog(
@@ -83,13 +91,14 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
TestUtils.assertTableResult(tableResult, ResultKind.SUCCESS);
catalog.asSchemas().schemaExists(schema);
} finally {
- catalog.asSchemas().dropSchema(schema, true);
+ catalog.asSchemas().dropSchema(schema, supportDropCascade());
Assertions.assertFalse(catalog.asSchemas().schemaExists(schema));
}
});
}
@Test
+ @EnabledIf("supportGetSchemaWithoutCommentAndOption")
public void testGetSchemaWithoutCommentAndOption() {
doWithCatalog(
currentCatalog(),
@@ -134,12 +143,11 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
Schema loadedSchema = catalog.asSchemas().loadSchema(schema);
Assertions.assertEquals(schema, loadedSchema.name());
Assertions.assertEquals(comment, loadedSchema.comment());
- Assertions.assertEquals(2, loadedSchema.properties().size());
Assertions.assertEquals(propertyValue,
loadedSchema.properties().get(propertyKey));
Assertions.assertEquals(
location,
loadedSchema.properties().get(HiveConstants.LOCATION));
} finally {
- catalog.asSchemas().dropSchema(schema, true);
+ catalog.asSchemas().dropSchema(schema, supportDropCascade());
Assertions.assertFalse(catalog.asSchemas().schemaExists(schema));
}
});
@@ -177,9 +185,9 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
Assertions.assertEquals(schema2, schemas[2]);
Assertions.assertEquals(schema3, schemas[3]);
} finally {
- catalog.asSchemas().dropSchema(schema, true);
- catalog.asSchemas().dropSchema(schema2, true);
- catalog.asSchemas().dropSchema(schema3, true);
+ catalog.asSchemas().dropSchema(schema, supportDropCascade());
+ catalog.asSchemas().dropSchema(schema2, supportDropCascade());
+ catalog.asSchemas().dropSchema(schema3, supportDropCascade());
Assertions.assertEquals(1,
catalog.asSchemas().listSchemas().length);
}
});
@@ -204,7 +212,6 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
Schema loadedSchema = catalog.asSchemas().loadSchema(schema);
Assertions.assertEquals(schema, loadedSchema.name());
Assertions.assertEquals("test comment", loadedSchema.comment());
- Assertions.assertEquals(3, loadedSchema.properties().size());
Assertions.assertEquals("value1",
loadedSchema.properties().get("key1"));
Assertions.assertEquals("value2",
loadedSchema.properties().get("key2"));
Assertions.assertNotNull(loadedSchema.properties().get("location"));
@@ -215,11 +222,10 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
Schema reloadedSchema = catalog.asSchemas().loadSchema(schema);
Assertions.assertEquals(schema, reloadedSchema.name());
Assertions.assertEquals("test comment", reloadedSchema.comment());
- Assertions.assertEquals(4, reloadedSchema.properties().size());
Assertions.assertEquals("new-value",
reloadedSchema.properties().get("key1"));
Assertions.assertEquals("value3",
reloadedSchema.properties().get("key3"));
} finally {
- catalog.asSchemas().dropSchema(schema, true);
+ catalog.asSchemas().dropSchema(schema, supportDropCascade());
}
});
}
@@ -270,7 +276,8 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
Row.of("A", 1.0),
Row.of("B", 2.0));
},
- true);
+ true,
+ supportDropCascade());
}
@Test
@@ -303,7 +310,8 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
Row.of("test_table1"),
Row.of("test_table2"));
},
- true);
+ true,
+ supportDropCascade());
}
@Test
@@ -320,12 +328,11 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
NameIdentifier identifier = NameIdentifier.of(databaseName,
tableName);
catalog.asTableCatalog().createTable(identifier, columns,
"comment1", ImmutableMap.of());
Assertions.assertTrue(catalog.asTableCatalog().tableExists(identifier));
-
- TableResult result = sql("DROP TABLE %s", tableName);
- TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+ sql("DROP TABLE IF EXISTS %s", tableName);
Assertions.assertFalse(catalog.asTableCatalog().tableExists(identifier));
},
- true);
+ true,
+ supportDropCascade());
}
@Test
@@ -379,7 +386,8 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
fail(e);
}
},
- true);
+ true,
+ supportDropCascade());
}
@Test
@@ -415,7 +423,8 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
};
assertColumns(expected, actual);
},
- true);
+ true,
+ supportDropCascade());
}
@Test
@@ -466,6 +475,7 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
.asTableCatalog()
.loadTable(NameIdentifier.of(databaseName, tableName));
Assertions.assertEquals(newComment, gravitinoTable.comment());
+
} catch (DatabaseNotExistException
| TableAlreadyExistException
| TableNotExistException e) {
@@ -475,7 +485,8 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
fail("Catalog doesn't exist");
}
},
- true);
+ true,
+ supportDropCascade());
}
@Test
@@ -511,7 +522,8 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
};
assertColumns(expected, actual);
},
- true);
+ true,
+ supportDropCascade());
}
@Test
@@ -542,7 +554,8 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
new Column[] {Column.of("order_amount", Types.IntegerType.get(),
"ORDER_AMOUNT")};
assertColumns(expected, actual);
},
- true);
+ true,
+ supportDropCascade());
}
@Test
@@ -584,7 +597,8 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
};
assertColumns(expected, actual);
},
- true);
+ true,
+ supportDropCascade());
}
@Test
@@ -612,7 +626,8 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
Assertions.assertTrue(
catalog.asTableCatalog().tableExists(NameIdentifier.of(databaseName,
newTableName)));
},
- true);
+ true,
+ supportDropCascade());
}
@Test
@@ -655,6 +670,7 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
Assertions.assertEquals("value1", properties.get("key"));
Assertions.assertNull(properties.get("key2"));
},
- true);
+ true,
+ supportDropCascade());
}
}
diff --git
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java
index f56b5297e1..959123f336 100644
---
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java
+++
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java
@@ -19,19 +19,25 @@
package org.apache.gravitino.flink.connector.integration.test;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import com.google.errorprone.annotations.FormatMethod;
import com.google.errorprone.annotations.FormatString;
import java.io.IOException;
import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import java.util.function.Consumer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.types.Row;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.client.GravitinoMetalake;
import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.integration.test.utils.TestUtils;
import
org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions;
import org.apache.gravitino.integration.test.container.ContainerSuite;
import org.apache.gravitino.integration.test.container.HiveContainer;
@@ -154,31 +160,56 @@ public abstract class FlinkEnvIT extends BaseIT {
}
+ /**
+  * Formats {@code sql} with {@code args} via {@link String#format} and executes the resulting
+  * statement on the static {@code tableEnv}.
+  *
+  * @param sql format-string template of the statement to run
+  * @param args values substituted into the template
+  * @return the result returned by {@code tableEnv.executeSql}
+  */
@FormatMethod
- protected TableResult sql(@FormatString String sql, Object... args) {
+ protected static TableResult sql(@FormatString String sql, Object... args) {
return tableEnv.executeSql(String.format(sql, args));
}
+  /**
+   * Runs {@code action} inside {@code schemaName}; delegates to the five-argument overload with
+   * {@code cascade} set to {@code true}.
+   *
+   * @param catalog Gravitino catalog to operate on; must not be null
+   * @param schemaName schema to create (if missing) and use; must not be null
+   * @param action callback run once the schema is selected
+   * @param dropSchema whether to drop the schema after {@code action} finishes
+   */
   protected void doWithSchema(
       Catalog catalog, String schemaName, Consumer<Catalog> action, boolean dropSchema) {
+    doWithSchema(catalog, schemaName, action, dropSchema, true);
+  }
+
+  /**
+   * Switches the Flink session to {@code catalog}, creates {@code schemaName} if it does not exist
+   * (with the properties from {@code getCreateSchemaProps}), selects it and runs {@code action}.
+   *
+   * <p>When {@code dropSchema} is set, cleanup drops the schema's tables through
+   * {@code clearTableInSchema()} and then drops the schema itself. Tables are only cleared if the
+   * session actually switched into {@code schemaName}, and the schema drop is attempted even when
+   * clearing the tables fails.
+   *
+   * @param catalog Gravitino catalog to operate on; must not be null
+   * @param schemaName schema to create (if missing) and use; must not be null
+   * @param action callback run once the schema is selected
+   * @param dropSchema whether to drop the schema after {@code action} finishes
+   * @param cascade cascade flag forwarded to {@code dropSchema}
+   */
+  protected void doWithSchema(
+      Catalog catalog,
+      String schemaName,
+      Consumer<Catalog> action,
+      boolean dropSchema,
+      boolean cascade) {
     Preconditions.checkNotNull(catalog);
     Preconditions.checkNotNull(schemaName);
+    // Records whether useDatabase(schemaName) succeeded, so cleanup never clears tables out of
+    // some other database that merely happened to be current when an earlier step failed.
+    boolean schemaSelected = false;
     try {
       tableEnv.useCatalog(catalog.name());
       if (!catalog.asSchemas().schemaExists(schemaName)) {
-        catalog.asSchemas().createSchema(schemaName, null, null);
+        catalog.asSchemas().createSchema(schemaName, null, getCreateSchemaProps(schemaName));
       }
       tableEnv.useDatabase(schemaName);
+      schemaSelected = true;
       action.accept(catalog);
     } finally {
       if (dropSchema) {
-        catalog.asSchemas().dropSchema(schemaName, true);
+        try {
+          if (schemaSelected) {
+            clearTableInSchema();
+          }
+        } finally {
+          // Always attempt the drop, even if clearing tables threw, so the schema is not leaked.
+          catalog.asSchemas().dropSchema(schemaName, cascade);
+        }
       }
     }
   }
+ /**
+  * Supplies the properties that {@code doWithSchema} passes to {@code createSchema} when it has
+  * to create the schema. This base implementation returns {@code null}; subclasses may override
+  * it.
+  *
+  * @param schemaName name of the schema about to be created
+  * @return the schema properties, {@code null} in this base implementation
+  */
+ protected Map<String, String> getCreateSchemaProps(String schemaName) {
+ return null;
+ }
+
protected static void doWithCatalog(Catalog catalog, Consumer<Catalog>
action) {
Preconditions.checkNotNull(catalog);
tableEnv.useCatalog(catalog.name());
action.accept(catalog);
}
+
+  /**
+   * Drops every table reported by {@code SHOW TABLES}, asserting that each drop succeeds.
+   *
+   * <p>Iceberg requires deleting the tables first, then deleting the schema.
+   */
+  protected static void clearTableInSchema() {
+    // Materialize the listing before issuing any DROP statement.
+    List<Row> tableRows = Lists.newArrayList(sql("SHOW TABLES").collect());
+    tableRows.forEach(
+        tableRow -> {
+          String name = tableRow.getField(0).toString();
+          TestUtils.assertTableResult(sql("DROP TABLE IF EXISTS %s", name), ResultKind.SUCCESS);
+        });
+  }
}
diff --git
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
index bb7b25f6b2..7792068e24 100644
---
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
+++
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
@@ -29,7 +29,6 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
-import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
@@ -73,7 +72,7 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
private static org.apache.gravitino.Catalog hiveCatalog;
+ /** Creates the default Hive catalog before the tests of this class run. */
@BeforeAll
- static void hiveStartUp() {
+ void hiveStartUp() {
initDefaultHiveCatalog();
}
@@ -83,13 +82,13 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
metalake.dropCatalog(DEFAULT_HIVE_CATALOG, true);
}
+ /**
+  * Creates the Gravitino catalog named {@code DEFAULT_HIVE_CATALOG} as a relational catalog of
+  * the provider returned by {@code getProvider()}, with {@code metastore.uris} set to
+  * {@code hiveMetastoreUri}, and stores it in {@code hiveCatalog}. Requires {@code metalake} to
+  * be non-null.
+  */
- protected static void initDefaultHiveCatalog() {
+ protected void initDefaultHiveCatalog() {
Preconditions.checkNotNull(metalake);
hiveCatalog =
metalake.createCatalog(
DEFAULT_HIVE_CATALOG,
org.apache.gravitino.Catalog.Type.RELATIONAL,
- "hive",
+ getProvider(),
null,
ImmutableMap.of("metastore.uris", hiveMetastoreUri));
}
@@ -583,32 +582,23 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
true);
}
+  /**
+   * Gives each schema an explicit {@code location} property of the form
+   * {@code warehouse + "/" + schemaName}.
+   *
+   * @param schemaName name of the schema about to be created
+   * @return a single-entry immutable map holding the schema location
+   */
+  @Override
+  protected Map<String, String> getCreateSchemaProps(String schemaName) {
+    String schemaLocation = warehouse + "/" + schemaName;
+    return ImmutableMap.of("location", schemaLocation);
+  }
+
@Override
protected org.apache.gravitino.Catalog currentCatalog() {
return hiveCatalog;
}
- protected void doWithSchema(
- org.apache.gravitino.Catalog catalog,
- String schemaName,
- Consumer<org.apache.gravitino.Catalog> action,
- boolean dropSchema) {
- Preconditions.checkNotNull(catalog);
- Preconditions.checkNotNull(schemaName);
- try {
- tableEnv.useCatalog(catalog.name());
- if (!catalog.asSchemas().schemaExists(schemaName)) {
- catalog
- .asSchemas()
- .createSchema(
- schemaName, null, ImmutableMap.of("location", warehouse + "/"
+ schemaName));
- }
- tableEnv.useDatabase(schemaName);
- action.accept(catalog);
- } finally {
- if (dropSchema) {
- catalog.asSchemas().dropSchema(schemaName, true);
- }
- }
+ @Override
+ protected String getProvider() {
+ return "hive";
+ }
+
+ @Override
+ protected boolean supportDropCascade() {
+ return true;
}
}
diff --git
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java
similarity index 61%
copy from
flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
copy to
flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java
index bb7b25f6b2..0834def90b 100644
---
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
+++
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java
@@ -16,7 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.gravitino.flink.connector.integration.test.hive;
+
+package org.apache.gravitino.flink.connector.integration.test.iceberg;
import static
org.apache.gravitino.flink.connector.integration.test.utils.TestUtils.assertColumns;
import static
org.apache.gravitino.flink.connector.integration.test.utils.TestUtils.toFlinkPhysicalColumn;
@@ -25,18 +26,13 @@ import static
org.apache.gravitino.rel.expressions.transforms.Transforms.EMPTY_T
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
-import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDescriptor;
import org.apache.flink.table.catalog.CatalogTable;
@@ -44,13 +40,12 @@ import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.flink.table.catalog.DefaultCatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;
import org.apache.flink.types.Row;
+import org.apache.gravitino.Catalog;
import org.apache.gravitino.NameIdentifier;
-import org.apache.gravitino.catalog.hive.HiveConstants;
-import org.apache.gravitino.flink.connector.PropertiesConverter;
-import org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalog;
-import
org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactoryOptions;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
+import org.apache.gravitino.flink.connector.iceberg.GravitinoIcebergCatalog;
+import
org.apache.gravitino.flink.connector.iceberg.GravitinoIcebergCatalogFactoryOptions;
import org.apache.gravitino.flink.connector.integration.test.FlinkCommonIT;
import org.apache.gravitino.flink.connector.integration.test.utils.TestUtils;
import org.apache.gravitino.rel.Column;
@@ -58,54 +53,41 @@ import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.expressions.transforms.Transform;
import org.apache.gravitino.rel.expressions.transforms.Transforms;
import org.apache.gravitino.rel.types.Types;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
-@Tag("gravitino-docker-test")
-public class FlinkHiveCatalogIT extends FlinkCommonIT {
- private static final String DEFAULT_HIVE_CATALOG =
"test_flink_hive_schema_catalog";
+public abstract class FlinkIcebergCatalogIT extends FlinkCommonIT {
- private static org.apache.gravitino.Catalog hiveCatalog;
+ private static final String DEFAULT_ICEBERG_CATALOG =
"flink_iceberg_catalog";
- @BeforeAll
- static void hiveStartUp() {
- initDefaultHiveCatalog();
- }
-
- @AfterAll
- static void hiveStop() {
- Preconditions.checkNotNull(metalake);
- metalake.dropCatalog(DEFAULT_HIVE_CATALOG, true);
- }
+ private static org.apache.gravitino.Catalog icebergCatalog;
- protected static void initDefaultHiveCatalog() {
+ @BeforeAll
+ public void before() {
Preconditions.checkNotNull(metalake);
- hiveCatalog =
+ icebergCatalog =
metalake.createCatalog(
- DEFAULT_HIVE_CATALOG,
+ DEFAULT_ICEBERG_CATALOG,
org.apache.gravitino.Catalog.Type.RELATIONAL,
- "hive",
+ getProvider(),
null,
- ImmutableMap.of("metastore.uris", hiveMetastoreUri));
+ getCatalogConfigs());
}
+ protected abstract Map<String, String> getCatalogConfigs();
+
@Test
- public void testCreateGravitinoHiveCatalog() {
+ public void testCreateGravitinoIcebergCatalog() {
tableEnv.useCatalog(DEFAULT_CATALOG);
int numCatalogs = tableEnv.listCatalogs().length;
// Create a new catalog.
- String catalogName = "gravitino_hive";
- Configuration configuration = new Configuration();
+ String catalogName = "gravitino_iceberg_catalog";
+ Configuration configuration = Configuration.fromMap(getCatalogConfigs());
configuration.set(
- CommonCatalogOptions.CATALOG_TYPE,
GravitinoHiveCatalogFactoryOptions.IDENTIFIER);
- configuration.set(HiveCatalogFactoryOptions.HIVE_CONF_DIR,
"src/test/resources/flink-tests");
- configuration.set(GravitinoHiveCatalogFactoryOptions.HIVE_METASTORE_URIS,
hiveMetastoreUri);
+ CommonCatalogOptions.CATALOG_TYPE,
GravitinoIcebergCatalogFactoryOptions.IDENTIFIER);
+
CatalogDescriptor catalogDescriptor = CatalogDescriptor.of(catalogName,
configuration);
tableEnv.createCatalog(catalogName, catalogDescriptor);
Assertions.assertTrue(metalake.catalogExists(catalogName));
@@ -113,23 +95,12 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
// Check the catalog properties.
org.apache.gravitino.Catalog gravitinoCatalog =
metalake.loadCatalog(catalogName);
Map<String, String> properties = gravitinoCatalog.properties();
- Assertions.assertEquals(hiveMetastoreUri,
properties.get(HiveConstants.METASTORE_URIS));
- Map<String, String> flinkProperties =
- gravitinoCatalog.properties().entrySet().stream()
- .filter(e ->
e.getKey().startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX))
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
- Assertions.assertEquals(2, flinkProperties.size());
- Assertions.assertEquals(
- "src/test/resources/flink-tests",
-
flinkProperties.get(flinkByPass(HiveCatalogFactoryOptions.HIVE_CONF_DIR.key())));
- Assertions.assertEquals(
- GravitinoHiveCatalogFactoryOptions.IDENTIFIER,
-
flinkProperties.get(flinkByPass(CommonCatalogOptions.CATALOG_TYPE.key())));
+ Assertions.assertEquals(hiveMetastoreUri,
properties.get(IcebergConstants.URI));
// Get the created catalog.
Optional<org.apache.flink.table.catalog.Catalog> catalog =
tableEnv.getCatalog(catalogName);
Assertions.assertTrue(catalog.isPresent());
- Assertions.assertInstanceOf(GravitinoHiveCatalog.class, catalog.get());
+ Assertions.assertInstanceOf(GravitinoIcebergCatalog.class, catalog.get());
// List catalogs.
String[] catalogs = tableEnv.listCatalogs();
@@ -154,52 +125,46 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
tableEnv.executeSql("drop catalog " + catalogName);
Assertions.assertFalse(metalake.catalogExists(catalogName));
- Optional<Catalog> droppedCatalog = tableEnv.getCatalog(catalogName);
+ Optional<org.apache.flink.table.catalog.Catalog> droppedCatalog =
+ tableEnv.getCatalog(catalogName);
Assertions.assertFalse(droppedCatalog.isPresent(), "Catalog should be
dropped");
}
@Test
- public void testCreateGravitinoHiveCatalogUsingSQL() {
+ public void testCreateGravitinoIcebergUsingSQL() {
tableEnv.useCatalog(DEFAULT_CATALOG);
int numCatalogs = tableEnv.listCatalogs().length;
// Create a new catalog.
- String catalogName = "gravitino_hive_sql";
+ String catalogName = "gravitino_iceberg_using_sql";
tableEnv.executeSql(
String.format(
"create catalog %s with ("
- + "'type'='gravitino-hive', "
- + "'hive-conf-dir'='src/test/resources/flink-tests',"
- + "'hive.metastore.uris'='%s',"
- + "'unknown.key'='unknown.value'"
+ + "'type'='%s', "
+ + "'catalog-backend'='%s',"
+ + "'uri'='%s',"
+ + "'warehouse'='%s'"
+ ")",
- catalogName, hiveMetastoreUri));
+ catalogName,
+ GravitinoIcebergCatalogFactoryOptions.IDENTIFIER,
+ getCatalogBackend(),
+ hiveMetastoreUri,
+ warehouse));
Assertions.assertTrue(metalake.catalogExists(catalogName));
// Check the properties of the created catalog.
org.apache.gravitino.Catalog gravitinoCatalog =
metalake.loadCatalog(catalogName);
Map<String, String> properties = gravitinoCatalog.properties();
- Assertions.assertEquals(hiveMetastoreUri,
properties.get(HiveConstants.METASTORE_URIS));
- Map<String, String> flinkProperties =
- properties.entrySet().stream()
- .filter(e ->
e.getKey().startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX))
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
- Assertions.assertEquals(3, flinkProperties.size());
- Assertions.assertEquals(
- "src/test/resources/flink-tests",
-
flinkProperties.get(flinkByPass(HiveCatalogFactoryOptions.HIVE_CONF_DIR.key())));
- Assertions.assertEquals(
- GravitinoHiveCatalogFactoryOptions.IDENTIFIER,
-
flinkProperties.get(flinkByPass(CommonCatalogOptions.CATALOG_TYPE.key())));
+ Assertions.assertEquals(hiveMetastoreUri,
properties.get(IcebergConstants.URI));
+
Assertions.assertEquals(
- "unknown.value",
- flinkProperties.get(flinkByPass("unknown.key")),
- "The unknown.key will not cause failure and will be saved in
Gravitino.");
+ GravitinoIcebergCatalogFactoryOptions.IDENTIFIER,
+ properties.get(CommonCatalogOptions.CATALOG_TYPE.key()));
// Get the created catalog.
Optional<org.apache.flink.table.catalog.Catalog> catalog =
tableEnv.getCatalog(catalogName);
Assertions.assertTrue(catalog.isPresent());
- Assertions.assertInstanceOf(GravitinoHiveCatalog.class, catalog.get());
+ Assertions.assertInstanceOf(GravitinoIcebergCatalog.class, catalog.get());
// List catalogs.
String[] catalogs = tableEnv.listCatalogs();
@@ -229,52 +194,25 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
tableEnv.executeSql("drop catalog " + catalogName);
Assertions.assertFalse(metalake.catalogExists(catalogName));
- Optional<Catalog> droppedCatalog = tableEnv.getCatalog(catalogName);
+ Optional<org.apache.flink.table.catalog.Catalog> droppedCatalog =
+ tableEnv.getCatalog(catalogName);
Assertions.assertFalse(droppedCatalog.isPresent(), "Catalog should be
dropped");
}
- @Test
- public void testCreateGravitinoHiveCatalogRequireOptions() {
- tableEnv.useCatalog(DEFAULT_CATALOG);
-
- // Failed to create the catalog for missing the required options.
- String catalogName = "gravitino_hive_sql2";
- Assertions.assertThrows(
- ValidationException.class,
- () -> {
- tableEnv.executeSql(
- String.format(
- "create catalog %s with ("
- + "'type'='gravitino-hive', "
- + "'hive-conf-dir'='src/test/resources/flink-tests'"
- + ")",
- catalogName));
- },
- "The hive.metastore.uris is required.");
-
- Assertions.assertFalse(metalake.catalogExists(catalogName));
- }
-
@Test
public void testGetCatalogFromGravitino() {
// list catalogs.
int numCatalogs = tableEnv.listCatalogs().length;
// create a new catalog.
- String catalogName = "hive_catalog_in_gravitino";
+ String catalogName = "iceberg_catalog_in_gravitino";
org.apache.gravitino.Catalog gravitinoCatalog =
metalake.createCatalog(
catalogName,
org.apache.gravitino.Catalog.Type.RELATIONAL,
- "hive",
+ getProvider(),
null,
- ImmutableMap.of(
- "flink.bypass.hive-conf-dir",
- "src/test/resources/flink-tests",
- "flink.bypass.hive.test",
- "hive.config",
- "metastore.uris",
- hiveMetastoreUri));
+ getCatalogConfigs());
Assertions.assertNotNull(gravitinoCatalog);
Assertions.assertEquals(catalogName, gravitinoCatalog.name());
Assertions.assertTrue(metalake.catalogExists(catalogName));
@@ -282,15 +220,10 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
numCatalogs + 1, tableEnv.listCatalogs().length, "Should create a new
catalog");
// get the catalog from Gravitino.
- Optional<Catalog> flinkHiveCatalog = tableEnv.getCatalog(catalogName);
- Assertions.assertTrue(flinkHiveCatalog.isPresent());
- Assertions.assertInstanceOf(GravitinoHiveCatalog.class,
flinkHiveCatalog.get());
- GravitinoHiveCatalog gravitinoHiveCatalog = (GravitinoHiveCatalog)
flinkHiveCatalog.get();
- HiveConf hiveConf = gravitinoHiveCatalog.getHiveConf();
- Assertions.assertTrue(hiveConf.size() > 0, "Should have hive conf");
- Assertions.assertEquals("hive.config", hiveConf.get("hive.test"));
- Assertions.assertEquals(
- hiveMetastoreUri,
hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname));
+ Optional<org.apache.flink.table.catalog.Catalog> flinkIcebergCatalog =
+ tableEnv.getCatalog(catalogName);
+ Assertions.assertTrue(flinkIcebergCatalog.isPresent());
+ Assertions.assertInstanceOf(GravitinoIcebergCatalog.class,
flinkIcebergCatalog.get());
// drop the catalog.
tableEnv.useCatalog(DEFAULT_CATALOG);
@@ -301,123 +234,99 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
}
@Test
- public void testHivePartitionTable() {
- String databaseName = "test_create_hive_partition_table_db";
- String tableName = "test_create_hive_partition_table";
- String comment = "test comment";
+ public void testIcebergTableWithPartition() {
+ String databaseName = "test_iceberg_table_partition";
+ String tableName = "iceberg_table_with_partition";
String key = "test key";
String value = "test value";
doWithSchema(
- currentCatalog(),
+ icebergCatalog,
databaseName,
catalog -> {
TableResult result =
sql(
- "CREATE TABLE %s "
- + "(string_type STRING COMMENT 'string_type', "
- + " double_type DOUBLE COMMENT 'double_type')"
- + " COMMENT '%s' "
- + " PARTITIONED BY (string_type, double_type)"
+ "CREATE TABLE %s ("
+ + " id BIGINT COMMENT 'unique id',"
+ + " data STRING NOT NULL"
+ + " ) PARTITIONED BY (data)"
+ " WITH ("
+ "'%s' = '%s')",
- tableName, comment, key, value);
+ tableName, key, value);
TestUtils.assertTableResult(result, ResultKind.SUCCESS);
Table table =
catalog.asTableCatalog().loadTable(NameIdentifier.of(databaseName, tableName));
Assertions.assertNotNull(table);
- Assertions.assertEquals(comment, table.comment());
Assertions.assertEquals(value, table.properties().get(key));
Column[] columns =
new Column[] {
- Column.of("string_type", Types.StringType.get(),
"string_type", true, false, null),
- Column.of("double_type", Types.DoubleType.get(), "double_type")
+ Column.of("id", Types.LongType.get(), "unique id", true,
false, null),
+ Column.of("data", Types.StringType.get(), null, false, false,
null)
};
assertColumns(columns, table.columns());
- Transform[] partitions =
- new Transform[] {
- Transforms.identity("string_type"),
Transforms.identity("double_type")
- };
+ Transform[] partitions = new Transform[]
{Transforms.identity("data")};
Assertions.assertArrayEquals(partitions, table.partitioning());
// load flink catalog
try {
- Catalog flinkCatalog =
tableEnv.getCatalog(currentCatalog().name()).get();
+ org.apache.flink.table.catalog.Catalog flinkCatalog =
+ tableEnv.getCatalog(currentCatalog().name()).get();
CatalogBaseTable flinkTable =
flinkCatalog.getTable(ObjectPath.fromString(databaseName + "."
+ tableName));
DefaultCatalogTable catalogTable = (DefaultCatalogTable)
flinkTable;
Assertions.assertTrue(catalogTable.isPartitioned());
Assertions.assertArrayEquals(
- new String[] {"string_type", "double_type"},
- catalogTable.getPartitionKeys().toArray());
+ new String[] {"data"},
catalogTable.getPartitionKeys().toArray());
} catch (Exception e) {
Assertions.fail("Table should be exist", e);
}
// write and read.
TestUtils.assertTableResult(
- sql("INSERT INTO %s VALUES ('A', 1.0), ('B', 2.0)", tableName),
+ sql("INSERT INTO %s VALUES (1, 'A'), (2, 'B')", tableName),
ResultKind.SUCCESS_WITH_CONTENT,
Row.of(-1L));
TestUtils.assertTableResult(
- sql("SELECT * FROM %s ORDER BY double_type", tableName),
+ sql("SELECT * FROM %s ORDER BY data", tableName),
ResultKind.SUCCESS_WITH_CONTENT,
- Row.of("A", 1.0),
- Row.of("B", 2.0));
- try {
- Assertions.assertTrue(
- hdfs.exists(
- new Path(
- table.properties().get("location") +
"/string_type=A/double_type=1.0")));
- Assertions.assertTrue(
- hdfs.exists(
- new Path(
- table.properties().get("location") +
"/string_type=B/double_type=2.0")));
- } catch (IOException e) {
- Assertions.fail("The partition directory should be exist.", e);
- }
+ Row.of(1, "A"),
+ Row.of(2, "B"));
},
- true);
+ true,
+ supportDropCascade());
}
@Test
- public void testCreateHiveTable() {
- String databaseName = "test_create_hive_table_db";
- String tableName = "test_create_hive_table";
- String comment = "test comment";
+ public void testCreateIcebergTable() {
+ String databaseName = "test_create_iceberg_table";
+ String tableName = "iceberg_table";
+ String comment = "test table comment";
String key = "test key";
String value = "test value";
- // 1. The NOT NULL constraint for column is only supported since Hive 3.0,
- // but the current Gravitino Hive catalog only supports Hive 2.x.
- // 2. Hive doesn't support Time and Timestamp with timezone type.
- // 3. Flink SQL only support to create Interval Month and Second(3).
doWithSchema(
- metalake.loadCatalog(DEFAULT_HIVE_CATALOG),
+ metalake.loadCatalog(DEFAULT_ICEBERG_CATALOG),
databaseName,
catalog -> {
TableResult result =
sql(
- "CREATE TABLE %s "
- + "(string_type STRING COMMENT 'string_type', "
+ "CREATE TABLE %s ("
+ + " string_type STRING COMMENT 'string_type', "
+ " double_type DOUBLE COMMENT 'double_type',"
+ " int_type INT COMMENT 'int_type',"
+ " varchar_type VARCHAR COMMENT 'varchar_type',"
- + " char_type CHAR COMMENT 'char_type',"
+ " boolean_type BOOLEAN COMMENT 'boolean_type',"
- + " byte_type TINYINT COMMENT 'byte_type',"
+ " binary_type VARBINARY(10) COMMENT 'binary_type',"
+ " decimal_type DECIMAL(10, 2) COMMENT 'decimal_type',"
+ " bigint_type BIGINT COMMENT 'bigint_type',"
+ " float_type FLOAT COMMENT 'float_type',"
+ " date_type DATE COMMENT 'date_type',"
+ " timestamp_type TIMESTAMP COMMENT 'timestamp_type',"
- + " smallint_type SMALLINT COMMENT 'smallint_type',"
+ " array_type ARRAY<INT> COMMENT 'array_type',"
+ " map_type MAP<INT, STRING> COMMENT 'map_type',"
- + " struct_type ROW<k1 INT, k2 String>)"
- + " COMMENT '%s' WITH ("
+ + " struct_type ROW<k1 INT, k2 String>"
+ + " ) COMMENT '%s' WITH ("
+ "'%s' = '%s')",
tableName, comment, key, value);
TestUtils.assertTableResult(result, ResultKind.SUCCESS);
@@ -433,9 +342,7 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
Column.of("double_type", Types.DoubleType.get(),
"double_type"),
Column.of("int_type", Types.IntegerType.get(), "int_type"),
Column.of("varchar_type", Types.StringType.get(),
"varchar_type"),
- Column.of("char_type", Types.FixedCharType.of(1), "char_type"),
Column.of("boolean_type", Types.BooleanType.get(),
"boolean_type"),
- Column.of("byte_type", Types.ByteType.get(), "byte_type"),
Column.of("binary_type", Types.BinaryType.get(),
"binary_type"),
Column.of("decimal_type", Types.DecimalType.of(10, 2),
"decimal_type"),
Column.of("bigint_type", Types.LongType.get(), "bigint_type"),
@@ -443,7 +350,6 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
Column.of("date_type", Types.DateType.get(), "date_type"),
Column.of(
"timestamp_type", Types.TimestampType.withoutTimeZone(),
"timestamp_type"),
- Column.of("smallint_type", Types.ShortType.get(),
"smallint_type"),
Column.of(
"array_type", Types.ListType.of(Types.IntegerType.get(),
true), "array_type"),
Column.of(
@@ -460,28 +366,25 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
assertColumns(columns, table.columns());
Assertions.assertArrayEquals(EMPTY_TRANSFORM, table.partitioning());
},
- true);
+ true,
+ supportDropCascade());
}
@Test
- public void testGetHiveTable() {
+ public void testGetIcebergTable() {
Column[] columns =
new Column[] {
Column.of("string_type", Types.StringType.get(), "string_type",
true, false, null),
Column.of("double_type", Types.DoubleType.get(), "double_type"),
Column.of("int_type", Types.IntegerType.get(), "int_type"),
Column.of("varchar_type", Types.StringType.get(), "varchar_type"),
- Column.of("char_type", Types.FixedCharType.of(1), "char_type"),
Column.of("boolean_type", Types.BooleanType.get(), "boolean_type"),
- Column.of("byte_type", Types.ByteType.get(), "byte_type"),
Column.of("binary_type", Types.BinaryType.get(), "binary_type"),
Column.of("decimal_type", Types.DecimalType.of(10, 2),
"decimal_type"),
Column.of("bigint_type", Types.LongType.get(), "bigint_type"),
Column.of("float_type", Types.FloatType.get(), "float_type"),
Column.of("date_type", Types.DateType.get(), "date_type"),
Column.of("timestamp_type", Types.TimestampType.withoutTimeZone(),
"timestamp_type"),
- Column.of("smallint_type", Types.ShortType.get(), "smallint_type"),
- Column.of("fixed_char_type", Types.FixedCharType.of(10),
"fixed_char_type"),
Column.of("array_type", Types.ListType.of(Types.IntegerType.get(),
true), "array_type"),
Column.of(
"map_type",
@@ -495,9 +398,9 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
null)
};
- String databaseName = "test_get_hive_table_db";
+ String databaseName = "test_get_iceberg_table";
doWithSchema(
- metalake.loadCatalog(DEFAULT_HIVE_CATALOG),
+ metalake.loadCatalog(DEFAULT_ICEBERG_CATALOG),
databaseName,
catalog -> {
String tableName = "test_desc_table";
@@ -511,7 +414,7 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
ImmutableMap.of("k1", "v1"));
Optional<org.apache.flink.table.catalog.Catalog> flinkCatalog =
- tableEnv.getCatalog(DEFAULT_HIVE_CATALOG);
+ tableEnv.getCatalog(DEFAULT_ICEBERG_CATALOG);
Assertions.assertTrue(flinkCatalog.isPresent());
try {
CatalogBaseTable table =
@@ -531,13 +434,9 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
org.apache.flink.table.catalog.Column.physical(
"varchar_type", DataTypes.VARCHAR(Integer.MAX_VALUE))
.withComment("varchar_type"),
- org.apache.flink.table.catalog.Column.physical("char_type",
DataTypes.CHAR(1))
- .withComment("char_type"),
org.apache.flink.table.catalog.Column.physical(
"boolean_type", DataTypes.BOOLEAN())
.withComment("boolean_type"),
- org.apache.flink.table.catalog.Column.physical("byte_type",
DataTypes.TINYINT())
- .withComment("byte_type"),
org.apache.flink.table.catalog.Column.physical("binary_type", DataTypes.BYTES())
.withComment("binary_type"),
org.apache.flink.table.catalog.Column.physical(
@@ -552,12 +451,6 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
org.apache.flink.table.catalog.Column.physical(
"timestamp_type", DataTypes.TIMESTAMP())
.withComment("timestamp_type"),
- org.apache.flink.table.catalog.Column.physical(
- "smallint_type", DataTypes.SMALLINT())
- .withComment("smallint_type"),
- org.apache.flink.table.catalog.Column.physical(
- "fixed_char_type", DataTypes.CHAR(10))
- .withComment("fixed_char_type"),
org.apache.flink.table.catalog.Column.physical(
"array_type", DataTypes.ARRAY(DataTypes.INT()))
.withComment("array_type"),
@@ -580,35 +473,29 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
Assertions.fail(e);
}
},
- true);
+ true,
+ supportDropCascade());
+ }
+
+ @Override
+ protected Catalog currentCatalog() {
+ return icebergCatalog;
}
@Override
- protected org.apache.gravitino.Catalog currentCatalog() {
- return hiveCatalog;
+ protected String getProvider() {
+ return "lakehouse-iceberg";
}
- protected void doWithSchema(
- org.apache.gravitino.Catalog catalog,
- String schemaName,
- Consumer<org.apache.gravitino.Catalog> action,
- boolean dropSchema) {
- Preconditions.checkNotNull(catalog);
- Preconditions.checkNotNull(schemaName);
- try {
- tableEnv.useCatalog(catalog.name());
- if (!catalog.asSchemas().schemaExists(schemaName)) {
- catalog
- .asSchemas()
- .createSchema(
- schemaName, null, ImmutableMap.of("location", warehouse + "/"
+ schemaName));
- }
- tableEnv.useDatabase(schemaName);
- action.accept(catalog);
- } finally {
- if (dropSchema) {
- catalog.asSchemas().dropSchema(schemaName, true);
- }
- }
+ @Override
+ protected boolean supportGetSchemaWithoutCommentAndOption() {
+ return false;
}
+
+ @Override
+ protected boolean supportDropCascade() {
+ return false;
+ }
+
+ protected abstract String getCatalogBackend();
}
diff --git
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergHiveCatalogIT.java
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergHiveCatalogIT.java
new file mode 100644
index 0000000000..fc21ce2c24
--- /dev/null
+++
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergHiveCatalogIT.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.flink.connector.integration.test.iceberg;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.gravitino.flink.connector.iceberg.IcebergPropertiesConstants;
+import org.junit.jupiter.api.Tag;
+
+@Tag("gravitino-docker-test")
+public class FlinkIcebergHiveCatalogIT extends FlinkIcebergCatalogIT {
+
+ @Override
+ protected Map<String, String> getCatalogConfigs() {
+ Map<String, String> catalogProperties = Maps.newHashMap();
+ catalogProperties.put(
+ IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
+ IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_HIVE);
+ catalogProperties.put(
+ IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE, warehouse);
+ catalogProperties.put(
+ IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI, hiveMetastoreUri);
+ return catalogProperties;
+ }
+
+ protected String getCatalogBackend() {
+ return "hive";
+ }
+}
diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java
index 57a17c2a11..a03b4a198e 100644
--- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java
+++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java
@@ -47,12 +47,22 @@ public class FlinkPaimonCatalogIT extends FlinkCommonIT {
return false;
}
+ @Override
+ protected String getProvider() {
+ return "lakehouse-paimon";
+ }
+
+ @Override
+ protected boolean supportDropCascade() {
+ return true;
+ }
+
protected Catalog currentCatalog() {
return catalog;
}
@BeforeAll
- static void setup() {
+ void setup() {
initPaimonCatalog();
}
@@ -62,13 +72,13 @@ public class FlinkPaimonCatalogIT extends FlinkCommonIT {
metalake.dropCatalog(DEFAULT_PAIMON_CATALOG, true);
}
- private static void initPaimonCatalog() {
+ private void initPaimonCatalog() {
Preconditions.checkNotNull(metalake);
catalog =
metalake.createCatalog(
DEFAULT_PAIMON_CATALOG,
org.apache.gravitino.Catalog.Type.RELATIONAL,
- "lakehouse-paimon",
+ getProvider(),
null,
ImmutableMap.of(
PaimonConstants.CATALOG_BACKEND,
@@ -98,4 +108,9 @@ public class FlinkPaimonCatalogIT extends FlinkCommonIT {
Map<String, String> properties = gravitinoCatalog.properties();
Assertions.assertEquals(warehouse, properties.get("warehouse"));
}
+
+ @Override
+ protected Map<String, String> getCreateSchemaProps(String schemaName) {
+ return null;
+ }
}
diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/utils/TestUtils.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/utils/TestUtils.java
index ba16a9c07b..02710bcfb3 100644
--- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/utils/TestUtils.java
+++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/utils/TestUtils.java
@@ -42,7 +42,8 @@ public class TestUtils {
Row expectedRow = expected[i];
Row actualRow = actualRows.get(i);
Assertions.assertEquals(expectedRow.getKind(), actualRow.getKind());
- Assertions.assertEquals(expectedRow, actualRow);
+ // Only compare string value.
+ Assertions.assertEquals(expectedRow.toString(), actualRow.toString());
}
}
}