This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new d685053400 [#9508] Fix Hive SerDe incompatibility for Gravitino Flink
connector created tables (#9590)
d685053400 is described below
commit d6850534002d79622ab8fe186502466f8e33512f
Author: Pranay Kumar Karvi <[email protected]>
AuthorDate: Tue Jan 27 09:23:18 2026 +0530
[#9508] Fix Hive SerDe incompatibility for Gravitino Flink connector
created tables (#9590)
### What changes were proposed in this pull request?
This PR fixes an interoperability issue between the Gravitino Flink
connector and
the native Flink Hive client.
1. separate the properties converter interface into a catalog properties
converter and a schema & table properties converter
2. create a Hive schema & table properties converter to generate
Gravitino tables according to the table properties and the Hive conf,
following Flink's behavior
3. transform format, serde, input/output format
4. get the serde lib in the following order:
```
// 1. use the serde lib in the stored-as format
// 2. use the serde lib specified in the properties
// 3. use the serde lib from default file format
// 4. use the default serde in hive conf
// please refer to
org.apache.flink.table.catalog.hive.util.HiveTableUtils for more details
```
5. get the input/output format in the following order:
```
// 1. use input/output from storage format (STORED AS)
// 2. use input/output from table properties
// 3. use input/output from default file format
```
### Why are the changes needed?
Fix: #9508
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
The patch was tested with both unit and integration tests:
1. **Unit tests**
- Added tests for `HivePropertiesConverter` to verify:
5. **Integration test**
- Added an end-to-end test that creates a Hive table via the Gravitino
Flink
connector and verifies it can be read by the native Flink client.
---------
Co-authored-by: fanng <[email protected]>
---
...verter.java => CatalogPropertiesConverter.java} | 62 +-----
.../SchemaAndTablePropertiesConverter.java | 78 ++++++++
.../flink/connector/catalog/BaseCatalog.java | 18 +-
.../connector/catalog/BaseCatalogFactory.java | 8 +-
.../flink/connector/hive/GravitinoHiveCatalog.java | 11 +-
.../hive/GravitinoHiveCatalogFactory.java | 16 +-
...er.java => HiveCatalogPropertiesConverter.java} | 33 +--
.../HiveSchemaAndTablePropertiesConverter.java | 221 +++++++++++++++++++++
.../connector/iceberg/GravitinoIcebergCatalog.java | 6 +-
.../iceberg/GravitinoIcebergCatalogFactory.java | 16 +-
.../iceberg/IcebergPropertiesConverter.java | 6 +-
.../flink/connector/jdbc/GravitinoJdbcCatalog.java | 6 +-
.../jdbc/GravitinoJdbcCatalogFactory.java | 5 +-
.../connector/jdbc/JdbcPropertiesConverter.java | 10 +-
.../mysql/GravitinoMysqlJdbcCatalogFactory.java | 9 +-
.../GravitinoPostgresJdbcCatalogFactory.java | 9 +-
.../connector/paimon/GravitinoPaimonCatalog.java | 6 +-
.../paimon/GravitinoPaimonCatalogFactory.java | 11 +-
.../paimon/PaimonPropertiesConverter.java | 6 +-
.../connector/store/GravitinoCatalogStore.java | 9 +-
.../flink/connector/catalog/TestBaseCatalog.java | 4 +-
.../hive/TestHivePropertiesConverter.java | 3 +-
.../TestHiveSchemaAndTablePropertiesConverter.java | 131 ++++++++++++
.../connector/integration/test/FlinkEnvIT.java | 4 +-
.../integration/test/hive/FlinkHiveCatalogIT.java | 195 +++++++++++++++++-
.../test/hive/FlinkHiveKerberosClientIT.java | 4 +-
.../paimon/TestPaimonPropertiesConverter.java | 4 +-
27 files changed, 734 insertions(+), 157 deletions(-)
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/CatalogPropertiesConverter.java
similarity index 66%
rename from
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java
rename to
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/CatalogPropertiesConverter.java
index 71ea7c8ec2..a8c58f38cc 100644
---
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/CatalogPropertiesConverter.java
@@ -23,19 +23,18 @@ import com.google.common.collect.Maps;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CommonCatalogOptions;
-import org.apache.flink.table.catalog.ObjectPath;
/**
- * PropertiesConverter is used to convert properties between Flink properties
and Apache Gravitino
- * properties
+ * CatalogPropertiesConverter converts properties between Flink catalog
properties and Apache
+ * Gravitino catalog properties.
*/
-public interface PropertiesConverter {
+public interface CatalogPropertiesConverter {
String FLINK_PROPERTY_PREFIX = "flink.bypass.";
/**
* Converts properties from application provided properties and Flink
connector properties to
- * Gravitino properties.This method processes the Flink configuration and
transforms it into a
+ * Gravitino properties. This method processes the Flink configuration and
transforms it into a
* format suitable for the Gravitino catalog.
*
* @param flinkConf The Flink configuration containing connector properties.
This includes both
@@ -75,8 +74,9 @@ public interface PropertiesConverter {
gravitinoProperties.forEach(
(key, value) -> {
String flinkConfigKey = key;
- if (key.startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX)) {
- flinkConfigKey =
key.substring(PropertiesConverter.FLINK_PROPERTY_PREFIX.length());
+ if
(key.startsWith(CatalogPropertiesConverter.FLINK_PROPERTY_PREFIX)) {
+ flinkConfigKey =
+
key.substring(CatalogPropertiesConverter.FLINK_PROPERTY_PREFIX.length());
allProperties.put(flinkConfigKey, value);
} else {
String convertedKey =
transformPropertyToFlinkCatalog(flinkConfigKey);
@@ -111,54 +111,6 @@ public interface PropertiesConverter {
*/
String transformPropertyToFlinkCatalog(String configKey);
- /**
- * Converts properties from Flink connector schema properties to Gravitino
schema properties.
- *
- * @param flinkProperties The schema properties provided by Flink.
- * @return The schema properties for the Gravitino.
- */
- default Map<String, String> toGravitinoSchemaProperties(Map<String, String>
flinkProperties) {
- return flinkProperties;
- }
-
- /**
- * Converts properties from Gravitino database properties to Flink connector
schema properties.
- *
- * @param gravitinoProperties The schema properties provided by Gravitino.
- * @return The database properties for the Flink connector.
- */
- default Map<String, String> toFlinkDatabaseProperties(Map<String, String>
gravitinoProperties) {
- return gravitinoProperties;
- }
-
- /**
- * Converts properties from Gravitino table properties to Flink connector
table properties.
- *
- * @param flinkCatalogProperties The flinkCatalogProperties are either the
converted properties
- * obtained through the toFlinkCatalogProperties method in
GravitinoCatalogStore, or the
- * options passed when writing a CREATE CATALOG statement in Flink SQL.
- * @param gravitinoTableProperties The table properties provided by
Gravitino.
- * @param tablePath The tablePath provides the database and table for some
catalogs, such as the
- * {@link
org.apache.gravitino.flink.connector.jdbc.GravitinoJdbcCatalog}.
- * @return The table properties for the Flink connector.
- */
- default Map<String, String> toFlinkTableProperties(
- Map<String, String> flinkCatalogProperties,
- Map<String, String> gravitinoTableProperties,
- ObjectPath tablePath) {
- return gravitinoTableProperties;
- }
-
- /**
- * Converts properties from Flink connector table properties to Gravitino
table properties.
- *
- * @param flinkProperties The table properties provided by Flink.
- * @return The table properties for the Gravitino.
- */
- default Map<String, String> toGravitinoTableProperties(Map<String, String>
flinkProperties) {
- return flinkProperties;
- }
-
/**
* Retrieves the Flink catalog type associated with this converter. This
method is used to
* determine the type of Flink catalog that this converter is designed for.
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/SchemaAndTablePropertiesConverter.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/SchemaAndTablePropertiesConverter.java
new file mode 100644
index 0000000000..690c7cbb9a
--- /dev/null
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/SchemaAndTablePropertiesConverter.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.flink.connector;
+
+import java.util.Map;
+import org.apache.flink.table.catalog.ObjectPath;
+
+/**
+ * SchemaAndTablePropertiesConverter converts properties between Flink
schema/table properties and
+ * Apache Gravitino schema/table properties.
+ */
+public interface SchemaAndTablePropertiesConverter {
+
+ /**
+ * Converts properties from Flink connector schema properties to Gravitino
schema properties.
+ *
+ * @param flinkProperties The schema properties provided by Flink.
+ * @return The schema properties for the Gravitino.
+ */
+ default Map<String, String> toGravitinoSchemaProperties(Map<String, String>
flinkProperties) {
+ return flinkProperties;
+ }
+
+ /**
+ * Converts properties from Gravitino database properties to Flink connector
schema properties.
+ *
+ * @param gravitinoProperties The schema properties provided by Gravitino.
+ * @return The database properties for the Flink connector.
+ */
+ default Map<String, String> toFlinkDatabaseProperties(Map<String, String>
gravitinoProperties) {
+ return gravitinoProperties;
+ }
+
+ /**
+ * Converts properties from Gravitino table properties to Flink connector
table properties.
+ *
+ * @param flinkCatalogProperties The flinkCatalogProperties are either the
converted properties
+ * obtained through the toFlinkCatalogProperties method in
GravitinoCatalogStore, or the
+ * options passed when writing a CREATE CATALOG statement in Flink SQL.
+ * @param gravitinoTableProperties The table properties provided by
Gravitino.
+ * @param tablePath The tablePath provides the database and table for some
catalogs, such as the
+ * {@link
org.apache.gravitino.flink.connector.jdbc.GravitinoJdbcCatalog}.
+ * @return The table properties for the Flink connector.
+ */
+ default Map<String, String> toFlinkTableProperties(
+ Map<String, String> flinkCatalogProperties,
+ Map<String, String> gravitinoTableProperties,
+ ObjectPath tablePath) {
+ return gravitinoTableProperties;
+ }
+
+ /**
+ * Converts properties from Flink connector table properties to Gravitino
table properties.
+ *
+ * @param flinkProperties The table properties provided by Flink.
+ * @return The table properties for the Gravitino.
+ */
+ default Map<String, String> toGravitinoTableProperties(Map<String, String>
flinkProperties) {
+ return flinkProperties;
+ }
+}
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java
index 9e71321167..d5eeaef726 100644
---
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java
@@ -72,7 +72,7 @@ import
org.apache.gravitino.exceptions.NonEmptySchemaException;
import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
import org.apache.gravitino.exceptions.TableAlreadyExistsException;
import org.apache.gravitino.flink.connector.PartitionConverter;
-import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.SchemaAndTablePropertiesConverter;
import org.apache.gravitino.flink.connector.utils.TableUtils;
import org.apache.gravitino.flink.connector.utils.TypeUtils;
import org.apache.gravitino.rel.Column;
@@ -89,7 +89,7 @@ import org.apache.gravitino.rel.indexes.Indexes;
* org.apache.flink.table.catalog.Catalog} interface.
*/
public abstract class BaseCatalog extends AbstractCatalog {
- private final PropertiesConverter propertiesConverter;
+ private final SchemaAndTablePropertiesConverter
schemaAndTablePropertiesConverter;
private final PartitionConverter partitionConverter;
private final Map<String, String> catalogOptions;
@@ -97,10 +97,10 @@ public abstract class BaseCatalog extends AbstractCatalog {
String catalogName,
Map<String, String> catalogOptions,
String defaultDatabase,
- PropertiesConverter propertiesConverter,
+ SchemaAndTablePropertiesConverter schemaAndTablePropertiesConverter,
PartitionConverter partitionConverter) {
super(catalogName, defaultDatabase);
- this.propertiesConverter = propertiesConverter;
+ this.schemaAndTablePropertiesConverter = schemaAndTablePropertiesConverter;
this.partitionConverter = partitionConverter;
this.catalogOptions = catalogOptions;
}
@@ -128,7 +128,7 @@ public abstract class BaseCatalog extends AbstractCatalog {
try {
Schema schema = catalog().asSchemas().loadSchema(databaseName);
Map<String, String> properties =
- propertiesConverter.toFlinkDatabaseProperties(schema.properties());
+
schemaAndTablePropertiesConverter.toFlinkDatabaseProperties(schema.properties());
return new CatalogDatabaseImpl(properties, schema.comment());
} catch (NoSuchSchemaException e) {
throw new DatabaseNotExistException(catalogName(), databaseName);
@@ -146,7 +146,8 @@ public abstract class BaseCatalog extends AbstractCatalog {
throws DatabaseAlreadyExistException, CatalogException {
try {
Map<String, String> properties =
-
propertiesConverter.toGravitinoSchemaProperties(catalogDatabase.getProperties());
+ schemaAndTablePropertiesConverter.toGravitinoSchemaProperties(
+ catalogDatabase.getProperties());
catalog().asSchemas().createSchema(databaseName,
catalogDatabase.getComment(), properties);
} catch (SchemaAlreadyExistsException e) {
if (!ignoreIfExists) {
@@ -289,7 +290,7 @@ public abstract class BaseCatalog extends AbstractCatalog {
.toArray(Column[]::new);
String comment = table.getComment();
Map<String, String> properties =
- propertiesConverter.toGravitinoTableProperties(table.getOptions());
+
schemaAndTablePropertiesConverter.toGravitinoTableProperties(table.getOptions());
Transform[] partitions =
partitionConverter.toGravitinoPartitions(((CatalogTable)
table).getPartitionKeys());
@@ -567,7 +568,8 @@ public abstract class BaseCatalog extends AbstractCatalog {
Optional<List<String>> flinkPrimaryKey = getFlinkPrimaryKey(table);
flinkPrimaryKey.ifPresent(builder::primaryKey);
Map<String, String> flinkTableProperties =
- propertiesConverter.toFlinkTableProperties(catalogOptions,
table.properties(), tablePath);
+ schemaAndTablePropertiesConverter.toFlinkTableProperties(
+ catalogOptions, table.properties(), tablePath);
List<String> partitionKeys =
partitionConverter.toFlinkPartitionKeys(table.partitioning());
return CatalogTable.of(builder.build(), table.comment(), partitionKeys,
flinkTableProperties);
}
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalogFactory.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalogFactory.java
index 5086b53257..e898e6b56e 100644
---
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalogFactory.java
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalogFactory.java
@@ -21,8 +21,8 @@ package org.apache.gravitino.flink.connector.catalog;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.gravitino.Catalog;
+import org.apache.gravitino.flink.connector.CatalogPropertiesConverter;
import org.apache.gravitino.flink.connector.PartitionConverter;
-import org.apache.gravitino.flink.connector.PropertiesConverter;
public interface BaseCatalogFactory extends CatalogFactory {
@@ -41,11 +41,11 @@ public interface BaseCatalogFactory extends CatalogFactory {
Catalog.Type gravitinoCatalogType();
/**
- * Define properties converter {@link PropertiesConverter}.
+ * Define catalog properties converter {@link CatalogPropertiesConverter}.
*
- * @return The requested property converter.
+ * @return The requested catalog properties converter.
*/
- PropertiesConverter propertiesConverter();
+ CatalogPropertiesConverter catalogPropertiesConverter();
/**
* Define partition converter.
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalog.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalog.java
index fdba9936e9..07411b10d4 100644
---
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalog.java
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalog.java
@@ -25,7 +25,7 @@ import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.factories.Factory;
import org.apache.gravitino.flink.connector.PartitionConverter;
-import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.SchemaAndTablePropertiesConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalog;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -41,11 +41,16 @@ public class GravitinoHiveCatalog extends BaseCatalog {
String catalogName,
String defaultDatabase,
Map<String, String> catalogOptions,
- PropertiesConverter propertiesConverter,
+ SchemaAndTablePropertiesConverter schemaAndTablePropertiesConverter,
PartitionConverter partitionConverter,
@Nullable HiveConf hiveConf,
@Nullable String hiveVersion) {
- super(catalogName, catalogOptions, defaultDatabase, propertiesConverter,
partitionConverter);
+ super(
+ catalogName,
+ catalogOptions,
+ defaultDatabase,
+ schemaAndTablePropertiesConverter,
+ partitionConverter);
this.hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConf,
hiveVersion);
}
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalogFactory.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalogFactory.java
index e63b7afc00..6744a89f2d 100644
---
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalogFactory.java
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalogFactory.java
@@ -29,9 +29,10 @@ import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory;
import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;
import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.gravitino.flink.connector.CatalogPropertiesConverter;
import org.apache.gravitino.flink.connector.DefaultPartitionConverter;
import org.apache.gravitino.flink.connector.PartitionConverter;
-import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.SchemaAndTablePropertiesConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory;
import org.apache.gravitino.flink.connector.utils.FactoryUtils;
import org.apache.gravitino.flink.connector.utils.PropertyUtils;
@@ -60,11 +61,13 @@ public class GravitinoHiveCatalogFactory implements
BaseCatalogFactory {
HiveConf hiveConf = HiveCatalog.createHiveConf(hiveConfDir, hadoopConfDir);
// Put the hadoop properties managed by Gravitino into the hiveConf
PropertyUtils.getHadoopAndHiveProperties(context.getOptions()).forEach(hiveConf::set);
+ SchemaAndTablePropertiesConverter tablePropertiesConverter =
+ new HiveSchemaAndTablePropertiesConverter(hiveConf);
return new GravitinoHiveCatalog(
context.getName(),
helper.getOptions().get(HiveCatalogFactoryOptions.DEFAULT_DATABASE),
context.getOptions(),
- propertiesConverter(),
+ tablePropertiesConverter,
partitionConverter(),
hiveConf,
helper.getOptions().get(HiveCatalogFactoryOptions.HIVE_VERSION));
@@ -108,14 +111,9 @@ public class GravitinoHiveCatalogFactory implements
BaseCatalogFactory {
return org.apache.gravitino.Catalog.Type.RELATIONAL;
}
- /**
- * Define properties converter {@link PropertiesConverter}.
- *
- * @return The requested property converter.
- */
@Override
- public PropertiesConverter propertiesConverter() {
- return HivePropertiesConverter.INSTANCE;
+ public CatalogPropertiesConverter catalogPropertiesConverter() {
+ return HiveCatalogPropertiesConverter.INSTANCE;
}
/**
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HiveCatalogPropertiesConverter.java
similarity index 58%
rename from
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java
rename to
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HiveCatalogPropertiesConverter.java
index e4a7420a63..6abedf94cc 100644
---
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HiveCatalogPropertiesConverter.java
@@ -21,17 +21,15 @@ package org.apache.gravitino.flink.connector.hive;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.flink.table.catalog.ObjectPath;
import org.apache.gravitino.catalog.hive.HiveConstants;
-import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.CatalogPropertiesConverter;
import org.apache.hadoop.hive.conf.HiveConf;
-public class HivePropertiesConverter implements PropertiesConverter {
+public class HiveCatalogPropertiesConverter implements
CatalogPropertiesConverter {
- private HivePropertiesConverter() {}
+ public static final HiveCatalogPropertiesConverter INSTANCE =
+ new HiveCatalogPropertiesConverter();
- public static final HivePropertiesConverter INSTANCE = new
HivePropertiesConverter();
private static final Map<String, String> HIVE_CATALOG_CONFIG_TO_GRAVITINO =
ImmutableMap.of(HiveConf.ConfVars.METASTOREURIS.varname,
HiveConstants.METASTORE_URIS);
private static final Map<String, String> GRAVITINO_CONFIG_TO_HIVE =
@@ -47,29 +45,6 @@ public class HivePropertiesConverter implements
PropertiesConverter {
return GRAVITINO_CONFIG_TO_HIVE.get(configKey);
}
- @Override
- public Map<String, String> toFlinkTableProperties(
- Map<String, String> flinkCatalogProperties,
- Map<String, String> gravitinoTableProperties,
- ObjectPath tablePath) {
- Map<String, String> properties =
- gravitinoTableProperties.entrySet().stream()
- .collect(
- Collectors.toMap(
- entry -> {
- String key = entry.getKey();
- if
(key.startsWith(HiveConstants.SERDE_PARAMETER_PREFIX)) {
- return
key.substring(HiveConstants.SERDE_PARAMETER_PREFIX.length());
- } else {
- return key;
- }
- },
- Map.Entry::getValue,
- (existingValue, newValue) -> newValue));
- properties.put("connector", "hive");
- return properties;
- }
-
@Override
public String getFlinkCatalogType() {
return GravitinoHiveCatalogFactoryOptions.IDENTIFIER;
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HiveSchemaAndTablePropertiesConverter.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HiveSchemaAndTablePropertiesConverter.java
new file mode 100644
index 0000000000..ca82a24622
--- /dev/null
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HiveSchemaAndTablePropertiesConverter.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.flink.connector.hive;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.util.Constants;
+import org.apache.gravitino.catalog.hive.HiveConstants;
+import org.apache.gravitino.flink.connector.SchemaAndTablePropertiesConverter;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.RCFileStorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.StorageFormatFactory;
+
+public class HiveSchemaAndTablePropertiesConverter implements
SchemaAndTablePropertiesConverter {
+
+ private static final StorageFormatFactory STORAGE_FORMAT_FACTORY = new
StorageFormatFactory();
+ private final HiveConf hiveConf;
+
+ HiveSchemaAndTablePropertiesConverter(HiveConf hiveConf) {
+ this.hiveConf = hiveConf;
+ }
+
+ @Override
+ public Map<String, String> toFlinkTableProperties(
+ Map<String, String> flinkCatalogProperties,
+ Map<String, String> gravitinoTableProperties,
+ ObjectPath tablePath) {
+ Map<String, String> properties =
+ gravitinoTableProperties.entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ entry -> {
+ String key = entry.getKey();
+ if
(key.startsWith(HiveConstants.SERDE_PARAMETER_PREFIX)) {
+ return
key.substring(HiveConstants.SERDE_PARAMETER_PREFIX.length());
+ } else {
+ return key;
+ }
+ },
+ Map.Entry::getValue,
+ (existingValue, newValue) -> newValue));
+ properties.put("connector", "hive");
+ return properties;
+ }
+
+ @Override
+ public Map<String, String> toGravitinoTableProperties(Map<String, String>
flinkProperties) {
+ Map<String, String> properties = new HashMap<>(flinkProperties);
+ String specifiedSerdeLib =
properties.remove(Constants.SERDE_LIB_CLASS_NAME);
+ String specifiedStorageFormat =
properties.remove(Constants.STORED_AS_FILE_FORMAT);
+ String specifiedInputFormat =
properties.remove(Constants.STORED_AS_INPUT_FORMAT);
+ String specifiedOutputFormat =
properties.remove(Constants.STORED_AS_OUTPUT_FORMAT);
+
+ Map<String, String> serdeParameters = new HashMap<>();
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+ String key = entry.getKey();
+ if (key.startsWith(Constants.SERDE_INFO_PROP_PREFIX)) {
+ String parameterKey =
key.substring(Constants.SERDE_INFO_PROP_PREFIX.length());
+ serdeParameters.put(HiveConstants.SERDE_PARAMETER_PREFIX +
parameterKey, entry.getValue());
+ }
+ }
+ properties.keySet().removeIf(k ->
k.startsWith(Constants.SERDE_INFO_PROP_PREFIX));
+ properties.putAll(serdeParameters);
+
+ validateStorageFormat(specifiedStorageFormat);
+ InputOutputFormat storageFormatIo =
+ resolveInputOutputFormat(
+ specifiedStorageFormat,
+ specifiedInputFormat,
+ specifiedOutputFormat,
+ properties,
+ hiveConf);
+ if (storageFormatIo.inputFormat != null) {
+ properties.put(HiveConstants.INPUT_FORMAT, storageFormatIo.inputFormat);
+ }
+ if (storageFormatIo.outputFormat != null) {
+ properties.put(HiveConstants.OUTPUT_FORMAT,
storageFormatIo.outputFormat);
+ }
+
+ String serdeToUse = resolveSerdeLib(specifiedStorageFormat,
specifiedSerdeLib, hiveConf);
+ if (serdeToUse != null) {
+ properties.put(HiveConstants.SERDE_LIB, serdeToUse);
+ }
+
+ String formatRaw = resolveStorageFormat(specifiedStorageFormat, hiveConf);
+ if (formatRaw != null) {
+ properties.put(HiveConstants.FORMAT, formatRaw);
+ }
+ return properties;
+ }
+
+ private static String resolveStorageFormat(String storedAsFileFormat,
HiveConf hiveConf) {
+ if (storedAsFileFormat != null) {
+ return storedAsFileFormat;
+ }
+ return getDefaultStorageFormat(hiveConf);
+ }
+
+ private static String getDefaultStorageFormat(HiveConf hiveConf) {
+ return hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT);
+ }
+
+ // 1. use the serde lib in the stored-as format
+ // 2. use the serde lib specified in the properties
+ // 3. use the serde lib from default file format
+ // 4. use the default serde in hive conf
+ // please refer to org.apache.flink.table.catalog.hive.util.HiveTableUtils
for more details
+ private static String resolveSerdeLib(
+ String specifiedStorageFormat, @Nullable String specifiedSerde, HiveConf
hiveConf) {
+ String formatSerde = getSerdeForFormat(specifiedStorageFormat, hiveConf);
+ if (formatSerde != null) {
+ return formatSerde;
+ }
+
+ if (specifiedSerde != null) {
+ return specifiedSerde;
+ }
+
+ if (specifiedStorageFormat == null) {
+ formatSerde = getSerdeForFormat(getDefaultStorageFormat(hiveConf),
hiveConf);
+ if (formatSerde != null) {
+ return formatSerde;
+ }
+ }
+
+ return hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTSERDE);
+ }
+
+ private static String getSerdeForFormat(String format, HiveConf hiveConf) {
+ if (format == null) {
+ return null;
+ }
+ StorageFormatDescriptor descriptor = STORAGE_FORMAT_FACTORY.get(format);
+ if (descriptor == null) {
+ return null;
+ }
+ String serdeLib = descriptor.getSerde();
+ if (serdeLib == null && descriptor instanceof
RCFileStorageFormatDescriptor) {
+ serdeLib = hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTRCFILESERDE);
+ }
+ return serdeLib;
+ }
+
+ private static void validateStorageFormat(String format) {
+ if (format == null) {
+ return;
+ }
+ StorageFormatDescriptor descriptor = STORAGE_FORMAT_FACTORY.get(format);
+ Preconditions.checkArgument(descriptor != null, "Unknown storage format
%s", format);
+ }
+
+ private static InputOutputFormat resolveInputOutputFormat(
+ String specifiedStorageFormat,
+ @Nullable String specifiedInputFormat,
+ @Nullable String specifiedOutputFormat,
+ Map<String, String> properties,
+ HiveConf hiveConf) {
+ // 1. use input/output from storage format (STORED AS)
+ // 2. use input/output from table properties
+ // 3. use input/output from default file format
+ if (specifiedStorageFormat != null) {
+ StorageFormatDescriptor descriptor =
STORAGE_FORMAT_FACTORY.get(specifiedStorageFormat);
+ return new InputOutputFormat(descriptor.getInputFormat(),
descriptor.getOutputFormat());
+ }
+
+ String inputFormat = specifiedInputFormat;
+ String outputFormat = specifiedOutputFormat;
+ if (inputFormat == null) {
+ inputFormat = properties.get(HiveConstants.INPUT_FORMAT);
+ }
+ if (outputFormat == null) {
+ outputFormat = properties.get(HiveConstants.OUTPUT_FORMAT);
+ }
+
+ if (inputFormat == null || outputFormat == null) {
+ String defaultFormat = getDefaultStorageFormat(hiveConf);
+ StorageFormatDescriptor descriptor =
STORAGE_FORMAT_FACTORY.get(defaultFormat);
+ if (descriptor != null) {
+ if (inputFormat == null) {
+ inputFormat = descriptor.getInputFormat();
+ }
+ if (outputFormat == null) {
+ outputFormat = descriptor.getOutputFormat();
+ }
+ }
+ }
+ return new InputOutputFormat(inputFormat, outputFormat);
+ }
+
+ private static class InputOutputFormat {
+ private final String inputFormat;
+ private final String outputFormat;
+
+ private InputOutputFormat(String inputFormat, String outputFormat) {
+ this.inputFormat = inputFormat;
+ this.outputFormat = outputFormat;
+ }
+ }
+}
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalog.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalog.java
index 03566ff4b0..0d4839707e 100644
---
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalog.java
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalog.java
@@ -23,7 +23,7 @@ import java.util.Optional;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.factories.Factory;
import org.apache.gravitino.flink.connector.PartitionConverter;
-import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.SchemaAndTablePropertiesConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalog;
import org.apache.iceberg.flink.FlinkCatalog;
import org.apache.iceberg.flink.FlinkCatalogFactory;
@@ -36,14 +36,14 @@ public class GravitinoIcebergCatalog extends BaseCatalog {
protected GravitinoIcebergCatalog(
String catalogName,
String defaultDatabase,
- PropertiesConverter propertiesConverter,
+ SchemaAndTablePropertiesConverter schemaAndTablePropertiesConverter,
PartitionConverter partitionConverter,
Map<String, String> flinkCatalogProperties) {
super(
catalogName,
flinkCatalogProperties,
defaultDatabase,
- propertiesConverter,
+ schemaAndTablePropertiesConverter,
partitionConverter);
FlinkCatalogFactory flinkCatalogFactory = new FlinkCatalogFactory();
this.icebergCatalog =
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactory.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactory.java
index d9029f7a93..7fcfe35327 100644
---
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactory.java
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactory.java
@@ -23,9 +23,10 @@ import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.gravitino.flink.connector.CatalogPropertiesConverter;
import org.apache.gravitino.flink.connector.DefaultPartitionConverter;
import org.apache.gravitino.flink.connector.PartitionConverter;
-import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.SchemaAndTablePropertiesConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory;
import org.apache.gravitino.flink.connector.utils.FactoryUtils;
@@ -38,7 +39,7 @@ public class GravitinoIcebergCatalogFactory implements
BaseCatalogFactory {
return new GravitinoIcebergCatalog(
context.getName(),
helper.getOptions().get(GravitinoIcebergCatalogFactoryOptions.DEFAULT_DATABASE),
- propertiesConverter(),
+ schemaAndTablePropertiesConverter(),
partitionConverter(),
context.getOptions());
}
@@ -79,13 +80,12 @@ public class GravitinoIcebergCatalogFactory implements
BaseCatalogFactory {
return org.apache.gravitino.Catalog.Type.RELATIONAL;
}
- /**
- * Define properties converter.
- *
- * @return The properties converter instance for Iceberg catalog.
- */
@Override
- public PropertiesConverter propertiesConverter() {
+ public CatalogPropertiesConverter catalogPropertiesConverter() {
+ return IcebergPropertiesConverter.INSTANCE;
+ }
+
+  public SchemaAndTablePropertiesConverter schemaAndTablePropertiesConverter() {
return IcebergPropertiesConverter.INSTANCE;
}
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java
index 1d80e27ea5..8bc75b3b2d 100644
---
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java
@@ -24,9 +24,11 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergPropertiesUtils;
-import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.CatalogPropertiesConverter;
+import org.apache.gravitino.flink.connector.SchemaAndTablePropertiesConverter;
-public class IcebergPropertiesConverter implements PropertiesConverter {
+public class IcebergPropertiesConverter
+ implements CatalogPropertiesConverter, SchemaAndTablePropertiesConverter {
public static IcebergPropertiesConverter INSTANCE = new
IcebergPropertiesConverter();
private IcebergPropertiesConverter() {}
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalog.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalog.java
index 53487ffe52..9a75ba2c8b 100644
---
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalog.java
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalog.java
@@ -27,7 +27,7 @@ import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.Factory;
import org.apache.gravitino.flink.connector.PartitionConverter;
-import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.SchemaAndTablePropertiesConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalog;
/**
@@ -41,13 +41,13 @@ public class GravitinoJdbcCatalog extends BaseCatalog {
protected GravitinoJdbcCatalog(
CatalogFactory.Context context,
String defaultDatabase,
- PropertiesConverter propertiesConverter,
+ SchemaAndTablePropertiesConverter schemaAndTablePropertiesConverter,
PartitionConverter partitionConverter) {
super(
context.getName(),
context.getOptions(),
defaultDatabase,
- propertiesConverter,
+ schemaAndTablePropertiesConverter,
partitionConverter);
JdbcCatalogFactory jdbcCatalogFactory = new JdbcCatalogFactory();
this.jdbcCatalog = (JdbcCatalog) jdbcCatalogFactory.createCatalog(context);
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalogFactory.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalogFactory.java
index b8090ac7b7..0b2a489d44 100644
---
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalogFactory.java
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalogFactory.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.util.Preconditions;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.flink.connector.PartitionConverter;
+import org.apache.gravitino.flink.connector.SchemaAndTablePropertiesConverter;
import org.apache.gravitino.flink.connector.UnsupportPartitionConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory;
import org.apache.gravitino.flink.connector.utils.FactoryUtils;
@@ -36,6 +37,8 @@ import
org.apache.gravitino.flink.connector.utils.FactoryUtils;
*/
public abstract class GravitinoJdbcCatalogFactory implements
BaseCatalogFactory {
+  protected abstract SchemaAndTablePropertiesConverter schemaAndTablePropertiesConverter();
+
@Override
public org.apache.flink.table.catalog.Catalog createCatalog(Context context)
{
// FlinkJdbcCatalog does not support 'driver' as an option, but Gravitino
JdbcCatalog requires
@@ -49,7 +52,7 @@ public abstract class GravitinoJdbcCatalogFactory implements
BaseCatalogFactory
defaultDatabase != null,
GravitinoJdbcCatalogFactoryOptions.DEFAULT_DATABASE.key() + " should
not be null.");
return new GravitinoJdbcCatalog(
- context, defaultDatabase, propertiesConverter(), partitionConverter());
+        context, defaultDatabase, schemaAndTablePropertiesConverter(), partitionConverter());
}
@Override
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/JdbcPropertiesConverter.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/JdbcPropertiesConverter.java
index 99215f68bc..da98a804c7 100644
---
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/JdbcPropertiesConverter.java
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/JdbcPropertiesConverter.java
@@ -26,16 +26,18 @@ import java.util.regex.Pattern;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.util.Preconditions;
-import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.CatalogPropertiesConverter;
+import org.apache.gravitino.flink.connector.SchemaAndTablePropertiesConverter;
-public abstract class JdbcPropertiesConverter implements PropertiesConverter {
+public abstract class JdbcPropertiesConverter
+ implements CatalogPropertiesConverter, SchemaAndTablePropertiesConverter {
private static final Pattern jdbcUrlPattern =
Pattern.compile("(jdbc:\\w+://[^:/]+(?::\\d+)?)");
@Override
public Map<String, String> toGravitinoCatalogProperties(Configuration
flinkConf) {
Map<String, String> gravitinoCatalogProperties =
- PropertiesConverter.super.toGravitinoCatalogProperties(flinkConf);
+        CatalogPropertiesConverter.super.toGravitinoCatalogProperties(flinkConf);
if
(!gravitinoCatalogProperties.containsKey(JdbcPropertiesConstants.GRAVITINO_JDBC_DRIVER))
{
gravitinoCatalogProperties.put(
JdbcPropertiesConstants.GRAVITINO_JDBC_DRIVER, defaultDriverName());
@@ -46,7 +48,7 @@ public abstract class JdbcPropertiesConverter implements
PropertiesConverter {
@Override
public Map<String, String> toFlinkCatalogProperties(Map<String, String>
gravitinoProperties) {
Map<String, String> flinkCatalogProperties =
-        PropertiesConverter.super.toFlinkCatalogProperties(gravitinoProperties);
+        CatalogPropertiesConverter.super.toFlinkCatalogProperties(gravitinoProperties);
String gravitinoJdbcUrl =
gravitinoProperties.get(JdbcPropertiesConstants.GRAVITINO_JDBC_URL);
Preconditions.checkArgument(
gravitinoJdbcUrl != null,
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/mysql/GravitinoMysqlJdbcCatalogFactory.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/mysql/GravitinoMysqlJdbcCatalogFactory.java
index 55b04c6fbd..c6b5c03718 100644
---
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/mysql/GravitinoMysqlJdbcCatalogFactory.java
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/mysql/GravitinoMysqlJdbcCatalogFactory.java
@@ -19,7 +19,8 @@
package org.apache.gravitino.flink.connector.jdbc.mysql;
-import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.CatalogPropertiesConverter;
+import org.apache.gravitino.flink.connector.SchemaAndTablePropertiesConverter;
import org.apache.gravitino.flink.connector.jdbc.GravitinoJdbcCatalogFactory;
import
org.apache.gravitino.flink.connector.jdbc.GravitinoJdbcCatalogFactoryOptions;
@@ -31,7 +32,11 @@ public class GravitinoMysqlJdbcCatalogFactory extends
GravitinoJdbcCatalogFactor
}
@Override
- public PropertiesConverter propertiesConverter() {
+ public CatalogPropertiesConverter catalogPropertiesConverter() {
+ return MysqlPropertiesConverter.INSTANCE;
+ }
+
+  public SchemaAndTablePropertiesConverter schemaAndTablePropertiesConverter() {
return MysqlPropertiesConverter.INSTANCE;
}
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/postgresql/GravitinoPostgresJdbcCatalogFactory.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/postgresql/GravitinoPostgresJdbcCatalogFactory.java
index 25f9bca86f..95e5be2e4a 100644
---
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/postgresql/GravitinoPostgresJdbcCatalogFactory.java
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/postgresql/GravitinoPostgresJdbcCatalogFactory.java
@@ -19,7 +19,8 @@
package org.apache.gravitino.flink.connector.jdbc.postgresql;
-import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.CatalogPropertiesConverter;
+import org.apache.gravitino.flink.connector.SchemaAndTablePropertiesConverter;
import org.apache.gravitino.flink.connector.jdbc.GravitinoJdbcCatalogFactory;
import
org.apache.gravitino.flink.connector.jdbc.GravitinoJdbcCatalogFactoryOptions;
@@ -31,7 +32,11 @@ public class GravitinoPostgresJdbcCatalogFactory extends
GravitinoJdbcCatalogFac
}
@Override
- public PropertiesConverter propertiesConverter() {
+ public CatalogPropertiesConverter catalogPropertiesConverter() {
+ return PostgresqlPropertiesConverter.INSTANCE;
+ }
+
+  public SchemaAndTablePropertiesConverter schemaAndTablePropertiesConverter() {
return PostgresqlPropertiesConverter.INSTANCE;
}
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java
index cc5d8039a2..fa4fd86322 100644
---
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java
@@ -28,7 +28,7 @@ import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.Factory;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.flink.connector.PartitionConverter;
-import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.SchemaAndTablePropertiesConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalog;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.FlinkTableFactory;
@@ -44,13 +44,13 @@ public class GravitinoPaimonCatalog extends BaseCatalog {
protected GravitinoPaimonCatalog(
CatalogFactory.Context context,
String defaultDatabase,
- PropertiesConverter propertiesConverter,
+ SchemaAndTablePropertiesConverter schemaAndTablePropertiesConverter,
PartitionConverter partitionConverter) {
super(
context.getName(),
context.getOptions(),
defaultDatabase,
- propertiesConverter,
+ schemaAndTablePropertiesConverter,
partitionConverter);
FlinkCatalogFactory flinkCatalogFactory = new FlinkCatalogFactory();
this.paimonCatalog = flinkCatalogFactory.createCatalog(context);
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java
index 8732ade23e..0d7645c6df 100644
---
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java
@@ -24,9 +24,10 @@ import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.gravitino.flink.connector.CatalogPropertiesConverter;
import org.apache.gravitino.flink.connector.DefaultPartitionConverter;
import org.apache.gravitino.flink.connector.PartitionConverter;
-import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.SchemaAndTablePropertiesConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory;
import org.apache.gravitino.flink.connector.utils.FactoryUtils;
@@ -43,7 +44,7 @@ public class GravitinoPaimonCatalogFactory implements
BaseCatalogFactory {
String defaultDatabase =
helper.getOptions().get(GravitinoPaimonCatalogFactoryOptions.DEFAULT_DATABASE);
return new GravitinoPaimonCatalog(
- context, defaultDatabase, propertiesConverter(), partitionConverter());
+        context, defaultDatabase, schemaAndTablePropertiesConverter(), partitionConverter());
}
@Override
@@ -72,7 +73,11 @@ public class GravitinoPaimonCatalogFactory implements
BaseCatalogFactory {
}
@Override
- public PropertiesConverter propertiesConverter() {
+ public CatalogPropertiesConverter catalogPropertiesConverter() {
+ return PaimonPropertiesConverter.INSTANCE;
+ }
+
+  public SchemaAndTablePropertiesConverter schemaAndTablePropertiesConverter() {
return PaimonPropertiesConverter.INSTANCE;
}
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java
index 99e402bcb8..17d7001b6b 100644
---
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java
@@ -21,9 +21,11 @@ package org.apache.gravitino.flink.connector.paimon;
import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants;
import org.apache.gravitino.catalog.lakehouse.paimon.PaimonPropertiesUtils;
-import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.CatalogPropertiesConverter;
+import org.apache.gravitino.flink.connector.SchemaAndTablePropertiesConverter;
-public class PaimonPropertiesConverter implements PropertiesConverter {
+public class PaimonPropertiesConverter
+ implements CatalogPropertiesConverter, SchemaAndTablePropertiesConverter {
public static final PaimonPropertiesConverter INSTANCE = new
PaimonPropertiesConverter();
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java
index bee063e25a..89d85120e6 100644
---
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java
@@ -36,7 +36,7 @@ import org.apache.flink.table.factories.Factory;
import org.apache.flink.util.Preconditions;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
-import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.CatalogPropertiesConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory;
import org.apache.gravitino.flink.connector.catalog.GravitinoCatalogManager;
import org.slf4j.Logger;
@@ -58,7 +58,7 @@ public class GravitinoCatalogStore extends
AbstractCatalogStore {
Map<String, String> gravitino = configuration.toMap();
BaseCatalogFactory catalogFactory = getCatalogFactory(gravitino);
Map<String, String> gravitinoProperties =
-        catalogFactory.propertiesConverter().toGravitinoCatalogProperties(configuration);
+        catalogFactory.catalogPropertiesConverter().toGravitinoCatalogProperties(configuration);
gravitinoCatalogManager.createCatalog(
catalogName,
catalogFactory.gravitinoCatalogType(),
@@ -97,9 +97,10 @@ public class GravitinoCatalogStore extends
AbstractCatalogStore {
try {
Catalog catalog =
gravitinoCatalogManager.getGravitinoCatalogInfo(catalogName);
BaseCatalogFactory catalogFactory =
getCatalogFactory(catalog.provider());
-      PropertiesConverter propertiesConverter = catalogFactory.propertiesConverter();
+ CatalogPropertiesConverter catalogPropertiesConverter =
+ catalogFactory.catalogPropertiesConverter();
Map<String, String> flinkCatalogProperties =
- propertiesConverter.toFlinkCatalogProperties(catalog.properties());
+          catalogPropertiesConverter.toFlinkCatalogProperties(catalog.properties());
CatalogDescriptor descriptor =
CatalogDescriptor.of(catalogName,
Configuration.fromMap(flinkCatalogProperties));
return Optional.of(descriptor);
diff --git
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/catalog/TestBaseCatalog.java
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/catalog/TestBaseCatalog.java
index c55645b9c6..66f63e60ff 100644
---
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/catalog/TestBaseCatalog.java
+++
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/catalog/TestBaseCatalog.java
@@ -34,7 +34,7 @@ import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.TableChange;
import org.apache.gravitino.SchemaChange;
import org.apache.gravitino.flink.connector.PartitionConverter;
-import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.SchemaAndTablePropertiesConverter;
import org.apache.gravitino.rel.types.Types;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -142,7 +142,7 @@ public class TestBaseCatalog {
"test",
Collections.emptyMap(),
"default",
- Mockito.mock(PropertiesConverter.class),
+ Mockito.mock(SchemaAndTablePropertiesConverter.class),
Mockito.mock(PartitionConverter.class));
this.delegate = delegate;
}
diff --git
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/hive/TestHivePropertiesConverter.java
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/hive/TestHivePropertiesConverter.java
index 7191f0aec7..d6f4b13eb8 100644
---
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/hive/TestHivePropertiesConverter.java
+++
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/hive/TestHivePropertiesConverter.java
@@ -28,7 +28,8 @@ import org.junit.jupiter.api.Test;
public class TestHivePropertiesConverter {
- private static final HivePropertiesConverter CONVERTER =
HivePropertiesConverter.INSTANCE;
+ private static final HiveCatalogPropertiesConverter CONVERTER =
+ HiveCatalogPropertiesConverter.INSTANCE;
@Test
public void testToGravitinoCatalogProperties() {
diff --git
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/hive/TestHiveSchemaAndTablePropertiesConverter.java
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/hive/TestHiveSchemaAndTablePropertiesConverter.java
new file mode 100644
index 0000000000..4485d01178
--- /dev/null
+++
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/hive/TestHiveSchemaAndTablePropertiesConverter.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.flink.connector.hive;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.flink.table.catalog.hive.util.Constants;
+import org.apache.gravitino.catalog.hive.HiveConstants;
+import org.apache.gravitino.catalog.hive.HiveStorageConstants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestHiveSchemaAndTablePropertiesConverter {
+
+ @Test
+ public void testDefaultFormatSerdeApplied() {
+ HiveSchemaAndTablePropertiesConverter converter =
+ new HiveSchemaAndTablePropertiesConverter(defaultHiveConf("ORC"));
+ Map<String, String> properties =
converter.toGravitinoTableProperties(Collections.emptyMap());
+ Assertions.assertEquals("ORC", properties.get(HiveConstants.FORMAT));
+ Assertions.assertEquals(
+ HiveStorageConstants.ORC_SERDE_CLASS,
properties.get(HiveConstants.SERDE_LIB));
+ }
+
+ @Test
+ public void testRowFormatOverridesDefaultFormatSerde() {
+ HiveSchemaAndTablePropertiesConverter converter =
+ new HiveSchemaAndTablePropertiesConverter(defaultHiveConf("ORC"));
+ Map<String, String> properties =
+ converter.toGravitinoTableProperties(
+ ImmutableMap.of(Constants.SERDE_LIB_CLASS_NAME,
"com.acme.CustomSerde"));
+ Assertions.assertEquals("ORC", properties.get(HiveConstants.FORMAT));
+ Assertions.assertEquals("com.acme.CustomSerde",
properties.get(HiveConstants.SERDE_LIB));
+ }
+
+ @Test
+ public void testStoredAsOverridesRowFormatSerde() {
+ HiveSchemaAndTablePropertiesConverter converter =
+ new HiveSchemaAndTablePropertiesConverter(defaultHiveConf("ORC"));
+ Map<String, String> properties =
+ converter.toGravitinoTableProperties(
+ ImmutableMap.of(
+ Constants.SERDE_LIB_CLASS_NAME,
+ "com.acme.CustomSerde",
+ Constants.STORED_AS_FILE_FORMAT,
+ "ORC"));
+ Assertions.assertEquals("ORC", properties.get(HiveConstants.FORMAT));
+ Assertions.assertEquals(
+ HiveStorageConstants.ORC_SERDE_CLASS,
properties.get(HiveConstants.SERDE_LIB));
+ }
+
+ @Test
+ public void testStoredAsInputOutputCopiedWhenNoFileFormat() {
+ HiveSchemaAndTablePropertiesConverter converter =
+ new HiveSchemaAndTablePropertiesConverter(defaultHiveConf("ORC"));
+ Map<String, String> properties =
+ converter.toGravitinoTableProperties(
+ ImmutableMap.of(
+ Constants.STORED_AS_INPUT_FORMAT,
+ "inputFormat",
+ Constants.STORED_AS_OUTPUT_FORMAT,
+ "outputFormat"));
+ Assertions.assertEquals("inputFormat",
properties.get(HiveConstants.INPUT_FORMAT));
+ Assertions.assertEquals("outputFormat",
properties.get(HiveConstants.OUTPUT_FORMAT));
+ }
+
+ @Test
+ public void testInputOutputFromStoredAsFormat() {
+ HiveSchemaAndTablePropertiesConverter converter =
+ new HiveSchemaAndTablePropertiesConverter(defaultHiveConf("ORC"));
+ Map<String, String> properties =
+ converter.toGravitinoTableProperties(
+ ImmutableMap.of(Constants.STORED_AS_FILE_FORMAT, "ORC"));
+ Assertions.assertEquals(
+ HiveStorageConstants.ORC_INPUT_FORMAT_CLASS,
properties.get(HiveConstants.INPUT_FORMAT));
+ Assertions.assertEquals(
+ HiveStorageConstants.ORC_OUTPUT_FORMAT_CLASS,
properties.get(HiveConstants.OUTPUT_FORMAT));
+ }
+
+ @Test
+ public void testInputOutputFromTablePropertiesBeforeDefault() {
+ HiveSchemaAndTablePropertiesConverter converter =
+ new HiveSchemaAndTablePropertiesConverter(defaultHiveConf("ORC"));
+ Map<String, String> properties =
+ converter.toGravitinoTableProperties(
+ ImmutableMap.of(
+ HiveConstants.INPUT_FORMAT,
+ "customInput",
+ HiveConstants.OUTPUT_FORMAT,
+ "customOutput"));
+ Assertions.assertEquals("customInput",
properties.get(HiveConstants.INPUT_FORMAT));
+ Assertions.assertEquals("customOutput",
properties.get(HiveConstants.OUTPUT_FORMAT));
+ }
+
+ @Test
+ public void testValidateStorageFormat() {
+ HiveSchemaAndTablePropertiesConverter converter =
+ new HiveSchemaAndTablePropertiesConverter(defaultHiveConf("ORC"));
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ converter.toGravitinoTableProperties(
+ ImmutableMap.of(Constants.STORED_AS_FILE_FORMAT,
"NotAFormat")));
+ }
+
+ private static HiveConf defaultHiveConf(String defaultFormat) {
+ HiveConf hiveConf = new HiveConf();
+ hiveConf.setVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT, defaultFormat);
+ hiveConf.setVar(
+ HiveConf.ConfVars.HIVEDEFAULTSERDE,
HiveStorageConstants.LAZY_SIMPLE_SERDE_CLASS);
+ return hiveConf;
+ }
+}
diff --git
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java
index b275d81a78..a0b2c2a10e 100644
---
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java
+++
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java
@@ -37,7 +37,7 @@ import
org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.types.Row;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.client.GravitinoMetalake;
-import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.CatalogPropertiesConverter;
import org.apache.gravitino.flink.connector.iceberg.IcebergPropertiesConstants;
import org.apache.gravitino.flink.connector.integration.test.utils.TestUtils;
import
org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions;
@@ -111,7 +111,7 @@ public abstract class FlinkEnvIT extends BaseIT {
protected void stopCatalogEnv() throws Exception {}
protected String flinkByPass(String key) {
- return PropertiesConverter.FLINK_PROPERTY_PREFIX + key;
+ return CatalogPropertiesConverter.FLINK_PROPERTY_PREFIX + key;
}
protected abstract String getProvider();
diff --git
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
index f64170f811..17d8695cb7 100644
---
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
+++
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
@@ -25,16 +25,21 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.security.PrivilegedAction;
import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ResultKind;
+import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDescriptor;
@@ -44,10 +49,12 @@ import org.apache.flink.table.catalog.DefaultCatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;
+import org.apache.flink.table.catalog.hive.util.Constants;
import org.apache.flink.types.Row;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.catalog.hive.HiveConstants;
-import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.catalog.hive.HiveStorageConstants;
+import org.apache.gravitino.flink.connector.CatalogPropertiesConverter;
import org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalog;
import
org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactoryOptions;
import org.apache.gravitino.flink.connector.integration.test.FlinkCommonIT;
@@ -59,6 +66,9 @@ import
org.apache.gravitino.rel.expressions.transforms.Transforms;
import org.apache.gravitino.rel.types.Types;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.RCFileStorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.StorageFormatFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
@@ -72,6 +82,7 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
private static final String FLINK_USER_NAME = "gravitino";
private static org.apache.gravitino.Catalog hiveCatalog;
+ private static String hiveConfDir;
@Override
protected boolean supportsPrimaryKey() {
@@ -80,6 +91,7 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
@BeforeAll
void hiveStartUp() {
+ initHiveConfDir();
initDefaultHiveCatalog();
}
@@ -100,6 +112,45 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
ImmutableMap.of("metastore.uris", hiveMetastoreUri));
}
+ private void initHiveConfDir() {
+ if (hiveConfDir != null) {
+ return;
+ }
+ try {
+ java.nio.file.Path dir =
java.nio.file.Files.createTempDirectory("flink-hive-conf");
+ java.nio.file.Path hiveSite = dir.resolve("hive-site.xml");
+ String hiveSiteXml =
+ "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
+ + "<?xml-stylesheet type=\"text/xsl\"
href=\"configuration.xsl\"?>\n"
+ + "<configuration>\n"
+ + " <property>\n"
+ + " <name>hive.metastore.sasl.enabled</name>\n"
+ + " <value>false</value>\n"
+ + " </property>\n"
+ + " <property>\n"
+ + " <name>hive.metastore.uris</name>\n"
+ + " <value>"
+ + hiveMetastoreUri
+ + "</value>\n"
+ + " </property>\n"
+ + " <property>\n"
+ + " <name>hadoop.security.authentication</name>\n"
+ + " <value>simple</value>\n"
+ + " </property>\n"
+ + " <property>\n"
+ + " <name>hive.metastore.warehouse.dir</name>\n"
+ + " <value>"
+ + warehouse
+ + "</value>\n"
+ + " </property>\n"
+ + "</configuration>\n";
+ java.nio.file.Files.write(hiveSite,
hiveSiteXml.getBytes(StandardCharsets.UTF_8));
+ hiveConfDir = dir.toAbsolutePath().toString();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to prepare hive conf dir for ITs", e);
+ }
+ }
+
@Test
public void testCreateGravitinoHiveCatalog() {
tableEnv.useCatalog(DEFAULT_CATALOG);
@@ -126,7 +177,7 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
Assertions.assertEquals(hiveMetastoreUri,
properties.get(HiveConstants.METASTORE_URIS));
Map<String, String> flinkProperties =
gravitinoCatalog.properties().entrySet().stream()
- .filter(e ->
e.getKey().startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX))
+ .filter(e ->
e.getKey().startsWith(CatalogPropertiesConverter.FLINK_PROPERTY_PREFIX))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Assertions.assertEquals(2, flinkProperties.size());
Assertions.assertEquals(
@@ -192,7 +243,7 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
Assertions.assertEquals(hiveMetastoreUri,
properties.get(HiveConstants.METASTORE_URIS));
Map<String, String> flinkProperties =
properties.entrySet().stream()
- .filter(e ->
e.getKey().startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX))
+ .filter(e ->
e.getKey().startsWith(CatalogPropertiesConverter.FLINK_PROPERTY_PREFIX))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Assertions.assertEquals(3, flinkProperties.size());
Assertions.assertEquals(
@@ -473,6 +524,144 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
true);
}
  /**
   * Verifies serde precedence when only a serde lib is supplied via table properties (no STORED
   * AS clause): the explicit serde must override the default file format's serde, while the
   * input/output formats still fall back to the default (text) file format. Finally checks the
   * table is readable by a native Flink Hive catalog.
   */
  @Test
  public void testRowFormatSerdeOverridesDefaultFormat() {
    String databaseName = "test_hive_row_format_precedence_db";
    String tableName = "test_row_format_overrides_default";
    String customSerde = HiveStorageConstants.OPENCSV_SERDE_CLASS;

    doWithSchema(
        currentCatalog(),
        databaseName,
        catalog -> {
          // Only the serde lib is set explicitly; formats are left to the defaults.
          TestUtils.assertTableResult(
              sql(
                  "CREATE TABLE %s (id STRING) WITH ('%s'='%s')",
                  tableName, Constants.SERDE_LIB_CLASS_NAME, customSerde),
              ResultKind.SUCCESS);
          TestUtils.assertTableResult(
              sql("INSERT INTO %s VALUES ('1')", tableName),
              ResultKind.SUCCESS_WITH_CONTENT,
              Row.of(-1L));
          Table tableWithRowFormat =
              catalog.asTableCatalog().loadTable(NameIdentifier.of(databaseName, tableName));
          // Input/output formats come from the default (text) file format, not the serde.
          Assertions.assertEquals(
              HiveStorageConstants.TEXT_INPUT_FORMAT_CLASS,
              tableWithRowFormat.properties().get(HiveConstants.INPUT_FORMAT));
          Assertions.assertEquals(
              "org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat",
              tableWithRowFormat.properties().get(HiveConstants.OUTPUT_FORMAT));
          // The explicitly configured serde wins over the default format's serde.
          Assertions.assertEquals(
              customSerde, tableWithRowFormat.properties().get(HiveConstants.SERDE_LIB));
          // Interoperability: a native Flink Hive client must read the row back.
          assertHiveCatalogRead(databaseName, tableName, Row.of("1"));
        },
        true);
  }
+
  /**
   * Verifies serde precedence when both a serde lib and a STORED AS format are supplied: the
   * stored-as format (ORC here) must take precedence, driving the serde AND the input/output
   * formats, overriding the serde given in the table properties. Finally checks the table is
   * readable by a native Flink Hive catalog.
   */
  @Test
  public void testStoredAsOverridesRowFormatSerde() {
    String databaseName = "test_hive_stored_as_precedence_db";
    String tableName = "test_stored_as_overrides_row_format";
    String customSerde = HiveStorageConstants.OPENCSV_SERDE_CLASS;

    doWithSchema(
        currentCatalog(),
        databaseName,
        catalog -> {
          // Both a custom serde and STORED AS ORC are set; ORC is expected to win.
          TestUtils.assertTableResult(
              sql(
                  "CREATE TABLE %s (id INT) WITH ('%s'='%s', '%s'='%s')",
                  tableName,
                  Constants.SERDE_LIB_CLASS_NAME,
                  customSerde,
                  Constants.STORED_AS_FILE_FORMAT,
                  "ORC"),
              ResultKind.SUCCESS);
          TestUtils.assertTableResult(
              sql("INSERT INTO %s VALUES (1)", tableName),
              ResultKind.SUCCESS_WITH_CONTENT,
              Row.of(-1L));
          Table tableWithStoredAs =
              catalog.asTableCatalog().loadTable(NameIdentifier.of(databaseName, tableName));
          // Serde, input format and output format must all come from the ORC storage format.
          Assertions.assertEquals(
              HiveStorageConstants.ORC_SERDE_CLASS,
              tableWithStoredAs.properties().get(HiveConstants.SERDE_LIB));
          Assertions.assertEquals(
              HiveStorageConstants.ORC_INPUT_FORMAT_CLASS,
              tableWithStoredAs.properties().get(HiveConstants.INPUT_FORMAT));
          Assertions.assertEquals(
              HiveStorageConstants.ORC_OUTPUT_FORMAT_CLASS,
              tableWithStoredAs.properties().get(HiveConstants.OUTPUT_FORMAT));
          // Interoperability: a native Flink Hive client must read the row back.
          assertHiveCatalogRead(databaseName, tableName, Row.of(1));
        },
        true);
  }
+
  /**
   * Verifies that a table created with no explicit format or serde picks up the defaults derived
   * from the catalog's HiveConf: input/output formats from the default file format's
   * {@link StorageFormatDescriptor}, and the serde resolved with the same fallback chain the
   * connector uses (descriptor serde, then RCFile default serde, then the HiveConf default
   * serde). Finally checks the table is readable by a native Flink Hive catalog.
   */
  @Test
  public void testDefaultFormatAndSerdeApplied() {
    String databaseName = "test_hive_default_format_db";
    String tableName = "test_default_format_and_serde";

    doWithSchema(
        currentCatalog(),
        databaseName,
        catalog -> {
          // Grab the HiveConf that the Gravitino Hive catalog is actually running with, so the
          // expected defaults below come from the same configuration the connector saw.
          Optional<Catalog> flinkCatalog = tableEnv.getCatalog(catalog.name());
          Assertions.assertTrue(flinkCatalog.isPresent());
          HiveConf hiveConf = ((GravitinoHiveCatalog) flinkCatalog.get()).getHiveConf();
          // No WITH options: everything must come from defaults.
          TestUtils.assertTableResult(
              sql("CREATE TABLE %s (id INT)", tableName), ResultKind.SUCCESS);
          TestUtils.assertTableResult(
              sql("INSERT INTO %s VALUES (1)", tableName),
              ResultKind.SUCCESS_WITH_CONTENT,
              Row.of(-1L));
          Table tableWithDefaults =
              catalog.asTableCatalog().loadTable(NameIdentifier.of(databaseName, tableName));
          // Resolve the expected defaults from HiveConf's default file format descriptor.
          StorageFormatFactory storageFormatFactory = new StorageFormatFactory();
          String defaultFormat = hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT);
          StorageFormatDescriptor descriptor = storageFormatFactory.get(defaultFormat);
          Assertions.assertNotNull(descriptor);
          Assertions.assertEquals(
              descriptor.getInputFormat(),
              tableWithDefaults.properties().get(HiveConstants.INPUT_FORMAT));
          Assertions.assertEquals(
              descriptor.getOutputFormat(),
              tableWithDefaults.properties().get(HiveConstants.OUTPUT_FORMAT));
          // Serde fallback chain, mirroring the connector: descriptor serde -> RCFile default
          // serde (RCFile descriptors report no serde) -> HiveConf default serde.
          String expectedSerde = descriptor.getSerde();
          if (expectedSerde == null && descriptor instanceof RCFileStorageFormatDescriptor) {
            expectedSerde = hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTRCFILESERDE);
          }
          if (expectedSerde == null) {
            expectedSerde = hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTSERDE);
          }
          Assertions.assertEquals(
              expectedSerde, tableWithDefaults.properties().get(HiveConstants.SERDE_LIB));
          // Interoperability: a native Flink Hive client must read the row back.
          assertHiveCatalogRead(databaseName, tableName, Row.of(1));
        },
        true);
  }
+
+ private void assertHiveCatalogRead(String databaseName, String tableName,
Row expectedRow) {
+ EnvironmentSettings settings =
EnvironmentSettings.newInstance().inBatchMode().build();
+ TableEnvironment hiveEnv = TableEnvironment.create(settings);
+ try {
+ TestUtils.assertTableResult(
+ hiveEnv.executeSql(
+ String.format(
+ "CREATE CATALOG hive_it WITH ('type'='hive',
'hive-conf-dir'='%s')",
+ hiveConfDir)),
+ ResultKind.SUCCESS);
+ TestUtils.assertTableResult(hiveEnv.executeSql("USE CATALOG hive_it"),
ResultKind.SUCCESS);
+ TestUtils.assertTableResult(hiveEnv.executeSql("USE " + databaseName),
ResultKind.SUCCESS);
+ TableResult result = hiveEnv.executeSql("SELECT * FROM " + tableName);
+ List<Row> rows = Lists.newArrayList(result.collect());
+ Assertions.assertEquals(1, rows.size());
+ Assertions.assertEquals(expectedRow, rows.get(0));
+ } finally {
+ ((TableEnvironmentImpl) hiveEnv).getCatalogManager().close();
+ }
+ }
+
@Test
public void testGetHiveTable() {
Column[] columns =
diff --git
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveKerberosClientIT.java
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveKerberosClientIT.java
index e87f60af1b..88235e208a 100644
---
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveKerberosClientIT.java
+++
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveKerberosClientIT.java
@@ -39,7 +39,7 @@ import
org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;
import org.apache.gravitino.Configs;
import org.apache.gravitino.auth.AuthenticatorType;
import org.apache.gravitino.catalog.hive.HiveConstants;
-import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.CatalogPropertiesConverter;
import org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalog;
import
org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactoryOptions;
import org.apache.gravitino.flink.connector.integration.test.FlinkEnvIT;
@@ -190,7 +190,7 @@ public class FlinkHiveKerberosClientIT extends FlinkEnvIT {
// Verify Flink-specific properties are stored correctly
Map<String, String> flinkProperties =
properties.entrySet().stream()
- .filter(e ->
e.getKey().startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX))
+ .filter(e ->
e.getKey().startsWith(CatalogPropertiesConverter.FLINK_PROPERTY_PREFIX))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Assertions.assertEquals(2, flinkProperties.size());
Assertions.assertEquals(
diff --git
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/paimon/TestPaimonPropertiesConverter.java
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/paimon/TestPaimonPropertiesConverter.java
index 4496d94c0a..7ead6999a4 100644
---
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/paimon/TestPaimonPropertiesConverter.java
+++
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/paimon/TestPaimonPropertiesConverter.java
@@ -22,7 +22,7 @@ import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants;
-import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.CatalogPropertiesConverter;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -58,7 +58,7 @@ public class TestPaimonPropertiesConverter {
testUser,
PaimonConstants.GRAVITINO_JDBC_PASSWORD,
testPassword,
- PropertiesConverter.FLINK_PROPERTY_PREFIX + PaimonConstants.URI,
+ CatalogPropertiesConverter.FLINK_PROPERTY_PREFIX +
PaimonConstants.URI,
testUri);
Map<String, String> flinkCatalogProperties =
CONVERTER.toFlinkCatalogProperties(catalogProperties);