This is an automated email from the ASF dual-hosted git repository.

diqiu50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 10cc598648 [#7469] improvement(trino-connector): Support catalog 
configuration directly pass through (#7562)
10cc598648 is described below

commit 10cc598648fcd52f82bf5fb9d8d53ae174bcbbf6
Author: qbhan <[email protected]>
AuthorDate: Fri Jul 4 15:22:23 2025 +0800

    [#7469] improvement(trino-connector): Support catalog configuration 
directly pass through (#7562)
    
    ### What changes were proposed in this pull request?
    Support directly passing catalog configuration to the Gravitino catalog in
    the Trino runtime
    
    ### Why are the changes needed?
    
    PropertyConverter will skip any property that is not in
    `PropertyConverter#engineToGravitinoMapping`, which increases coupling with
    Trino. We will remove the original logic in the catalog that maps
    `trino.bypass.` keys using the TRINO_KEY_TO_GRAVITINO_KEY map. Only
    remove the `trino.bypass.` prefix from the configuration key and
    directly pass it through.
    
    Fix: #7469
    
    ### Does this PR introduce _any_ user-facing change?
    no
    
    ### How was this patch tested?
    local tests
---
 .../catalog/property/PropertyConverter.java        |   5 +-
 .../catalog/CatalogPropertyConverter.java          |  65 +++++
 .../catalog/hive/HiveCatalogPropertyConverter.java | 243 -------------------
 .../catalog/hive/HiveConnectorAdapter.java         |   3 +-
 .../iceberg/IcebergCatalogPropertyConverter.java   | 270 +--------------------
 .../catalog/jdbc/JDBCCatalogPropertyConverter.java |  65 +----
 .../catalog/memory/MemoryConnectorAdapter.java     |   6 +-
 .../trino/connector/GravitinoMockServer.java       |  12 +-
 .../trino/connector/TestGravitinoConnector.java    |  14 ++
 .../hive/TestHiveCatalogPropertyConverter.java     |  13 +-
 .../TestIcebergCatalogPropertyConverter.java       |  12 +-
 .../mysql/TestMySQLCatalogPropertyConverter.java   |   6 +-
 .../TestPostgreSQLCatalogPropertyConverter.java    |   6 +-
 13 files changed, 119 insertions(+), 601 deletions(-)

diff --git 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/property/PropertyConverter.java
 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/property/PropertyConverter.java
index 75985ff573..8b6325425e 100644
--- 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/property/PropertyConverter.java
+++ 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/property/PropertyConverter.java
@@ -27,9 +27,8 @@ import org.slf4j.LoggerFactory;
 /** Transforming between Apache Gravitino schema/table/column property and 
engine property. */
 public abstract class PropertyConverter {
 
-  protected static final String TRINO_PROPERTIES_PREFIX = "trino.bypass.";
-
   private static final Logger LOG = 
LoggerFactory.getLogger(PropertyConverter.class);
+
   /**
    * Mapping that maps engine properties to Gravitino properties. It will 
return a map that holds
    * the mapping between engine and Gravitino properties.
@@ -38,7 +37,7 @@ public abstract class PropertyConverter {
    */
   public abstract Map<String, String> engineToGravitinoMapping();
 
-  Map<String, String> reverseMap(Map<String, String> map) {
+  public Map<String, String> reverseMap(Map<String, String> map) {
     Map<String, String> res = new HashMap<>();
     for (Map.Entry<String, String> entry : map.entrySet()) {
       res.put(entry.getValue(), entry.getKey());
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogPropertyConverter.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogPropertyConverter.java
new file mode 100644
index 0000000000..f15e305dc8
--- /dev/null
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogPropertyConverter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.trino.connector.catalog;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.gravitino.catalog.property.PropertyConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CatalogPropertyConverter extends PropertyConverter {
+  private static final Logger LOG = 
LoggerFactory.getLogger(PropertyConverter.class);
+
+  private static final String TRINO_PROPERTIES_PREFIX = "trino.bypass.";
+
+  @Override
+  public Map<String, String> engineToGravitinoMapping() {
+    return ImmutableMap.of();
+  }
+
+  /**
+   * Convert Gravitino properties to engine properties. Support skip 
validation and directly pass
+   * some config through
+   *
+   * @param gravitinoProperties map of Gravitino properties
+   * @return map of engine properties
+   */
+  @Override
+  public Map<String, String> gravitinoToEngineProperties(Map<String, String> 
gravitinoProperties) {
+    Map<String, String> engineProperties = new HashMap<>();
+    Map<String, String> gravitinoToEngineMapping = 
reverseMap(engineToGravitinoMapping());
+    for (Map.Entry<String, String> entry : gravitinoProperties.entrySet()) {
+      String gravitinoKey = entry.getKey();
+      if (gravitinoKey.startsWith(TRINO_PROPERTIES_PREFIX)) {
+        engineProperties.put(gravitinoKey.replace(TRINO_PROPERTIES_PREFIX, 
""), entry.getValue());
+        continue;
+      }
+      String engineKey = gravitinoToEngineMapping.get(gravitinoKey);
+      if (engineKey != null) {
+        engineProperties.put(engineKey, entry.getValue());
+      } else {
+        LOG.info("Property {} is not supported by engine", entry.getKey());
+      }
+    }
+    return engineProperties;
+  }
+}
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/hive/HiveCatalogPropertyConverter.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/hive/HiveCatalogPropertyConverter.java
deleted file mode 100644
index d4be9cdfc3..0000000000
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/hive/HiveCatalogPropertyConverter.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.gravitino.trino.connector.catalog.hive;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.commons.collections4.bidimap.TreeBidiMap;
-import org.apache.gravitino.catalog.property.PropertyConverter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** Convert Apache Hive properties between Trino and Apache Gravitino. */
-public class HiveCatalogPropertyConverter extends PropertyConverter {
-
-  /** Logger for Hive catalog between Trino and Gravitino. */
-  public static final Logger LOG = 
LoggerFactory.getLogger(HiveCatalogPropertyConverter.class);
-
-  private static final TreeBidiMap<String, String> TRINO_KEY_TO_GRAVITINO_KEY =
-      new TreeBidiMap<>(
-          new ImmutableMap.Builder<String, String>()
-              // Key is the Trino property, value is the Gravitino property
-              // General configuration
-              .put("hive.config.resources", TRINO_PROPERTIES_PREFIX + 
"hive.config.resources")
-              .put(
-                  "hive.recursive-directories",
-                  TRINO_PROPERTIES_PREFIX + "hive.recursive-directories")
-              .put(
-                  "hive.ignore-absent-partitions",
-                  TRINO_PROPERTIES_PREFIX + "hive.ignore-absent-partitions")
-              .put("hive.storage-format", TRINO_PROPERTIES_PREFIX + 
"hive.storage-format")
-              .put("hive.compression-codec", TRINO_PROPERTIES_PREFIX + 
"hive.compression-codec")
-              .put(
-                  "hive.force-local-scheduling",
-                  TRINO_PROPERTIES_PREFIX + "hive.force-local-scheduling")
-              .put(
-                  "hive.respect-table-format",
-                  TRINO_PROPERTIES_PREFIX + "hive.respect-table-format")
-              .put(
-                  "hive.immutable-partitions",
-                  TRINO_PROPERTIES_PREFIX + "hive.immutable-partitions")
-              .put(
-                  "hive.insert-existing-partitions-behavior",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.insert-existing-partitions-behavior")
-              .put(
-                  "hive.target-max-file-size",
-                  TRINO_PROPERTIES_PREFIX + "hive.target-max-file-size")
-              .put(
-                  "hive.create-empty-bucket-files",
-                  TRINO_PROPERTIES_PREFIX + "hive.create-empty-bucket-files")
-              .put("hive.validate-bucketing", TRINO_PROPERTIES_PREFIX + 
"hive.validate-bucketing")
-              .put(
-                  "hive.partition-statistics-sample-size",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.partition-statistics-sample-size")
-              .put(
-                  "hive.max-partitions-per-writers",
-                  TRINO_PROPERTIES_PREFIX + "hive.max-partitions-per-writers")
-              .put(
-                  "hive.max-partitions-for-eager-load",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.max-partitions-for-eager-load")
-              .put(
-                  "hive.max-partitions-per-scan",
-                  TRINO_PROPERTIES_PREFIX + "hive.max-partitions-per-scan")
-              .put("hive.dfs.replication", TRINO_PROPERTIES_PREFIX + 
"hive.dfs.replication")
-              .put("hive.security", TRINO_PROPERTIES_PREFIX + "hive.security")
-              .put("security.config-file", TRINO_PROPERTIES_PREFIX + 
"security.config-file")
-              .put(
-                  "hive.non-managed-table-writes-enabled",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.non-managed-table-writes-enabled")
-              .put(
-                  "hive.non-managed-table-creates-enabled",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.non-managed-table-creates-enabled")
-              .put(
-                  "hive.collect-column-statistics-on-write",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.collect-column-statistics-on-write")
-              .put(
-                  "hive.file-status-cache-tables",
-                  TRINO_PROPERTIES_PREFIX + "hive.file-status-cache-tables")
-              .put(
-                  "hive.file-status-cache.max-retained-size",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.file-status-cache.max-retained-size")
-              .put(
-                  "hive.file-status-cache-expire-time",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.file-status-cache-expire-time")
-              .put(
-                  "hive.per-transaction-file-status-cache.max-retained-size",
-                  TRINO_PROPERTIES_PREFIX
-                      + 
"hive.per-transaction-file-status-cache.max-retained-size")
-              .put("hive.timestamp-precision", TRINO_PROPERTIES_PREFIX + 
"hive.timestamp-precision")
-              .put(
-                  "hive.temporary-staging-directory-enabled",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.temporary-staging-directory-enabled")
-              .put(
-                  "hive.temporary-staging-directory-path",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.temporary-staging-directory-path")
-              .put("hive.hive-views.enabled", TRINO_PROPERTIES_PREFIX + 
"hive.hive-views.enabled")
-              .put(
-                  "hive.hive-views.legacy-translation",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.hive-views.legacy-translation")
-              .put(
-                  "hive.parallel-partitioned-bucketed-writes",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.parallel-partitioned-bucketed-writes")
-              .put(
-                  "hive.fs.new-directory-permissions",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.fs.new-directory-permissions")
-              .put("hive.fs.cache.max-size", TRINO_PROPERTIES_PREFIX + 
"hive.fs.cache.max-size")
-              .put(
-                  "hive.query-partition-filter-required",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.query-partition-filter-required")
-              .put(
-                  "hive.table-statistics-enabled",
-                  TRINO_PROPERTIES_PREFIX + "hive.table-statistics-enabled")
-              .put("hive.auto-purge", TRINO_PROPERTIES_PREFIX + 
"hive.auto-purge")
-              .put(
-                  "hive.partition-projection-enabled",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.partition-projection-enabled")
-              .put(
-                  "hive.max-partition-drops-per-query",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.max-partition-drops-per-query")
-              .put(
-                  "hive.single-statement-writes",
-                  TRINO_PROPERTIES_PREFIX + "hive.single-statement-writes")
-
-              // Performance
-              .put(
-                  "hive.max-outstanding-splits",
-                  TRINO_PROPERTIES_PREFIX + "hive.max-outstanding-splits")
-              .put(
-                  "hive.max-outstanding-splits-size",
-                  TRINO_PROPERTIES_PREFIX + "hive.max-outstanding-splits-size")
-              .put(
-                  "hive.max-splits-per-second",
-                  TRINO_PROPERTIES_PREFIX + "hive.max-splits-per-second")
-              .put("hive.max-initial-splits", TRINO_PROPERTIES_PREFIX + 
"hive.max-initial-splits")
-              .put(
-                  "hive.max-initial-split-size",
-                  TRINO_PROPERTIES_PREFIX + "hive.max-initial-split-size")
-              .put("hive.max-split-size", TRINO_PROPERTIES_PREFIX + 
"hive.max-split-size")
-
-              // S3
-              .put("hive.s3.aws-access-key", TRINO_PROPERTIES_PREFIX + 
"hive.s3.aws-access-key")
-              .put("hive.s3.aws-secret-key", TRINO_PROPERTIES_PREFIX + 
"hive.s3.aws-secret-key")
-              .put("hive.s3.iam-role", TRINO_PROPERTIES_PREFIX + 
"hive.s3.iam-role")
-              .put("hive.s3.external-id", TRINO_PROPERTIES_PREFIX + 
"hive.s3.external-id")
-              .put("hive.s3.endpoint", TRINO_PROPERTIES_PREFIX + 
"hive.s3.endpoint")
-              .put("hive.s3.region", TRINO_PROPERTIES_PREFIX + 
"hive.s3.region")
-              .put("hive.s3.storage-class", TRINO_PROPERTIES_PREFIX + 
"hive.s3.storage-class")
-              .put("hive.s3.signer-type", TRINO_PROPERTIES_PREFIX + 
"hive.s3.signer-type")
-              .put("hive.s3.signer-class", TRINO_PROPERTIES_PREFIX + 
"hive.s3.signer-class")
-              .put(
-                  "hive.s3.path-style-access",
-                  TRINO_PROPERTIES_PREFIX + "hive.s3.path-style-access")
-              .put(
-                  "hive.s3.staging-directory",
-                  TRINO_PROPERTIES_PREFIX + "hive.s3.staging-directory")
-              .put(
-                  "hive.s3.pin-client-to-current-region",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.s3.pin-client-to-current-region")
-              .put("hive.s3.ssl.enabled", TRINO_PROPERTIES_PREFIX + 
"hive.s3.ssl.enabled")
-              .put("hive.s3.sse.enabled", TRINO_PROPERTIES_PREFIX + 
"hive.s3.sse.enabled")
-              .put("hive.s3.sse.type", TRINO_PROPERTIES_PREFIX + 
"hive.s3.sse.type")
-              .put("hive.s3.sse.kms-key-id", TRINO_PROPERTIES_PREFIX + 
"hive.s3.sse.kms-key-id")
-              .put("hive.s3.kms-key-id", TRINO_PROPERTIES_PREFIX + 
"hive.s3.kms-key-id")
-              .put(
-                  "hive.s3.encryption-materials-provider",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.s3.encryption-materials-provider")
-              .put("hive.s3.upload-acl-type", TRINO_PROPERTIES_PREFIX + 
"hive.s3.upload-acl-type")
-              .put(
-                  "hive.s3.skip-glacier-objects",
-                  TRINO_PROPERTIES_PREFIX + "hive.s3.skip-glacier-objects")
-              .put(
-                  "hive.s3.streaming.enabled",
-                  TRINO_PROPERTIES_PREFIX + "hive.s3.streaming.enabled")
-              .put(
-                  "hive.s3.streaming.part-size",
-                  TRINO_PROPERTIES_PREFIX + "hive.s3.streaming.part-size")
-              .put("hive.s3.proxy.host", TRINO_PROPERTIES_PREFIX + 
"hive.s3.proxy.host")
-              .put("hive.s3.proxy.port", TRINO_PROPERTIES_PREFIX + 
"hive.s3.proxy.port")
-              .put("hive.s3.proxy.protocol", TRINO_PROPERTIES_PREFIX + 
"hive.s3.proxy.protocol")
-              .put(
-                  "hive.s3.proxy.non-proxy-hosts",
-                  TRINO_PROPERTIES_PREFIX + "hive.s3.proxy.non-proxy-hosts")
-              .put("hive.s3.proxy.username", TRINO_PROPERTIES_PREFIX + 
"hive.s3.proxy.username")
-              .put("hive.s3.proxy.password", TRINO_PROPERTIES_PREFIX + 
"hive.s3.proxy.password")
-              .put(
-                  "hive.s3.proxy.preemptive-basic-auth",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.s3.proxy.preemptive-basic-auth")
-              .put("hive.s3.sts.endpoint", TRINO_PROPERTIES_PREFIX + 
"hive.s3.sts.endpoint")
-              .put("hive.s3.sts.region", TRINO_PROPERTIES_PREFIX + 
"hive.s3.sts.region")
-
-              // Hive metastore Thrift service authentication
-              .put(
-                  "hive.metastore.authentication.type",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.metastore.authentication.type")
-              .put(
-                  "hive.metastore.thrift.impersonation.enabled",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.metastore.thrift.impersonation.enabled")
-              .put(
-                  "hive.metastore.service.principal",
-                  TRINO_PROPERTIES_PREFIX + "hive.metastore.service.principal")
-              .put(
-                  "hive.metastore.client.principal",
-                  TRINO_PROPERTIES_PREFIX + "hive.metastore.client.principal")
-              .put(
-                  "hive.metastore.client.keytab",
-                  TRINO_PROPERTIES_PREFIX + "hive.metastore.client.keytab")
-
-              // HDFS authentication
-              .put(
-                  "hive.hdfs.authentication.type",
-                  TRINO_PROPERTIES_PREFIX + "hive.hdfs.authentication.type")
-              .put(
-                  "hive.hdfs.impersonation.enabled",
-                  TRINO_PROPERTIES_PREFIX + "hive.hdfs.impersonation.enabled")
-              .put(
-                  "hive.hdfs.trino.principal",
-                  TRINO_PROPERTIES_PREFIX + "hive.hdfs.trino.principal")
-              .put("hive.hdfs.trino.keytab", TRINO_PROPERTIES_PREFIX + 
"hive.hdfs.trino.keytab")
-              .put(
-                  "hive.hdfs.wire-encryption.enabled",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.hdfs.wire-encryption.enabled")
-              .build());
-
-  @Override
-  public TreeBidiMap<String, String> engineToGravitinoMapping() {
-    return TRINO_KEY_TO_GRAVITINO_KEY;
-  }
-}
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/hive/HiveConnectorAdapter.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/hive/HiveConnectorAdapter.java
index 26deeba338..1e9ecec73d 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/hive/HiveConnectorAdapter.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/hive/HiveConnectorAdapter.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import org.apache.gravitino.catalog.property.PropertyConverter;
 import org.apache.gravitino.trino.connector.catalog.CatalogConnectorAdapter;
 import 
org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter;
+import org.apache.gravitino.trino.connector.catalog.CatalogPropertyConverter;
 import org.apache.gravitino.trino.connector.catalog.HasPropertyMeta;
 import org.apache.gravitino.trino.connector.metadata.GravitinoCatalog;
 
@@ -41,7 +42,7 @@ public class HiveConnectorAdapter implements 
CatalogConnectorAdapter {
   /** Constructs a new HiveConnectorAdapter. */
   public HiveConnectorAdapter() {
     this.propertyMetadata = new HivePropertyMeta();
-    this.catalogConverter = new HiveCatalogPropertyConverter();
+    this.catalogConverter = new CatalogPropertyConverter();
   }
 
   @Override
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/IcebergCatalogPropertyConverter.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/IcebergCatalogPropertyConverter.java
index 20a66b9257..aabf2e60f3 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/IcebergCatalogPropertyConverter.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/IcebergCatalogPropertyConverter.java
@@ -19,294 +19,28 @@
 
 package org.apache.gravitino.trino.connector.catalog.iceberg;
 
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
 import io.trino.spi.TrinoException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
-import org.apache.commons.collections4.bidimap.TreeBidiMap;
 import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
 import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergPropertiesUtils;
-import org.apache.gravitino.catalog.property.PropertyConverter;
 import org.apache.gravitino.trino.connector.GravitinoErrorCode;
+import org.apache.gravitino.trino.connector.catalog.CatalogPropertyConverter;
 
 /**
  * A property converter for Iceberg catalogs that handles the conversion 
between Trino and Gravitino
  * property formats. This converter manages various Iceberg-specific 
configurations including
  * general settings, Hive integration, and S3 storage options.
  */
-public class IcebergCatalogPropertyConverter extends PropertyConverter {
-
-  private static final TreeBidiMap<String, String> 
TRINO_ICEBERG_TO_GRAVITINO_ICEBERG =
-      new TreeBidiMap<>(
-          new ImmutableMap.Builder<String, String>()
-              // General configuration
-              .put("iceberg.catalog.type", TRINO_PROPERTIES_PREFIX + 
"iceberg.catalog.type")
-              .put("iceberg.file-format", TRINO_PROPERTIES_PREFIX + 
"iceberg.file-format")
-              .put(
-                  "iceberg.compression-codec",
-                  TRINO_PROPERTIES_PREFIX + "iceberg.compression-codec")
-              .put(
-                  "iceberg.use-file-size-from-metadata",
-                  TRINO_PROPERTIES_PREFIX + 
"iceberg.use-file-size-from-metadata")
-              .put(
-                  "iceberg.max-partitions-per-writer",
-                  TRINO_PROPERTIES_PREFIX + 
"iceberg.max-partitions-per-writer")
-              .put(
-                  "iceberg.target-max-file-size",
-                  TRINO_PROPERTIES_PREFIX + "iceberg.target-max-file-size")
-              .put(
-                  "iceberg.unique-table-location",
-                  TRINO_PROPERTIES_PREFIX + "iceberg.unique-table-location")
-              .put(
-                  "iceberg.dynamic-filtering.wait-timeout",
-                  TRINO_PROPERTIES_PREFIX + 
"iceberg.dynamic-filtering.wait-timeout")
-              .put(
-                  "iceberg.delete-schema-locations-fallback",
-                  TRINO_PROPERTIES_PREFIX + 
"iceberg.delete-schema-locations-fallback")
-              .put(
-                  "iceberg.minimum-assigned-split-weight",
-                  TRINO_PROPERTIES_PREFIX + 
"iceberg.minimum-assigned-split-weight")
-              .put(
-                  "iceberg.table-statistics-enabled",
-                  TRINO_PROPERTIES_PREFIX + "iceberg.table-statistics-enabled")
-              .put(
-                  "iceberg.extended-statistics.enabled",
-                  TRINO_PROPERTIES_PREFIX + 
"iceberg.extended-statistics.enabled")
-              .put(
-                  "iceberg.extended-statistics.collect-on-write",
-                  TRINO_PROPERTIES_PREFIX + 
"iceberg.extended-statistics.collect-on-write")
-              .put(
-                  "iceberg.projection-pushdown-enabled",
-                  TRINO_PROPERTIES_PREFIX + 
"iceberg.projection-pushdown-enabled")
-              .put(
-                  "iceberg.hive-catalog-name",
-                  TRINO_PROPERTIES_PREFIX + "iceberg.hive-catalog-name")
-              .put(
-                  "iceberg.materialized-views.storage-schema",
-                  TRINO_PROPERTIES_PREFIX + 
"iceberg.materialized-views.storage-schema")
-              .put(
-                  "iceberg.materialized-views.hide-storage-table",
-                  TRINO_PROPERTIES_PREFIX + 
"iceberg.materialized-views.hide-storage-table")
-              .put(
-                  "iceberg.register-table-procedure.enabled",
-                  TRINO_PROPERTIES_PREFIX + 
"iceberg.register-table-procedure.enabled")
-              .put(
-                  "iceberg.query-partition-filter-required",
-                  TRINO_PROPERTIES_PREFIX + 
"iceberg.query-partition-filter-required")
-
-              // Hive
-              .put("hive.config.resources", TRINO_PROPERTIES_PREFIX + 
"hive.config.resources")
-              .put(
-                  "hive.recursive-directories",
-                  TRINO_PROPERTIES_PREFIX + "hive.recursive-directories")
-              .put(
-                  "hive.ignore-absent-partitions",
-                  TRINO_PROPERTIES_PREFIX + "hive.ignore-absent-partitions")
-              .put("hive.storage-format", TRINO_PROPERTIES_PREFIX + 
"hive.storage-format")
-              .put("hive.compression-codec", TRINO_PROPERTIES_PREFIX + 
"hive.compression-codec")
-              .put(
-                  "hive.force-local-scheduling",
-                  TRINO_PROPERTIES_PREFIX + "hive.force-local-scheduling")
-              .put(
-                  "hive.respect-table-format",
-                  TRINO_PROPERTIES_PREFIX + "hive.respect-table-format")
-              .put(
-                  "hive.immutable-partitions",
-                  TRINO_PROPERTIES_PREFIX + "hive.immutable-partitions")
-              .put(
-                  "hive.insert-existing-partitions-behavior",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.insert-existing-partitions-behavior")
-              .put(
-                  "hive.target-max-file-size",
-                  TRINO_PROPERTIES_PREFIX + "hive.target-max-file-size")
-              .put(
-                  "hive.create-empty-bucket-files",
-                  TRINO_PROPERTIES_PREFIX + "hive.create-empty-bucket-files")
-              .put("hive.validate-bucketing", TRINO_PROPERTIES_PREFIX + 
"hive.validate-bucketing")
-              .put(
-                  "hive.partition-statistics-sample-size",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.partition-statistics-sample-size")
-              .put(
-                  "hive.max-partitions-per-writers",
-                  TRINO_PROPERTIES_PREFIX + "hive.max-partitions-per-writers")
-              .put(
-                  "hive.max-partitions-for-eager-load",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.max-partitions-for-eager-load")
-              .put(
-                  "hive.max-partitions-per-scan",
-                  TRINO_PROPERTIES_PREFIX + "hive.max-partitions-per-scan")
-              .put("hive.dfs.replication", TRINO_PROPERTIES_PREFIX + 
"hive.dfs.replication")
-              .put("hive.security", TRINO_PROPERTIES_PREFIX + "hive.security")
-              .put("security.config-file", TRINO_PROPERTIES_PREFIX + 
"security.config-file")
-              .put(
-                  "hive.non-managed-table-writes-enabled",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.non-managed-table-writes-enabled")
-              .put(
-                  "hive.non-managed-table-creates-enabled",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.non-managed-table-creates-enabled")
-              .put(
-                  "hive.collect-column-statistics-on-write",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.collect-column-statistics-on-write")
-              .put(
-                  "hive.file-status-cache-tables",
-                  TRINO_PROPERTIES_PREFIX + "hive.file-status-cache-tables")
-              .put(
-                  "hive.file-status-cache.max-retained-size",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.file-status-cache.max-retained-size")
-              .put(
-                  "hive.file-status-cache-expire-time",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.file-status-cache-expire-time")
-              .put(
-                  "hive.per-transaction-file-status-cache.max-retained-size",
-                  TRINO_PROPERTIES_PREFIX
-                      + 
"hive.per-transaction-file-status-cache.max-retained-size")
-              .put("hive.timestamp-precision", TRINO_PROPERTIES_PREFIX + 
"hive.timestamp-precision")
-              .put(
-                  "hive.temporary-staging-directory-enabled",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.temporary-staging-directory-enabled")
-              .put(
-                  "hive.temporary-staging-directory-path",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.temporary-staging-directory-path")
-              .put("hive.hive-views.enabled", TRINO_PROPERTIES_PREFIX + 
"hive.hive-views.enabled")
-              .put(
-                  "hive.hive-views.legacy-translation",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.hive-views.legacy-translation")
-              .put(
-                  "hive.parallel-partitioned-bucketed-writes",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.parallel-partitioned-bucketed-writes")
-              .put(
-                  "hive.fs.new-directory-permissions",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.fs.new-directory-permissions")
-              .put("hive.fs.cache.max-size", TRINO_PROPERTIES_PREFIX + 
"hive.fs.cache.max-size")
-              .put(
-                  "hive.query-partition-filter-required",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.query-partition-filter-required")
-              .put(
-                  "hive.table-statistics-enabled",
-                  TRINO_PROPERTIES_PREFIX + "hive.table-statistics-enabled")
-              .put("hive.auto-purge", TRINO_PROPERTIES_PREFIX + 
"hive.auto-purge")
-              .put(
-                  "hive.partition-projection-enabled",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.partition-projection-enabled")
-              .put(
-                  "hive.max-partition-drops-per-query",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.max-partition-drops-per-query")
-              .put(
-                  "hive.single-statement-writes",
-                  TRINO_PROPERTIES_PREFIX + "hive.single-statement-writes")
-
-              // Hive performance
-              .put(
-                  "hive.max-outstanding-splits",
-                  TRINO_PROPERTIES_PREFIX + "hive.max-outstanding-splits")
-              .put(
-                  "hive.max-outstanding-splits-size",
-                  TRINO_PROPERTIES_PREFIX + "hive.max-outstanding-splits-size")
-              .put(
-                  "hive.max-splits-per-second",
-                  TRINO_PROPERTIES_PREFIX + "hive.max-splits-per-second")
-              .put("hive.max-initial-splits", TRINO_PROPERTIES_PREFIX + 
"hive.max-initial-splits")
-              .put(
-                  "hive.max-initial-split-size",
-                  TRINO_PROPERTIES_PREFIX + "hive.max-initial-split-size")
-              .put("hive.max-split-size", TRINO_PROPERTIES_PREFIX + 
"hive.max-split-size")
-
-              // S3
-              .put("hive.s3.aws-access-key", TRINO_PROPERTIES_PREFIX + 
"hive.s3.aws-access-key")
-              .put("hive.s3.aws-secret-key", TRINO_PROPERTIES_PREFIX + 
"hive.s3.aws-secret-key")
-              .put("hive.s3.iam-role", TRINO_PROPERTIES_PREFIX + 
"hive.s3.iam-role")
-              .put("hive.s3.external-id", TRINO_PROPERTIES_PREFIX + 
"hive.s3.external-id")
-              .put("hive.s3.endpoint", TRINO_PROPERTIES_PREFIX + 
"hive.s3.endpoint")
-              .put("hive.s3.region", TRINO_PROPERTIES_PREFIX + 
"hive.s3.region")
-              .put("hive.s3.storage-class", TRINO_PROPERTIES_PREFIX + 
"hive.s3.storage-class")
-              .put("hive.s3.signer-type", TRINO_PROPERTIES_PREFIX + 
"hive.s3.signer-type")
-              .put("hive.s3.signer-class", TRINO_PROPERTIES_PREFIX + 
"hive.s3.signer-class")
-              .put(
-                  "hive.s3.path-style-access",
-                  TRINO_PROPERTIES_PREFIX + "hive.s3.path-style-access")
-              .put(
-                  "hive.s3.staging-directory",
-                  TRINO_PROPERTIES_PREFIX + "hive.s3.staging-directory")
-              .put(
-                  "hive.s3.pin-client-to-current-region",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.s3.pin-client-to-current-region")
-              .put("hive.s3.ssl.enabled", TRINO_PROPERTIES_PREFIX + 
"hive.s3.ssl.enabled")
-              .put("hive.s3.sse.enabled", TRINO_PROPERTIES_PREFIX + 
"hive.s3.sse.enabled")
-              .put("hive.s3.sse.type", TRINO_PROPERTIES_PREFIX + 
"hive.s3.sse.type")
-              .put("hive.s3.sse.kms-key-id", TRINO_PROPERTIES_PREFIX + 
"hive.s3.sse.kms-key-id")
-              .put("hive.s3.kms-key-id", TRINO_PROPERTIES_PREFIX + 
"hive.s3.kms-key-id")
-              .put(
-                  "hive.s3.encryption-materials-provider",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.s3.encryption-materials-provider")
-              .put("hive.s3.upload-acl-type", TRINO_PROPERTIES_PREFIX + 
"hive.s3.upload-acl-type")
-              .put(
-                  "hive.s3.skip-glacier-objects",
-                  TRINO_PROPERTIES_PREFIX + "hive.s3.skip-glacier-objects")
-              .put(
-                  "hive.s3.streaming.enabled",
-                  TRINO_PROPERTIES_PREFIX + "hive.s3.streaming.enabled")
-              .put(
-                  "hive.s3.streaming.part-size",
-                  TRINO_PROPERTIES_PREFIX + "hive.s3.streaming.part-size")
-              .put("hive.s3.proxy.host", TRINO_PROPERTIES_PREFIX + 
"hive.s3.proxy.host")
-              .put("hive.s3.proxy.port", TRINO_PROPERTIES_PREFIX + 
"hive.s3.proxy.port")
-              .put("hive.s3.proxy.protocol", TRINO_PROPERTIES_PREFIX + 
"hive.s3.proxy.protocol")
-              .put(
-                  "hive.s3.proxy.non-proxy-hosts",
-                  TRINO_PROPERTIES_PREFIX + "hive.s3.proxy.non-proxy-hosts")
-              .put("hive.s3.proxy.username", TRINO_PROPERTIES_PREFIX + 
"hive.s3.proxy.username")
-              .put("hive.s3.proxy.password", TRINO_PROPERTIES_PREFIX + 
"hive.s3.proxy.password")
-              .put(
-                  "hive.s3.proxy.preemptive-basic-auth",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.s3.proxy.preemptive-basic-auth")
-              .put("hive.s3.sts.endpoint", TRINO_PROPERTIES_PREFIX + 
"hive.s3.sts.endpoint")
-              .put("hive.s3.sts.region", TRINO_PROPERTIES_PREFIX + 
"hive.s3.sts.region")
-
-              // Hive metastore Thrift service authentication
-              .put(
-                  "hive.metastore.authentication.type",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.metastore.authentication.type")
-              .put(
-                  "hive.metastore.thrift.impersonation.enabled",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.metastore.thrift.impersonation.enabled")
-              .put(
-                  "hive.metastore.service.principal",
-                  TRINO_PROPERTIES_PREFIX + "hive.metastore.service.principal")
-              .put(
-                  "hive.metastore.client.principal",
-                  TRINO_PROPERTIES_PREFIX + "hive.metastore.client.principal")
-              .put(
-                  "hive.metastore.client.keytab",
-                  TRINO_PROPERTIES_PREFIX + "hive.metastore.client.keytab")
-
-              // HDFS authentication
-              .put(
-                  "hive.hdfs.authentication.type",
-                  TRINO_PROPERTIES_PREFIX + "hive.hdfs.authentication.type")
-              .put(
-                  "hive.hdfs.impersonation.enabled",
-                  TRINO_PROPERTIES_PREFIX + "hive.hdfs.impersonation.enabled")
-              .put(
-                  "hive.hdfs.trino.principal",
-                  TRINO_PROPERTIES_PREFIX + "hive.hdfs.trino.principal")
-              .put("hive.hdfs.trino.keytab", TRINO_PROPERTIES_PREFIX + 
"hive.hdfs.trino.keytab")
-              .put(
-                  "hive.hdfs.wire-encryption.enabled",
-                  TRINO_PROPERTIES_PREFIX + 
"hive.hdfs.wire-encryption.enabled")
-              .build());
+public class IcebergCatalogPropertyConverter extends CatalogPropertyConverter {
 
   private static final Set<String> JDBC_BACKEND_REQUIRED_PROPERTIES =
       Set.of("jdbc-driver", "uri", "jdbc-user", "jdbc-password");
 
   private static final Set<String> HIVE_BACKEND_REQUIRED_PROPERTIES = 
Set.of("uri");
 
-  @Override
-  public TreeBidiMap<String, String> engineToGravitinoMapping() {
-    return TRINO_ICEBERG_TO_GRAVITINO_ICEBERG;
-  }
-
   @Override
   public Map<String, String> gravitinoToEngineProperties(Map<String, String> 
properties) {
     Map<String, String> stringStringMap;
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/jdbc/JDBCCatalogPropertyConverter.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/jdbc/JDBCCatalogPropertyConverter.java
index 29696c530c..6d438a3e53 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/jdbc/JDBCCatalogPropertyConverter.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/jdbc/JDBCCatalogPropertyConverter.java
@@ -24,14 +24,14 @@ import com.google.common.collect.Sets;
 import java.util.Map;
 import java.util.Set;
 import org.apache.commons.collections4.bidimap.TreeBidiMap;
-import org.apache.gravitino.catalog.property.PropertyConverter;
+import org.apache.gravitino.trino.connector.catalog.CatalogPropertyConverter;
 
 /**
  * Property converter for JDBC catalog properties. Handles the conversion of 
property keys between
  * Trino and Gravitino formats for JDBC catalogs, including connection, 
authentication, and general
  * configuration properties.
  */
-public class JDBCCatalogPropertyConverter extends PropertyConverter {
+public class JDBCCatalogPropertyConverter extends CatalogPropertyConverter {
 
   /** Property key for JDBC connection URL. */
   public static final String JDBC_CONNECTION_URL_KEY = "connection-url";
@@ -51,67 +51,6 @@ public class JDBCCatalogPropertyConverter extends 
PropertyConverter {
               // Data source authentication
               .put(JDBC_CONNECTION_USER_KEY, "jdbc-user")
               .put(JDBC_CONNECTION_PASSWORD_KEY, "jdbc-password")
-              .put("credential-provider.type", TRINO_PROPERTIES_PREFIX + 
"credential-provider.type")
-              .put("user-credential-name", TRINO_PROPERTIES_PREFIX + 
"user-credential-name")
-              .put("password-credential-name", TRINO_PROPERTIES_PREFIX + 
"password-credential-name")
-              .put(
-                  "connection-credential-file",
-                  TRINO_PROPERTIES_PREFIX + "connection-credential-file")
-              .put("keystore-file-path", TRINO_PROPERTIES_PREFIX + 
"keystore-file-path")
-              .put("keystore-type", TRINO_PROPERTIES_PREFIX + "keystore-type")
-              .put("keystore-password", TRINO_PROPERTIES_PREFIX + 
"keystore-password")
-              .put(
-                  "keystore-user-credential-name",
-                  TRINO_PROPERTIES_PREFIX + "keystore-user-credential-name")
-              .put(
-                  "keystore-user-credential-password",
-                  TRINO_PROPERTIES_PREFIX + 
"keystore-user-credential-password")
-              .put(
-                  "keystore-password-credential-name",
-                  TRINO_PROPERTIES_PREFIX + 
"keystore-password-credential-name")
-              .put(
-                  "keystore-password-credential-password",
-                  TRINO_PROPERTIES_PREFIX + 
"keystore-password-credential-password")
-
-              // General configuration properties
-              .put(
-                  "case-insensitive-name-matching",
-                  TRINO_PROPERTIES_PREFIX + "ase-insensitive-name-matching")
-              .put(
-                  "case-insensitive-name-matching.cache-ttl",
-                  TRINO_PROPERTIES_PREFIX + 
"case-insensitive-name-matching.cache-ttl")
-              .put(
-                  "case-insensitive-name-matching.config-file",
-                  TRINO_PROPERTIES_PREFIX + 
"case-insensitive-name-matching.config-file")
-              .put(
-                  "case-insensitive-name-matching.config-file.refresh-period",
-                  TRINO_PROPERTIES_PREFIX
-                      + 
"case-insensitive-name-matching.config-file.refresh-period")
-              .put("metadata.cache-ttl", TRINO_PROPERTIES_PREFIX + 
"metadata.cache-ttl")
-              .put("metadata.cache-missing", TRINO_PROPERTIES_PREFIX + 
"metadata.cache-missing")
-              .put(
-                  "metadata.schemas.cache-ttl",
-                  TRINO_PROPERTIES_PREFIX + "metadata.schemas.cache-ttl")
-              .put(
-                  "metadata.tables.cache-ttl",
-                  TRINO_PROPERTIES_PREFIX + "metadata.tables.cache-ttl")
-              .put(
-                  "metadata.statistics.cache-ttl",
-                  TRINO_PROPERTIES_PREFIX + "metadata.statistics.cache-ttl")
-              .put(
-                  "metadata.cache-maximum-size",
-                  TRINO_PROPERTIES_PREFIX + "metadata.cache-maximum-size")
-              .put("write.batch-size", TRINO_PROPERTIES_PREFIX + 
"write.batch-size")
-              .put(
-                  "dynamic-filtering.enabled",
-                  TRINO_PROPERTIES_PREFIX + "dynamic-filtering.enabled")
-              .put(
-                  "dynamic-filtering.wait-timeout",
-                  TRINO_PROPERTIES_PREFIX + "dynamic-filtering.wait-timeout")
-
-              // Performance
-              .put("join-pushdown.enabled", TRINO_PROPERTIES_PREFIX + 
"join-pushdown.enabled")
-              .put("join-pushdown.strategy", TRINO_PROPERTIES_PREFIX + 
"join-pushdown.strategy")
               .build());
 
   /** Set of required properties for JDBC connection. */
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/memory/MemoryConnectorAdapter.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/memory/MemoryConnectorAdapter.java
index 33acda7ba8..9da5601a5f 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/memory/MemoryConnectorAdapter.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/memory/MemoryConnectorAdapter.java
@@ -22,8 +22,10 @@ import io.trino.spi.session.PropertyMetadata;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import org.apache.gravitino.catalog.property.PropertyConverter;
 import org.apache.gravitino.trino.connector.catalog.CatalogConnectorAdapter;
 import 
org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter;
+import org.apache.gravitino.trino.connector.catalog.CatalogPropertyConverter;
 import org.apache.gravitino.trino.connector.catalog.HasPropertyMeta;
 import org.apache.gravitino.trino.connector.metadata.GravitinoCatalog;
 
@@ -34,6 +36,7 @@ import 
org.apache.gravitino.trino.connector.metadata.GravitinoCatalog;
 public class MemoryConnectorAdapter implements CatalogConnectorAdapter {
 
   private static final String CONNECTOR_MEMORY = "memory";
+  private final PropertyConverter catalogConverter;
   private final HasPropertyMeta propertyMetadata;
 
   /**
@@ -41,12 +44,13 @@ public class MemoryConnectorAdapter implements 
CatalogConnectorAdapter {
    * memory-specific configurations.
    */
   public MemoryConnectorAdapter() {
+    this.catalogConverter = new CatalogPropertyConverter();
     this.propertyMetadata = new MemoryPropertyMeta();
   }
 
   @Override
   public Map<String, String> buildInternalConnectorConfig(GravitinoCatalog 
catalog) {
-    return Collections.emptyMap();
+    return 
catalogConverter.gravitinoToEngineProperties(catalog.getProperties());
   }
 
   @Override
diff --git 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/GravitinoMockServer.java
 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/GravitinoMockServer.java
index 05ff602c18..172d48962c 100644
--- 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/GravitinoMockServer.java
+++ 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/GravitinoMockServer.java
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import io.trino.plugin.memory.MemoryConnector;
 import io.trino.spi.connector.ColumnHandle;
 import io.trino.spi.connector.ConnectorMetadata;
@@ -81,7 +82,7 @@ public class GravitinoMockServer implements AutoCloseable {
 
   public GravitinoMockServer() {
     createMetalake(testMetalake);
-    createCatalog(testMetalake, testCatalog);
+    createCatalog(testMetalake, testCatalog, ImmutableMap.of());
   }
 
   public void setCatalogConnectorManager(CatalogConnectorManager 
catalogConnectorManager) {
@@ -157,8 +158,9 @@ public class GravitinoMockServer implements AutoCloseable {
               @Override
               public Catalog answer(InvocationOnMock invocation) throws 
Throwable {
                 String catalogName = invocation.getArgument(0);
+                Map<String, String> properties = invocation.getArgument(4);
 
-                Catalog catalog = createCatalog(metalakeName, catalogName);
+                Catalog catalog = createCatalog(metalakeName, catalogName, 
properties);
 
                 return catalog;
               }
@@ -214,12 +216,14 @@ public class GravitinoMockServer implements AutoCloseable 
{
     return metaLake;
   }
 
-  private Catalog createCatalog(String metalakeName, String catalogName) {
+  private Catalog createCatalog(
+      String metalakeName, String catalogName, Map<String, String> properties) 
{
     Catalog catalog = mock(Catalog.class);
     when(catalog.name()).thenReturn(catalogName);
     when(catalog.provider()).thenReturn(testCatalogProvider);
     when(catalog.type()).thenReturn(Catalog.Type.RELATIONAL);
-    when(catalog.properties()).thenReturn(Map.of("max_ttl", "10"));
+    when(catalog.properties())
+        .thenReturn(properties.isEmpty() ? Map.of("max_ttl", "10") : 
properties);
 
     Audit mockAudit = mock(Audit.class);
     when(mockAudit.creator()).thenReturn("gravitino");
diff --git 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnector.java
 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnector.java
index 2a92d13e5e..e10f6be68f 100644
--- 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnector.java
+++ 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnector.java
@@ -240,6 +240,20 @@ public class TestGravitinoConnector extends 
AbstractTestQueryFramework {
     assertUpdate("call gravitino.system.drop_catalog('memory1')");
     assertThat(computeActual("show 
catalogs").getOnlyColumnAsSet()).doesNotContain("memory1");
 
+    // test create catalog with config by trino.bypass.
+    assertUpdate(
+        "call gravitino.system.create_catalog('memory1', 'memory', 
Map(array['trino.bypass.memory.max-data-per-node'], array['128MB']))");
+    assertThat(computeActual("show 
catalogs").getOnlyColumnAsSet()).contains("memory1");
+    assertUpdate("call gravitino.system.drop_catalog('memory1')");
+    assertThat(computeActual("show 
catalogs").getOnlyColumnAsSet()).doesNotContain("memory1");
+
+    // test create catalog with invalid config by trino.bypass.
+    assertQueryFails(
+        "call gravitino.system.create_catalog("
+            + "catalog=>'memory1', provider=>'memory', properties => 
Map(array['trino.bypass.unknown-direct-key'], array['10']))",
+        format("Create catalog failed. Create catalog failed due to the 
loading process fails"));
+    assertThat(computeActual("show 
catalogs").getOnlyColumnAsSet()).doesNotContain("memory1");
+
     assertUpdate(
         "call gravitino.system.create_catalog("
             + "catalog=>'memory1', provider=>'memory', properties => 
Map(array['max_ttl'], array['10']), ignore_exist => true)");
diff --git 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/hive/TestHiveCatalogPropertyConverter.java
 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/hive/TestHiveCatalogPropertyConverter.java
index efccbf6764..e92f0705e7 100644
--- 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/hive/TestHiveCatalogPropertyConverter.java
+++ 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/hive/TestHiveCatalogPropertyConverter.java
@@ -22,6 +22,7 @@ package org.apache.gravitino.trino.connector.catalog.hive;
 import com.google.common.collect.ImmutableMap;
 import java.util.Map;
 import org.apache.gravitino.Catalog;
+import org.apache.gravitino.trino.connector.catalog.CatalogPropertyConverter;
 import org.apache.gravitino.trino.connector.metadata.GravitinoCatalog;
 import org.apache.gravitino.trino.connector.metadata.TestGravitinoCatalog;
 import org.junit.jupiter.api.Assertions;
@@ -32,7 +33,7 @@ public class TestHiveCatalogPropertyConverter {
   @Test
   public void testConverter() {
     // You can refer to testHiveCatalogCreatedByGravitino
-    HiveCatalogPropertyConverter hiveCatalogPropertyConverter = new 
HiveCatalogPropertyConverter();
+    CatalogPropertyConverter hiveCatalogPropertyConverter = new 
CatalogPropertyConverter();
     Map<String, String> map =
         ImmutableMap.<String, String>builder()
             .put("trino.bypass.hive.immutable-partitions", "true")
@@ -43,7 +44,7 @@ public class TestHiveCatalogPropertyConverter {
     Map<String, String> re = 
hiveCatalogPropertyConverter.gravitinoToEngineProperties(map);
     Assertions.assertEquals(re.get("hive.immutable-partitions"), "true");
     Assertions.assertEquals(re.get("hive.compression-codec"), "ZSTD");
-    Assertions.assertNull(re.get("hive.unknown-key"));
+    Assertions.assertEquals(re.get("hive.unknown-key"), "1");
   }
 
   @Test
@@ -53,8 +54,8 @@ public class TestHiveCatalogPropertyConverter {
     Map<String, String> properties =
         ImmutableMap.<String, String>builder()
             .put("metastore.uris", "thrift://localhost:9083")
-            .put("hive.unknown-key", "1")
-            .put("trino.bypass.unknown-key", "1")
+            .put("unknown-key", "1")
+            .put("trino.bypass.hive.unknown-key", "1")
             .put("trino.bypass.hive.config.resources", "/tmp/hive-site.xml, 
/tmp/core-site.xml")
             .build();
     Catalog mockCatalog =
@@ -75,7 +76,7 @@ public class TestHiveCatalogPropertyConverter {
         config.get("hive.config.resources"), "/tmp/hive-site.xml, 
/tmp/core-site.xml");
 
     // test unknown properties
-    Assertions.assertNull(config.get("hive.unknown-key"));
-    Assertions.assertNull(config.get("trino.bypass.unknown-key"));
+    Assertions.assertNull(config.get("unknown-key"));
+    Assertions.assertEquals(config.get("hive.unknown-key"), "1");
   }
 }
diff --git 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/iceberg/TestIcebergCatalogPropertyConverter.java
 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/iceberg/TestIcebergCatalogPropertyConverter.java
index d5862ba599..abc3a3e4b4 100644
--- 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/iceberg/TestIcebergCatalogPropertyConverter.java
+++ 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/iceberg/TestIcebergCatalogPropertyConverter.java
@@ -102,7 +102,7 @@ public class TestIcebergCatalogPropertyConverter {
             .put("catalog-backend", "hive")
             .put("warehouse", "hdfs://tmp/warehouse")
             .put("unknown-key", "1")
-            .put("trino.bypass.unknown-key", "1")
+            .put("trino.bypass.iceberg.unknown-key", "1")
             .put("trino.bypass.iceberg.table-statistics-enabled", "true")
             .build();
     Catalog mockCatalog =
@@ -121,8 +121,8 @@ public class TestIcebergCatalogPropertyConverter {
     Assertions.assertEquals(config.get("iceberg.table-statistics-enabled"), 
"true");
 
     // test unknown properties
-    Assertions.assertNull(config.get("hive.unknown-key"));
-    Assertions.assertNull(config.get("trino.bypass.unknown-key"));
+    Assertions.assertNull(config.get("unknown-key"));
+    Assertions.assertEquals(config.get("iceberg.unknown-key"), "1");
   }
 
   @Test
@@ -138,7 +138,7 @@ public class TestIcebergCatalogPropertyConverter {
             .put("jdbc-password", "ds123")
             .put("jdbc-driver", "com.mysql.cj.jdbc.Driver")
             .put("unknown-key", "1")
-            .put("trino.bypass.unknown-key", "1")
+            .put("trino.bypass.iceberg.unknown-key", "1")
             .put("trino.bypass.iceberg.table-statistics-enabled", "true")
             .build();
     Catalog mockCatalog =
@@ -163,7 +163,7 @@ public class TestIcebergCatalogPropertyConverter {
     Assertions.assertEquals(config.get("iceberg.table-statistics-enabled"), 
"true");
 
     // test unknown properties
-    Assertions.assertNull(config.get("hive.unknown-key"));
-    Assertions.assertNull(config.get("trino.bypass.unknown-key"));
+    Assertions.assertNull(config.get("unknown-key"));
+    Assertions.assertEquals(config.get("iceberg.unknown-key"), "1");
   }
 }
diff --git 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/jdbc/mysql/TestMySQLCatalogPropertyConverter.java
 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/jdbc/mysql/TestMySQLCatalogPropertyConverter.java
index 6d108d55af..281a1266b6 100644
--- 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/jdbc/mysql/TestMySQLCatalogPropertyConverter.java
+++ 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/jdbc/mysql/TestMySQLCatalogPropertyConverter.java
@@ -39,7 +39,7 @@ public class TestMySQLCatalogPropertyConverter {
             .put("jdbc-password", "test")
             .put("trino.bypass.join-pushdown.strategy", "EAGER")
             .put("unknown-key", "1")
-            .put("trino.bypass.unknown-key", "1")
+            .put("trino.bypass.mysql.unknown-key", "1")
             .build();
     Catalog mockCatalog =
         TestGravitinoCatalog.mockCatalog(
@@ -58,7 +58,7 @@ public class TestMySQLCatalogPropertyConverter {
     Assertions.assertEquals(config.get("join-pushdown.strategy"), "EAGER");
 
     // test unknown properties
-    Assertions.assertNull(config.get("hive.unknown-key"));
-    Assertions.assertNull(config.get("trino.bypass.unknown-key"));
+    Assertions.assertNull(config.get("unknown-key"));
+    Assertions.assertEquals(config.get("mysql.unknown-key"), "1");
   }
 }
diff --git 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/jdbc/postgresql/TestPostgreSQLCatalogPropertyConverter.java
 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/jdbc/postgresql/TestPostgreSQLCatalogPropertyConverter.java
index c37aa223f2..fa7bd2b6f7 100644
--- 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/jdbc/postgresql/TestPostgreSQLCatalogPropertyConverter.java
+++ 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/jdbc/postgresql/TestPostgreSQLCatalogPropertyConverter.java
@@ -39,7 +39,7 @@ public class TestPostgreSQLCatalogPropertyConverter {
             .put("jdbc-password", "test")
             .put("trino.bypass.join-pushdown.strategy", "EAGER")
             .put("unknown-key", "1")
-            .put("trino.bypass.unknown-key", "1")
+            .put("trino.bypass.postgresql.unknown-key", "1")
             .build();
     Catalog mockCatalog =
         TestGravitinoCatalog.mockCatalog(
@@ -58,7 +58,7 @@ public class TestPostgreSQLCatalogPropertyConverter {
     Assertions.assertEquals(config.get("join-pushdown.strategy"), "EAGER");
 
     // test unknown properties
-    Assertions.assertNull(config.get("hive.unknown-key"));
-    Assertions.assertNull(config.get("trino.bypass.unknown-key"));
+    Assertions.assertNull(config.get("unknown-key"));
+    Assertions.assertEquals(config.get("postgresql.unknown-key"), "1");
   }
 }

Reply via email to