This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 792ded1162 [#6302] refactor:Optimize Flink connector properties 
converter  (#6303)
792ded1162 is described below

commit 792ded11624e53076b6a6b81a3356a35abbceb8b
Author: Xiaojian Sun <[email protected]>
AuthorDate: Fri Jan 24 08:30:11 2025 +0800

    [#6302] refactor:Optimize Flink connector properties converter  (#6303)
    
    ### What changes were proposed in this pull request?
    
    - Unify the parameter format by adding the `flink.bypass.` prefix,
    especially for Iceberg.
    - Extract the `flink.bypass.` handling into PropertiesConverter, so that
    the connector-specific converters no longer need to implement it.
    
    ### Why are the changes needed?
    
    Fix: [#6302](https://github.com/apache/gravitino/issues/6302)
    
    ### Does this PR introduce _any_ user-facing change?
    
    N/A
    
    ### How was this patch tested?
    
    N/A
---
 .../lakehouse/iceberg/IcebergPropertiesUtils.java  |  9 ++
 .../flink/connector/PropertiesConverter.java       | 83 ++++++++++++++++--
 .../connector/hive/HivePropertiesConverter.java    | 43 ++--------
 .../iceberg/IcebergPropertiesConverter.java        | 47 +++-------
 .../paimon/PaimonPropertiesConverter.java          | 56 +++---------
 .../iceberg/TestIcebergPropertiesConverter.java    | 99 ++++++++++++++++------
 .../test/iceberg/FlinkIcebergCatalogIT.java        |  4 -
 .../test/paimon/FlinkPaimonCatalogIT.java          |  2 +-
 8 files changed, 197 insertions(+), 146 deletions(-)

diff --git 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergPropertiesUtils.java
 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergPropertiesUtils.java
index df1340c947..92c5d18a12 100644
--- 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergPropertiesUtils.java
+++ 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergPropertiesUtils.java
@@ -33,6 +33,7 @@ public class IcebergPropertiesUtils {
   // will only need to set the configuration 'catalog-backend' in Gravitino 
and Gravitino will
   // change it to `catalogType` automatically and pass it to Iceberg.
   public static final Map<String, String> GRAVITINO_CONFIG_TO_ICEBERG;
+  public static final Map<String, String> ICEBERG_CATALOG_CONFIG_TO_GRAVITINO;
 
   static {
     Map<String, String> map = new HashMap();
@@ -65,6 +66,14 @@ public class IcebergPropertiesUtils {
         AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_KEY,
         IcebergConstants.ICEBERG_ADLS_STORAGE_ACCOUNT_KEY);
     GRAVITINO_CONFIG_TO_ICEBERG = Collections.unmodifiableMap(map);
+
+    Map<String, String> icebergCatalogConfigToGravitino = new HashMap<>();
+    map.forEach(
+        (key, value) -> {
+          icebergCatalogConfigToGravitino.put(value, key);
+        });
+    ICEBERG_CATALOG_CONFIG_TO_GRAVITINO =
+        Collections.unmodifiableMap(icebergCatalogConfigToGravitino);
   }
 
   /**
diff --git 
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java
 
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java
index c9fbb8a491..15d1a12fa3 100644
--- 
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java
+++ 
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java
@@ -19,8 +19,10 @@
 
 package org.apache.gravitino.flink.connector;
 
+import com.google.common.collect.Maps;
 import java.util.Map;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
 
 /**
  * PropertiesConverter is used to convert properties between Flink properties 
and Apache Gravitino
@@ -32,25 +34,82 @@ public interface PropertiesConverter {
 
   /**
    * Converts properties from application provided properties and Flink 
connector properties to
-   * Gravitino properties.
+   * Gravitino properties.This method processes the Flink configuration and 
transforms it into a
+   * format suitable for the Gravitino catalog.
    *
-   * @param flinkConf The configuration provided by Flink.
-   * @return properties for the Gravitino catalog.
+   * @param flinkConf The Flink configuration containing connector properties. 
This includes both
+   *     Flink-specific properties and any user-provided properties.
+   * @return A map of properties converted for use in the Gravitino catalog. 
The returned map
+   *     includes both directly transformed properties and bypass properties 
prefixed with {@link
+   *     #FLINK_PROPERTY_PREFIX}.
    */
   default Map<String, String> toGravitinoCatalogProperties(Configuration 
flinkConf) {
-    return flinkConf.toMap();
+    Map<String, String> gravitinoProperties = Maps.newHashMap();
+    for (Map.Entry<String, String> entry : flinkConf.toMap().entrySet()) {
+      String gravitinoKey = 
transformPropertyToGravitinoCatalog(entry.getKey());
+      if (gravitinoKey != null) {
+        gravitinoProperties.put(gravitinoKey, entry.getValue());
+      } else if (!entry.getKey().startsWith(FLINK_PROPERTY_PREFIX)) {
+        gravitinoProperties.put(FLINK_PROPERTY_PREFIX + entry.getKey(), 
entry.getValue());
+      } else {
+        gravitinoProperties.put(entry.getKey(), entry.getValue());
+      }
+    }
+    return gravitinoProperties;
   }
 
   /**
-   * Converts properties from Gravitino properties to Flink connector 
properties.
+   * Converts properties from Gravitino catalog properties to Flink connector 
properties. This
+   * method processes the Gravitino properties and transforms them into a 
format suitable for the
+   * Flink connector.
    *
-   * @param gravitinoProperties The properties provided by Gravitino.
-   * @return properties for the Flink connector.
+   * @param gravitinoProperties The properties provided by the Gravitino 
catalog. This includes both
+   *     Gravitino-specific properties and any bypass properties prefixed with 
{@link
+   *     #FLINK_PROPERTY_PREFIX}.
+   * @return A map of properties converted for use in the Flink connector. The 
returned map includes
+   *     both transformed properties and the Flink catalog type.
    */
   default Map<String, String> toFlinkCatalogProperties(Map<String, String> 
gravitinoProperties) {
-    return gravitinoProperties;
+    Map<String, String> allProperties = Maps.newHashMap();
+    gravitinoProperties.forEach(
+        (key, value) -> {
+          String flinkConfigKey = key;
+          if (key.startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX)) {
+            flinkConfigKey = 
key.substring(PropertiesConverter.FLINK_PROPERTY_PREFIX.length());
+            allProperties.put(flinkConfigKey, value);
+          } else {
+            String convertedKey = 
transformPropertyToFlinkCatalog(flinkConfigKey);
+            if (convertedKey != null) {
+              allProperties.put(convertedKey, value);
+            }
+          }
+        });
+    allProperties.put(CommonCatalogOptions.CATALOG_TYPE.key(), 
getFlinkCatalogType());
+    return allProperties;
   }
 
+  /**
+   * Transforms a Flink configuration key to a corresponding Gravitino catalog 
property key. This
+   * method is used to map Flink-specific configuration keys to Gravitino 
catalog properties.
+   *
+   * @param configKey The Flink configuration key to be transformed.
+   * @return The corresponding Gravitino catalog property key, or {@code null} 
if no transformation
+   *     is needed.
+   */
+  String transformPropertyToGravitinoCatalog(String configKey);
+
+  /**
+   * Transforms a specific configuration key from Gravitino catalog properties 
to Flink connector
+   * properties. This method is used to convert a property key that is 
specific to Gravitino into a
+   * format that can be understood by the Flink connector.
+   *
+   * @param configKey The configuration key from Gravitino catalog properties 
to be transformed.
+   * @return The transformed configuration key that is compatible with the 
Flink connector.
+   * @throws IllegalArgumentException If the provided configuration key cannot 
be transformed or is
+   *     invalid.
+   */
+  String transformPropertyToFlinkCatalog(String configKey);
+
   /**
    * Converts properties from Flink connector schema properties to Gravitino 
schema properties.
    *
@@ -90,4 +149,12 @@ public interface PropertiesConverter {
   default Map<String, String> toGravitinoTableProperties(Map<String, String> 
flinkProperties) {
     return flinkProperties;
   }
+
+  /**
+   * Retrieves the Flink catalog type associated with this converter. This 
method is used to
+   * determine the type of Flink catalog that this converter is designed for.
+   *
+   * @return The Flink catalog type.
+   */
+  String getFlinkCatalogType();
 }
diff --git 
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java
 
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java
index 1809586749..20a3e8cf62 100644
--- 
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java
+++ 
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java
@@ -20,11 +20,8 @@
 package org.apache.gravitino.flink.connector.hive;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
 import java.util.Map;
 import java.util.stream.Collectors;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.catalog.CommonCatalogOptions;
 import org.apache.gravitino.catalog.hive.HiveConstants;
 import org.apache.gravitino.flink.connector.PropertiesConverter;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -34,46 +31,19 @@ public class HivePropertiesConverter implements 
PropertiesConverter {
   private HivePropertiesConverter() {}
 
   public static final HivePropertiesConverter INSTANCE = new 
HivePropertiesConverter();
-
   private static final Map<String, String> HIVE_CATALOG_CONFIG_TO_GRAVITINO =
       ImmutableMap.of(HiveConf.ConfVars.METASTOREURIS.varname, 
HiveConstants.METASTORE_URIS);
   private static final Map<String, String> GRAVITINO_CONFIG_TO_HIVE =
       ImmutableMap.of(HiveConstants.METASTORE_URIS, 
HiveConf.ConfVars.METASTOREURIS.varname);
 
   @Override
-  public Map<String, String> toGravitinoCatalogProperties(Configuration 
flinkConf) {
-    Map<String, String> gravitinoProperties = Maps.newHashMap();
-
-    for (Map.Entry<String, String> entry : flinkConf.toMap().entrySet()) {
-      String gravitinoKey = 
HIVE_CATALOG_CONFIG_TO_GRAVITINO.get(entry.getKey());
-      if (gravitinoKey != null) {
-        gravitinoProperties.put(gravitinoKey, entry.getValue());
-      } else if (!entry.getKey().startsWith(FLINK_PROPERTY_PREFIX)) {
-        gravitinoProperties.put(FLINK_PROPERTY_PREFIX + entry.getKey(), 
entry.getValue());
-      } else {
-        gravitinoProperties.put(entry.getKey(), entry.getValue());
-      }
-    }
-
-    return gravitinoProperties;
+  public String transformPropertyToGravitinoCatalog(String configKey) {
+    return HIVE_CATALOG_CONFIG_TO_GRAVITINO.get(configKey);
   }
 
   @Override
-  public Map<String, String> toFlinkCatalogProperties(Map<String, String> 
gravitinoProperties) {
-    Map<String, String> flinkCatalogProperties = Maps.newHashMap();
-    flinkCatalogProperties.put(
-        CommonCatalogOptions.CATALOG_TYPE.key(), 
GravitinoHiveCatalogFactoryOptions.IDENTIFIER);
-
-    gravitinoProperties.forEach(
-        (key, value) -> {
-          String flinkConfigKey = key;
-          if (key.startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX)) {
-            flinkConfigKey = 
key.substring(PropertiesConverter.FLINK_PROPERTY_PREFIX.length());
-          }
-          flinkCatalogProperties.put(
-              GRAVITINO_CONFIG_TO_HIVE.getOrDefault(flinkConfigKey, 
flinkConfigKey), value);
-        });
-    return flinkCatalogProperties;
+  public String transformPropertyToFlinkCatalog(String configKey) {
+    return GRAVITINO_CONFIG_TO_HIVE.get(configKey);
   }
 
   @Override
@@ -95,4 +65,9 @@ public class HivePropertiesConverter implements 
PropertiesConverter {
     properties.put("connector", "hive");
     return properties;
   }
+
+  @Override
+  public String getFlinkCatalogType() {
+    return GravitinoHiveCatalogFactoryOptions.IDENTIFIER;
+  }
 }
diff --git 
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java
 
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java
index 7684d3eadb..1d80e27ea5 100644
--- 
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java
+++ 
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java
@@ -19,11 +19,9 @@
 
 package org.apache.gravitino.flink.connector.iceberg;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.flink.table.catalog.CommonCatalogOptions;
 import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
 import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergPropertiesUtils;
 import org.apache.gravitino.flink.connector.PropertiesConverter;
@@ -38,38 +36,21 @@ public class IcebergPropertiesConverter implements 
PropertiesConverter {
           IcebergConstants.CATALOG_BACKEND, 
IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE);
 
   @Override
-  public Map<String, String> toFlinkCatalogProperties(Map<String, String> 
gravitinoProperties) {
-    Preconditions.checkArgument(
-        gravitinoProperties != null, "Iceberg Catalog properties should not be 
null.");
+  public String transformPropertyToGravitinoCatalog(String configKey) {
+    return 
IcebergPropertiesUtils.ICEBERG_CATALOG_CONFIG_TO_GRAVITINO.get(configKey);
+  }
 
-    Map<String, String> all = new HashMap<>();
-    if (gravitinoProperties != null) {
-      gravitinoProperties.forEach(
-          (k, v) -> {
-            if (k.startsWith(FLINK_PROPERTY_PREFIX)) {
-              String newKey = k.substring(FLINK_PROPERTY_PREFIX.length());
-              all.put(newKey, v);
-            }
-          });
-    }
-    Map<String, String> transformedProperties =
-        IcebergPropertiesUtils.toIcebergCatalogProperties(gravitinoProperties);
+  @Override
+  public String transformPropertyToFlinkCatalog(String configKey) {
 
-    if (transformedProperties != null) {
-      all.putAll(transformedProperties);
+    String icebergConfigKey = null;
+    if 
(IcebergPropertiesUtils.GRAVITINO_CONFIG_TO_ICEBERG.containsKey(configKey)) {
+      icebergConfigKey = 
IcebergPropertiesUtils.GRAVITINO_CONFIG_TO_ICEBERG.get(configKey);
     }
-    all.put(
-        CommonCatalogOptions.CATALOG_TYPE.key(), 
GravitinoIcebergCatalogFactoryOptions.IDENTIFIER);
-    // Map "catalog-backend" to "catalog-type".
-    // TODO If catalog backend is CUSTOM, we need special compatibility logic.
-    GRAVITINO_CONFIG_TO_FLINK_ICEBERG.forEach(
-        (key, value) -> {
-          if (all.containsKey(key)) {
-            String config = all.remove(key);
-            all.put(value, config);
-          }
-        });
-    return all;
+    if (GRAVITINO_CONFIG_TO_FLINK_ICEBERG.containsKey(configKey)) {
+      icebergConfigKey = GRAVITINO_CONFIG_TO_FLINK_ICEBERG.get(configKey);
+    }
+    return icebergConfigKey;
   }
 
   @Override
@@ -78,7 +59,7 @@ public class IcebergPropertiesConverter implements 
PropertiesConverter {
   }
 
   @Override
-  public Map<String, String> toFlinkTableProperties(Map<String, String> 
properties) {
-    return new HashMap<>(properties);
+  public String getFlinkCatalogType() {
+    return GravitinoIcebergCatalogFactoryOptions.IDENTIFIER;
   }
 }
diff --git 
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java
 
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java
index 58613bee37..99e402bcb8 100644
--- 
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java
+++ 
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java
@@ -19,15 +19,9 @@
 
 package org.apache.gravitino.flink.connector.paimon;
 
-import com.google.common.collect.Maps;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.catalog.CommonCatalogOptions;
 import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants;
 import org.apache.gravitino.catalog.lakehouse.paimon.PaimonPropertiesUtils;
 import org.apache.gravitino.flink.connector.PropertiesConverter;
-import org.apache.paimon.catalog.FileSystemCatalogFactory;
 
 public class PaimonPropertiesConverter implements PropertiesConverter {
 
@@ -36,45 +30,23 @@ public class PaimonPropertiesConverter implements 
PropertiesConverter {
   private PaimonPropertiesConverter() {}
 
   @Override
-  public Map<String, String> toGravitinoCatalogProperties(Configuration 
flinkConf) {
-    Map<String, String> gravitinoProperties = Maps.newHashMap();
-    Map<String, String> flinkConfMap = flinkConf.toMap();
-    for (Map.Entry<String, String> entry : flinkConfMap.entrySet()) {
-      String gravitinoKey =
-          
PaimonPropertiesUtils.PAIMON_CATALOG_CONFIG_TO_GRAVITINO.get(entry.getKey());
-      if (gravitinoKey != null) {
-        gravitinoProperties.put(gravitinoKey, entry.getValue());
-      } else if (!entry.getKey().startsWith(FLINK_PROPERTY_PREFIX)) {
-        gravitinoProperties.put(FLINK_PROPERTY_PREFIX + entry.getKey(), 
entry.getValue());
-      } else {
-        gravitinoProperties.put(entry.getKey(), entry.getValue());
-      }
+  public String transformPropertyToGravitinoCatalog(String configKey) {
+    if (configKey.equalsIgnoreCase(PaimonConstants.METASTORE)) {
+      return PaimonConstants.CATALOG_BACKEND;
     }
-    gravitinoProperties.put(
-        PaimonConstants.CATALOG_BACKEND,
-        flinkConfMap.getOrDefault(PaimonConstants.METASTORE, 
FileSystemCatalogFactory.IDENTIFIER));
-    return gravitinoProperties;
+    return 
PaimonPropertiesUtils.PAIMON_CATALOG_CONFIG_TO_GRAVITINO.get(configKey);
   }
 
   @Override
-  public Map<String, String> toFlinkCatalogProperties(Map<String, String> 
gravitinoProperties) {
-    Map<String, String> all = new HashMap<>();
-    gravitinoProperties.forEach(
-        (key, value) -> {
-          String flinkConfigKey = key;
-          if (key.startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX)) {
-            flinkConfigKey = 
key.substring(PropertiesConverter.FLINK_PROPERTY_PREFIX.length());
-          }
-          all.put(flinkConfigKey, value);
-        });
-    Map<String, String> paimonCatalogProperties =
-        PaimonPropertiesUtils.toPaimonCatalogProperties(all);
-    paimonCatalogProperties.put(
-        PaimonConstants.METASTORE,
-        paimonCatalogProperties.getOrDefault(
-            PaimonConstants.CATALOG_BACKEND, 
FileSystemCatalogFactory.IDENTIFIER));
-    paimonCatalogProperties.put(
-        CommonCatalogOptions.CATALOG_TYPE.key(), 
GravitinoPaimonCatalogFactoryOptions.IDENTIFIER);
-    return paimonCatalogProperties;
+  public String transformPropertyToFlinkCatalog(String configKey) {
+    if (configKey.equals(PaimonConstants.CATALOG_BACKEND)) {
+      return PaimonConstants.METASTORE;
+    }
+    return PaimonPropertiesUtils.GRAVITINO_CONFIG_TO_PAIMON.get(configKey);
+  }
+
+  @Override
+  public String getFlinkCatalogType() {
+    return GravitinoPaimonCatalogFactoryOptions.IDENTIFIER;
   }
 }
diff --git 
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/iceberg/TestIcebergPropertiesConverter.java
 
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/iceberg/TestIcebergPropertiesConverter.java
index d6de522f39..8287eebf25 100644
--- 
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/iceberg/TestIcebergPropertiesConverter.java
+++ 
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/iceberg/TestIcebergPropertiesConverter.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.flink.connector.iceberg;
 
 import com.google.common.collect.ImmutableMap;
 import java.util.Map;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.catalog.CommonCatalogOptions;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -31,16 +32,37 @@ public class TestIcebergPropertiesConverter {
   @Test
   void testCatalogPropertiesWithHiveBackend() {
     Map<String, String> properties =
-        CONVERTER.toFlinkCatalogProperties(
-            ImmutableMap.of(
-                IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
-                
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE,
-                IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI,
-                "hive-uri",
-                IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE,
-                "hive-warehouse",
-                "key1",
-                "value1"));
+        CONVERTER.toGravitinoCatalogProperties(
+            Configuration.fromMap(
+                ImmutableMap.of(
+                    
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
+                    
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE,
+                    IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI,
+                    "hive-uri",
+                    
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE,
+                    "hive-warehouse",
+                    "key1",
+                    "value1",
+                    "flink.bypass.key2",
+                    "value2")));
+
+    Map<String, String> actual =
+        ImmutableMap.of(
+            IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
+            IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE,
+            IcebergPropertiesConstants.ICEBERG_CATALOG_URI,
+            "hive-uri",
+            IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE,
+            "hive-warehouse",
+            "flink.bypass.key1", // Automatically add 'flink.bypass.'
+            "value1",
+            "flink.bypass.key2",
+            "value2");
+
+    Assertions.assertEquals(actual, properties);
+
+    Map<String, String> toFlinkProperties = 
CONVERTER.toFlinkCatalogProperties(actual);
+
     Assertions.assertEquals(
         ImmutableMap.of(
             CommonCatalogOptions.CATALOG_TYPE.key(),
@@ -50,23 +72,48 @@ public class TestIcebergPropertiesConverter {
             IcebergPropertiesConstants.ICEBERG_CATALOG_URI,
             "hive-uri",
             IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE,
-            "hive-warehouse"),
-        properties);
+            "hive-warehouse",
+            "key1", // When returning to Flink, prefix 'flink.bypass.' 
Automatically removed.
+            "value1",
+            "key2", // When returning to Flink, prefix 'flink.bypass.' 
Automatically removed.
+            "value2"),
+        toFlinkProperties);
   }
 
   @Test
   void testCatalogPropertiesWithRestBackend() {
     Map<String, String> properties =
-        CONVERTER.toFlinkCatalogProperties(
-            ImmutableMap.of(
-                IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
-                IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_REST,
-                IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI,
-                "rest-uri",
-                IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE,
-                "rest-warehouse",
-                "key1",
-                "value1"));
+        CONVERTER.toGravitinoCatalogProperties(
+            Configuration.fromMap(
+                ImmutableMap.of(
+                    
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
+                    IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_REST,
+                    IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI,
+                    "rest-uri",
+                    
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE,
+                    "rest-warehouse",
+                    "key1",
+                    "value1",
+                    "flink.bypass.key2",
+                    "value2")));
+
+    Map<String, String> actual =
+        ImmutableMap.of(
+            IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
+            IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_REST,
+            IcebergPropertiesConstants.ICEBERG_CATALOG_URI,
+            "rest-uri",
+            IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE,
+            "rest-warehouse",
+            "flink.bypass.key1", // Automatically add 'flink.bypass.'
+            "value1",
+            "flink.bypass.key2",
+            "value2");
+
+    Assertions.assertEquals(actual, properties);
+
+    Map<String, String> toFlinkProperties = 
CONVERTER.toFlinkCatalogProperties(actual);
+
     Assertions.assertEquals(
         ImmutableMap.of(
             CommonCatalogOptions.CATALOG_TYPE.key(),
@@ -76,7 +123,11 @@ public class TestIcebergPropertiesConverter {
             IcebergPropertiesConstants.ICEBERG_CATALOG_URI,
             "rest-uri",
             IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE,
-            "rest-warehouse"),
-        properties);
+            "rest-warehouse",
+            "key1", // When returning to Flink, prefix 'flink.bypass.' 
Automatically removed.
+            "value1",
+            "key2", // When returning to Flink, prefix 'flink.bypass.' 
Automatically removed.
+            "value2"),
+        toFlinkProperties);
   }
 }
diff --git 
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java
 
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java
index 0834def90b..cedf0e8d59 100644
--- 
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java
+++ 
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java
@@ -157,10 +157,6 @@ public abstract class FlinkIcebergCatalogIT extends 
FlinkCommonIT {
     Map<String, String> properties = gravitinoCatalog.properties();
     Assertions.assertEquals(hiveMetastoreUri, 
properties.get(IcebergConstants.URI));
 
-    Assertions.assertEquals(
-        GravitinoIcebergCatalogFactoryOptions.IDENTIFIER,
-        properties.get(CommonCatalogOptions.CATALOG_TYPE.key()));
-
     // Get the created catalog.
     Optional<org.apache.flink.table.catalog.Catalog> catalog = 
tableEnv.getCatalog(catalogName);
     Assertions.assertTrue(catalog.isPresent());
diff --git 
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java
 
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java
index a03b4a198e..66458ba8e7 100644
--- 
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java
+++ 
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java
@@ -98,7 +98,7 @@ public class FlinkPaimonCatalogIT extends FlinkCommonIT {
             "create catalog %s with ("
                 + "'type'='gravitino-paimon', "
                 + "'warehouse'='%s',"
-                + "'catalog.backend'='filesystem'"
+                + "'metastore'='filesystem'"
                 + ")",
             catalogName, warehouse));
     String[] catalogs = tableEnv.listCatalogs();

Reply via email to