This is an automated email from the ASF dual-hosted git repository.
fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 792ded1162 [#6302] refactor:Optimize Flink connector properties
converter (#6303)
792ded1162 is described below
commit 792ded11624e53076b6a6b81a3356a35abbceb8b
Author: Xiaojian Sun <[email protected]>
AuthorDate: Fri Jan 24 08:30:11 2025 +0800
[#6302] refactor:Optimize Flink connector properties converter (#6303)
### What changes were proposed in this pull request?
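- Unify the catalog property format: for every connector, including
Iceberg, properties without a Gravitino equivalent are stored with the
`flink.bypass.` prefix.
- Move the `flink.bypass.` prefix handling into PropertiesConverter, so
that connector-specific converters only need to supply the key mappings.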
### Why are the changes needed?
Fix: [#6302](https://github.com/apache/gravitino/issues/6302)
### Does this PR introduce _any_ user-facing change?
N/A
### How was this patch tested?
N/A
---
.../lakehouse/iceberg/IcebergPropertiesUtils.java | 9 ++
.../flink/connector/PropertiesConverter.java | 83 ++++++++++++++++--
.../connector/hive/HivePropertiesConverter.java | 43 ++--------
.../iceberg/IcebergPropertiesConverter.java | 47 +++-------
.../paimon/PaimonPropertiesConverter.java | 56 +++---------
.../iceberg/TestIcebergPropertiesConverter.java | 99 ++++++++++++++++------
.../test/iceberg/FlinkIcebergCatalogIT.java | 4 -
.../test/paimon/FlinkPaimonCatalogIT.java | 2 +-
8 files changed, 197 insertions(+), 146 deletions(-)
diff --git
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergPropertiesUtils.java
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergPropertiesUtils.java
index df1340c947..92c5d18a12 100644
---
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergPropertiesUtils.java
+++
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergPropertiesUtils.java
@@ -33,6 +33,7 @@ public class IcebergPropertiesUtils {
// will only need to set the configuration 'catalog-backend' in Gravitino
and Gravitino will
// change it to `catalogType` automatically and pass it to Iceberg.
public static final Map<String, String> GRAVITINO_CONFIG_TO_ICEBERG;
+ public static final Map<String, String> ICEBERG_CATALOG_CONFIG_TO_GRAVITINO;
static {
Map<String, String> map = new HashMap();
@@ -65,6 +66,14 @@ public class IcebergPropertiesUtils {
AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_KEY,
IcebergConstants.ICEBERG_ADLS_STORAGE_ACCOUNT_KEY);
GRAVITINO_CONFIG_TO_ICEBERG = Collections.unmodifiableMap(map);
+
+ Map<String, String> icebergCatalogConfigToGravitino = new HashMap<>();
+ map.forEach(
+ (key, value) -> {
+ icebergCatalogConfigToGravitino.put(value, key);
+ });
+ ICEBERG_CATALOG_CONFIG_TO_GRAVITINO =
+ Collections.unmodifiableMap(icebergCatalogConfigToGravitino);
}
/**
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java
index c9fbb8a491..15d1a12fa3 100644
---
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java
@@ -19,8 +19,10 @@
package org.apache.gravitino.flink.connector;
+import com.google.common.collect.Maps;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
/**
* PropertiesConverter is used to convert properties between Flink properties
and Apache Gravitino
@@ -32,25 +34,82 @@ public interface PropertiesConverter {
/**
* Converts properties from application provided properties and Flink
connector properties to
- * Gravitino properties.
+ * Gravitino properties. This method processes the Flink configuration and
transforms it into a
+ * format suitable for the Gravitino catalog.
*
- * @param flinkConf The configuration provided by Flink.
- * @return properties for the Gravitino catalog.
+ * @param flinkConf The Flink configuration containing connector properties.
This includes both
+ * Flink-specific properties and any user-provided properties.
+ * @return A map of properties converted for use in the Gravitino catalog.
The returned map
+ * includes both directly transformed properties and bypass properties
prefixed with {@link
+ * #FLINK_PROPERTY_PREFIX}.
*/
default Map<String, String> toGravitinoCatalogProperties(Configuration
flinkConf) {
- return flinkConf.toMap();
+ Map<String, String> gravitinoProperties = Maps.newHashMap();
+ for (Map.Entry<String, String> entry : flinkConf.toMap().entrySet()) {
+ String gravitinoKey =
transformPropertyToGravitinoCatalog(entry.getKey());
+ if (gravitinoKey != null) {
+ gravitinoProperties.put(gravitinoKey, entry.getValue());
+ } else if (!entry.getKey().startsWith(FLINK_PROPERTY_PREFIX)) {
+ gravitinoProperties.put(FLINK_PROPERTY_PREFIX + entry.getKey(),
entry.getValue());
+ } else {
+ gravitinoProperties.put(entry.getKey(), entry.getValue());
+ }
+ }
+ return gravitinoProperties;
}
/**
- * Converts properties from Gravitino properties to Flink connector
properties.
+ * Converts properties from Gravitino catalog properties to Flink connector
properties. This
+ * method processes the Gravitino properties and transforms them into a
format suitable for the
+ * Flink connector.
*
- * @param gravitinoProperties The properties provided by Gravitino.
- * @return properties for the Flink connector.
+ * @param gravitinoProperties The properties provided by the Gravitino
catalog. This includes both
+ * Gravitino-specific properties and any bypass properties prefixed with
{@link
+ * #FLINK_PROPERTY_PREFIX}.
+ * @return A map of properties converted for use in the Flink connector. The
returned map includes
+ * both transformed properties and the Flink catalog type.
*/
default Map<String, String> toFlinkCatalogProperties(Map<String, String>
gravitinoProperties) {
- return gravitinoProperties;
+ Map<String, String> allProperties = Maps.newHashMap();
+ gravitinoProperties.forEach(
+ (key, value) -> {
+ String flinkConfigKey = key;
+ if (key.startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX)) {
+ flinkConfigKey =
key.substring(PropertiesConverter.FLINK_PROPERTY_PREFIX.length());
+ allProperties.put(flinkConfigKey, value);
+ } else {
+ String convertedKey =
transformPropertyToFlinkCatalog(flinkConfigKey);
+ if (convertedKey != null) {
+ allProperties.put(convertedKey, value);
+ }
+ }
+ });
+ allProperties.put(CommonCatalogOptions.CATALOG_TYPE.key(),
getFlinkCatalogType());
+ return allProperties;
}
+ /**
+ * Transforms a Flink configuration key to a corresponding Gravitino catalog
property key. This
+ * method is used to map Flink-specific configuration keys to Gravitino
catalog properties.
+ *
+ * @param configKey The Flink configuration key to be transformed.
+ * @return The corresponding Gravitino catalog property key, or {@code null}
if no transformation
+ * is needed.
+ */
+ String transformPropertyToGravitinoCatalog(String configKey);
+
+ /**
+ * Transforms a specific configuration key from Gravitino catalog properties
to Flink connector
+ * properties. This method is used to convert a property key that is
specific to Gravitino into a
+ * format that can be understood by the Flink connector.
+ *
+ * @param configKey The configuration key from Gravitino catalog properties
to be transformed.
+ * @return The transformed configuration key that is compatible with the
Flink connector, or
+ * {@code null} if the key has no Flink counterpart; {@link
+ * #toFlinkCatalogProperties(Map)} skips keys that map to {@code null}.
+ */
+ String transformPropertyToFlinkCatalog(String configKey);
+
/**
* Converts properties from Flink connector schema properties to Gravitino
schema properties.
*
@@ -90,4 +149,12 @@ public interface PropertiesConverter {
default Map<String, String> toGravitinoTableProperties(Map<String, String>
flinkProperties) {
return flinkProperties;
}
+
+ /**
+ * Retrieves the Flink catalog type associated with this converter. This
method is used to
+ * determine the type of Flink catalog that this converter is designed for.
+ *
+ * @return The Flink catalog type.
+ */
+ String getFlinkCatalogType();
}
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java
index 1809586749..20a3e8cf62 100644
---
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java
@@ -20,11 +20,8 @@
package org.apache.gravitino.flink.connector.hive;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
import java.util.Map;
import java.util.stream.Collectors;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.gravitino.catalog.hive.HiveConstants;
import org.apache.gravitino.flink.connector.PropertiesConverter;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -34,46 +31,19 @@ public class HivePropertiesConverter implements
PropertiesConverter {
private HivePropertiesConverter() {}
public static final HivePropertiesConverter INSTANCE = new
HivePropertiesConverter();
-
private static final Map<String, String> HIVE_CATALOG_CONFIG_TO_GRAVITINO =
ImmutableMap.of(HiveConf.ConfVars.METASTOREURIS.varname,
HiveConstants.METASTORE_URIS);
private static final Map<String, String> GRAVITINO_CONFIG_TO_HIVE =
ImmutableMap.of(HiveConstants.METASTORE_URIS,
HiveConf.ConfVars.METASTOREURIS.varname);
@Override
- public Map<String, String> toGravitinoCatalogProperties(Configuration
flinkConf) {
- Map<String, String> gravitinoProperties = Maps.newHashMap();
-
- for (Map.Entry<String, String> entry : flinkConf.toMap().entrySet()) {
- String gravitinoKey =
HIVE_CATALOG_CONFIG_TO_GRAVITINO.get(entry.getKey());
- if (gravitinoKey != null) {
- gravitinoProperties.put(gravitinoKey, entry.getValue());
- } else if (!entry.getKey().startsWith(FLINK_PROPERTY_PREFIX)) {
- gravitinoProperties.put(FLINK_PROPERTY_PREFIX + entry.getKey(),
entry.getValue());
- } else {
- gravitinoProperties.put(entry.getKey(), entry.getValue());
- }
- }
-
- return gravitinoProperties;
+ public String transformPropertyToGravitinoCatalog(String configKey) {
+ return HIVE_CATALOG_CONFIG_TO_GRAVITINO.get(configKey);
}
@Override
- public Map<String, String> toFlinkCatalogProperties(Map<String, String>
gravitinoProperties) {
- Map<String, String> flinkCatalogProperties = Maps.newHashMap();
- flinkCatalogProperties.put(
- CommonCatalogOptions.CATALOG_TYPE.key(),
GravitinoHiveCatalogFactoryOptions.IDENTIFIER);
-
- gravitinoProperties.forEach(
- (key, value) -> {
- String flinkConfigKey = key;
- if (key.startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX)) {
- flinkConfigKey =
key.substring(PropertiesConverter.FLINK_PROPERTY_PREFIX.length());
- }
- flinkCatalogProperties.put(
- GRAVITINO_CONFIG_TO_HIVE.getOrDefault(flinkConfigKey,
flinkConfigKey), value);
- });
- return flinkCatalogProperties;
+ public String transformPropertyToFlinkCatalog(String configKey) {
+ return GRAVITINO_CONFIG_TO_HIVE.get(configKey);
}
@Override
@@ -95,4 +65,9 @@ public class HivePropertiesConverter implements
PropertiesConverter {
properties.put("connector", "hive");
return properties;
}
+
+ @Override
+ public String getFlinkCatalogType() {
+ return GravitinoHiveCatalogFactoryOptions.IDENTIFIER;
+ }
}
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java
index 7684d3eadb..1d80e27ea5 100644
---
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java
@@ -19,11 +19,9 @@
package org.apache.gravitino.flink.connector.iceberg;
-import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.Map;
-import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergPropertiesUtils;
import org.apache.gravitino.flink.connector.PropertiesConverter;
@@ -38,38 +36,21 @@ public class IcebergPropertiesConverter implements
PropertiesConverter {
IcebergConstants.CATALOG_BACKEND,
IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE);
@Override
- public Map<String, String> toFlinkCatalogProperties(Map<String, String>
gravitinoProperties) {
- Preconditions.checkArgument(
- gravitinoProperties != null, "Iceberg Catalog properties should not be
null.");
+ public String transformPropertyToGravitinoCatalog(String configKey) {
+ return
IcebergPropertiesUtils.ICEBERG_CATALOG_CONFIG_TO_GRAVITINO.get(configKey);
+ }
- Map<String, String> all = new HashMap<>();
- if (gravitinoProperties != null) {
- gravitinoProperties.forEach(
- (k, v) -> {
- if (k.startsWith(FLINK_PROPERTY_PREFIX)) {
- String newKey = k.substring(FLINK_PROPERTY_PREFIX.length());
- all.put(newKey, v);
- }
- });
- }
- Map<String, String> transformedProperties =
- IcebergPropertiesUtils.toIcebergCatalogProperties(gravitinoProperties);
+ @Override
+ public String transformPropertyToFlinkCatalog(String configKey) {
- if (transformedProperties != null) {
- all.putAll(transformedProperties);
+ String icebergConfigKey = null;
+ if
(IcebergPropertiesUtils.GRAVITINO_CONFIG_TO_ICEBERG.containsKey(configKey)) {
+ icebergConfigKey =
IcebergPropertiesUtils.GRAVITINO_CONFIG_TO_ICEBERG.get(configKey);
}
- all.put(
- CommonCatalogOptions.CATALOG_TYPE.key(),
GravitinoIcebergCatalogFactoryOptions.IDENTIFIER);
- // Map "catalog-backend" to "catalog-type".
- // TODO If catalog backend is CUSTOM, we need special compatibility logic.
- GRAVITINO_CONFIG_TO_FLINK_ICEBERG.forEach(
- (key, value) -> {
- if (all.containsKey(key)) {
- String config = all.remove(key);
- all.put(value, config);
- }
- });
- return all;
+ if (GRAVITINO_CONFIG_TO_FLINK_ICEBERG.containsKey(configKey)) {
+ icebergConfigKey = GRAVITINO_CONFIG_TO_FLINK_ICEBERG.get(configKey);
+ }
+ return icebergConfigKey;
}
@Override
@@ -78,7 +59,7 @@ public class IcebergPropertiesConverter implements
PropertiesConverter {
}
@Override
- public Map<String, String> toFlinkTableProperties(Map<String, String>
properties) {
- return new HashMap<>(properties);
+ public String getFlinkCatalogType() {
+ return GravitinoIcebergCatalogFactoryOptions.IDENTIFIER;
}
}
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java
index 58613bee37..99e402bcb8 100644
---
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java
@@ -19,15 +19,9 @@
package org.apache.gravitino.flink.connector.paimon;
-import com.google.common.collect.Maps;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants;
import org.apache.gravitino.catalog.lakehouse.paimon.PaimonPropertiesUtils;
import org.apache.gravitino.flink.connector.PropertiesConverter;
-import org.apache.paimon.catalog.FileSystemCatalogFactory;
public class PaimonPropertiesConverter implements PropertiesConverter {
@@ -36,45 +30,23 @@ public class PaimonPropertiesConverter implements
PropertiesConverter {
private PaimonPropertiesConverter() {}
@Override
- public Map<String, String> toGravitinoCatalogProperties(Configuration
flinkConf) {
- Map<String, String> gravitinoProperties = Maps.newHashMap();
- Map<String, String> flinkConfMap = flinkConf.toMap();
- for (Map.Entry<String, String> entry : flinkConfMap.entrySet()) {
- String gravitinoKey =
-
PaimonPropertiesUtils.PAIMON_CATALOG_CONFIG_TO_GRAVITINO.get(entry.getKey());
- if (gravitinoKey != null) {
- gravitinoProperties.put(gravitinoKey, entry.getValue());
- } else if (!entry.getKey().startsWith(FLINK_PROPERTY_PREFIX)) {
- gravitinoProperties.put(FLINK_PROPERTY_PREFIX + entry.getKey(),
entry.getValue());
- } else {
- gravitinoProperties.put(entry.getKey(), entry.getValue());
- }
+ public String transformPropertyToGravitinoCatalog(String configKey) {
+ if (configKey.equalsIgnoreCase(PaimonConstants.METASTORE)) {
+ return PaimonConstants.CATALOG_BACKEND;
}
- gravitinoProperties.put(
- PaimonConstants.CATALOG_BACKEND,
- flinkConfMap.getOrDefault(PaimonConstants.METASTORE,
FileSystemCatalogFactory.IDENTIFIER));
- return gravitinoProperties;
+ return
PaimonPropertiesUtils.PAIMON_CATALOG_CONFIG_TO_GRAVITINO.get(configKey);
}
@Override
- public Map<String, String> toFlinkCatalogProperties(Map<String, String>
gravitinoProperties) {
- Map<String, String> all = new HashMap<>();
- gravitinoProperties.forEach(
- (key, value) -> {
- String flinkConfigKey = key;
- if (key.startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX)) {
- flinkConfigKey =
key.substring(PropertiesConverter.FLINK_PROPERTY_PREFIX.length());
- }
- all.put(flinkConfigKey, value);
- });
- Map<String, String> paimonCatalogProperties =
- PaimonPropertiesUtils.toPaimonCatalogProperties(all);
- paimonCatalogProperties.put(
- PaimonConstants.METASTORE,
- paimonCatalogProperties.getOrDefault(
- PaimonConstants.CATALOG_BACKEND,
FileSystemCatalogFactory.IDENTIFIER));
- paimonCatalogProperties.put(
- CommonCatalogOptions.CATALOG_TYPE.key(),
GravitinoPaimonCatalogFactoryOptions.IDENTIFIER);
- return paimonCatalogProperties;
+ public String transformPropertyToFlinkCatalog(String configKey) {
+ if (configKey.equals(PaimonConstants.CATALOG_BACKEND)) {
+ return PaimonConstants.METASTORE;
+ }
+ return PaimonPropertiesUtils.GRAVITINO_CONFIG_TO_PAIMON.get(configKey);
+ }
+
+ @Override
+ public String getFlinkCatalogType() {
+ return GravitinoPaimonCatalogFactoryOptions.IDENTIFIER;
}
}
diff --git
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/iceberg/TestIcebergPropertiesConverter.java
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/iceberg/TestIcebergPropertiesConverter.java
index d6de522f39..8287eebf25 100644
---
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/iceberg/TestIcebergPropertiesConverter.java
+++
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/iceberg/TestIcebergPropertiesConverter.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.flink.connector.iceberg;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -31,16 +32,37 @@ public class TestIcebergPropertiesConverter {
@Test
void testCatalogPropertiesWithHiveBackend() {
Map<String, String> properties =
- CONVERTER.toFlinkCatalogProperties(
- ImmutableMap.of(
- IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
-
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE,
- IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI,
- "hive-uri",
- IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE,
- "hive-warehouse",
- "key1",
- "value1"));
+ CONVERTER.toGravitinoCatalogProperties(
+ Configuration.fromMap(
+ ImmutableMap.of(
+
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
+
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE,
+ IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI,
+ "hive-uri",
+
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE,
+ "hive-warehouse",
+ "key1",
+ "value1",
+ "flink.bypass.key2",
+ "value2")));
+
+ Map<String, String> actual =
+ ImmutableMap.of(
+ IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
+ IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE,
+ IcebergPropertiesConstants.ICEBERG_CATALOG_URI,
+ "hive-uri",
+ IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE,
+ "hive-warehouse",
+ "flink.bypass.key1", // Automatically add 'flink.bypass.'
+ "value1",
+ "flink.bypass.key2",
+ "value2");
+
+ Assertions.assertEquals(actual, properties);
+
+ Map<String, String> toFlinkProperties =
CONVERTER.toFlinkCatalogProperties(actual);
+
Assertions.assertEquals(
ImmutableMap.of(
CommonCatalogOptions.CATALOG_TYPE.key(),
@@ -50,23 +72,48 @@ public class TestIcebergPropertiesConverter {
IcebergPropertiesConstants.ICEBERG_CATALOG_URI,
"hive-uri",
IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE,
- "hive-warehouse"),
- properties);
+ "hive-warehouse",
+ "key1", // When returning to Flink, prefix 'flink.bypass.'
Automatically removed.
+ "value1",
+ "key2", // When returning to Flink, prefix 'flink.bypass.'
Automatically removed.
+ "value2"),
+ toFlinkProperties);
}
@Test
void testCatalogPropertiesWithRestBackend() {
Map<String, String> properties =
- CONVERTER.toFlinkCatalogProperties(
- ImmutableMap.of(
- IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
- IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_REST,
- IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI,
- "rest-uri",
- IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE,
- "rest-warehouse",
- "key1",
- "value1"));
+ CONVERTER.toGravitinoCatalogProperties(
+ Configuration.fromMap(
+ ImmutableMap.of(
+
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
+ IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_REST,
+ IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI,
+ "rest-uri",
+
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE,
+ "rest-warehouse",
+ "key1",
+ "value1",
+ "flink.bypass.key2",
+ "value2")));
+
+ Map<String, String> actual =
+ ImmutableMap.of(
+ IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
+ IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_REST,
+ IcebergPropertiesConstants.ICEBERG_CATALOG_URI,
+ "rest-uri",
+ IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE,
+ "rest-warehouse",
+ "flink.bypass.key1", // Automatically add 'flink.bypass.'
+ "value1",
+ "flink.bypass.key2",
+ "value2");
+
+ Assertions.assertEquals(actual, properties);
+
+ Map<String, String> toFlinkProperties =
CONVERTER.toFlinkCatalogProperties(actual);
+
Assertions.assertEquals(
ImmutableMap.of(
CommonCatalogOptions.CATALOG_TYPE.key(),
@@ -76,7 +123,11 @@ public class TestIcebergPropertiesConverter {
IcebergPropertiesConstants.ICEBERG_CATALOG_URI,
"rest-uri",
IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE,
- "rest-warehouse"),
- properties);
+ "rest-warehouse",
+ "key1", // When returning to Flink, prefix 'flink.bypass.'
Automatically removed.
+ "value1",
+ "key2", // When returning to Flink, prefix 'flink.bypass.'
Automatically removed.
+ "value2"),
+ toFlinkProperties);
}
}
diff --git
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java
index 0834def90b..cedf0e8d59 100644
---
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java
+++
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java
@@ -157,10 +157,6 @@ public abstract class FlinkIcebergCatalogIT extends
FlinkCommonIT {
Map<String, String> properties = gravitinoCatalog.properties();
Assertions.assertEquals(hiveMetastoreUri,
properties.get(IcebergConstants.URI));
- Assertions.assertEquals(
- GravitinoIcebergCatalogFactoryOptions.IDENTIFIER,
- properties.get(CommonCatalogOptions.CATALOG_TYPE.key()));
-
// Get the created catalog.
Optional<org.apache.flink.table.catalog.Catalog> catalog =
tableEnv.getCatalog(catalogName);
Assertions.assertTrue(catalog.isPresent());
diff --git
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java
index a03b4a198e..66458ba8e7 100644
---
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java
+++
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java
@@ -98,7 +98,7 @@ public class FlinkPaimonCatalogIT extends FlinkCommonIT {
"create catalog %s with ("
+ "'type'='gravitino-paimon', "
+ "'warehouse'='%s',"
- + "'catalog.backend'='filesystem'"
+ + "'metastore'='filesystem'"
+ ")",
catalogName, warehouse));
String[] catalogs = tableEnv.listCatalogs();