This is an automated email from the ASF dual-hosted git repository.

mchades pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 6c2927102e [#6854] improvement(hudi): Hudi catalog support Kerberos 
authentication (#6923)
6c2927102e is described below

commit 6c2927102e9edb473a8918a15c8241d3b0e30cfd
Author: Cyber Star <[email protected]>
AuthorDate: Wed May 7 10:51:06 2025 +0800

    [#6854] improvement(hudi): Hudi catalog support Kerberos authentication 
(#6923)
    
    ### What changes were proposed in this pull request?
    
    When the Hudi catalog backend is HMS, Kerberos authentication against
    HMS is now supported.
    
    ### Why are the changes needed?
    
    Fix: #6854
    
    1. For data security, it is necessary to support HMS Kerberos
    authentication.
    2. Provides an entry point for HMS Kerberos authentication:
    `HudiHMSBackendOps.initKerberosAuth(...)`.
    
    
    ### Does this PR introduce _any_ user-facing change?
    1. Introduced user-facing changes:
    The Hudi catalog has added the following properties:
    `authentication.type`: The type of authentication for the Hudi catalog
    backend. This configuration is only applicable to the HMS backend, and
    currently only supports `Kerberos` and `simple`.
    `authentication.kerberos.principal`: The principal for Kerberos
    authentication.
    `authentication.kerberos.keytab-uri`: The URI of the keytab for
    Kerberos authentication.
    Properties whose names start with the `gravitino.bypass.` prefix are
    passed down to the underlying backend client, for example
    `gravitino.bypass.hive.metastore.kerberos.principal=XXXX`,
    `gravitino.bypass.hadoop.security.authentication=kerberos`,
    `gravitino.bypass.hive.metastore.sasl.enabled=true`, and so on.
    
    ### How was this patch tested?
    
    Using the web UI, you can configure a Hudi catalog with the HMS backend
    and set the Kerberos-related parameters to manage Hudi metadata, as
    shown below:
    <img width="1379" alt="image"
    src="https://github.com/user-attachments/assets/6f89d707-0e82-4091-9076-00761508d158"
    />
---
 catalogs/catalog-lakehouse-hudi/build.gradle.kts   |   1 +
 .../lakehouse/hudi/HudiCatalogOperations.java      |  11 +-
 .../hudi/backend/hms/HudiHMSBackendOps.java        |  37 ++-
 .../backend/hms/kerberos/AuthenticationConfig.java |  68 ++++++
 .../hudi/backend/hms/kerberos/FetchFileUtils.java  |  63 +++++
 .../hudi/backend/hms/kerberos/KerberosClient.java  | 121 ++++++++++
 .../hudi/backend/hms/kerberos/KerberosConfig.java  |  90 +++++++
 .../catalog/lakehouse/hudi/utils/CatalogUtils.java |   3 +
 .../test/HudiCatalogKerberosHiveIT.java            | 260 +++++++++++++++++++++
 .../apache/gravitino/config/ConfigConstants.java   |   3 +
 docs/lakehouse-hudi-catalog.md                     |  16 ++
 11 files changed, 667 insertions(+), 6 deletions(-)

diff --git a/catalogs/catalog-lakehouse-hudi/build.gradle.kts 
b/catalogs/catalog-lakehouse-hudi/build.gradle.kts
index 814965ec03..8d62d77fac 100644
--- a/catalogs/catalog-lakehouse-hudi/build.gradle.kts
+++ b/catalogs/catalog-lakehouse-hudi/build.gradle.kts
@@ -43,6 +43,7 @@ dependencies {
 
   implementation(libs.commons.collections3)
   implementation(libs.commons.configuration1)
+  implementation(libs.commons.io)
   implementation(libs.htrace.core4)
   implementation(libs.guava)
   implementation(libs.hadoop2.auth) {
diff --git 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiCatalogOperations.java
 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiCatalogOperations.java
index f73927233a..f471dbbdce 100644
--- 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiCatalogOperations.java
+++ 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiCatalogOperations.java
@@ -19,7 +19,9 @@
 package org.apache.gravitino.catalog.lakehouse.hudi;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
 import java.util.Map;
+import java.util.Objects;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
@@ -69,7 +71,14 @@ public class HudiCatalogOperations implements 
CatalogOperations, SupportsSchemas
   public void initialize(
       Map<String, String> config, CatalogInfo info, HasPropertyMetadata 
propertiesMetadata)
       throws RuntimeException {
-    HudiCatalogBackend hudiCatalogBackend = 
CatalogUtils.loadHudiCatalogBackend(config);
+    HudiCatalogBackend hudiCatalogBackend =
+        CatalogUtils.loadHudiCatalogBackend(
+            ImmutableMap.<String, String>builder()
+                .putAll(config)
+                .put(
+                    CatalogUtils.CATALOG_ID_KEY,
+                    (Objects.nonNull(info) ? String.valueOf(info.id()) : "0"))
+                .build());
     hudiCatalogBackendOps = hudiCatalogBackend.backendOps();
   }
 
diff --git 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSBackendOps.java
 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSBackendOps.java
index 9b03518a60..37a40a8314 100644
--- 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSBackendOps.java
+++ 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSBackendOps.java
@@ -24,6 +24,8 @@ import static 
org.apache.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import java.io.File;
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import org.apache.gravitino.NameIdentifier;
@@ -31,7 +33,10 @@ import org.apache.gravitino.Namespace;
 import org.apache.gravitino.SchemaChange;
 import org.apache.gravitino.catalog.lakehouse.hudi.HudiSchema;
 import org.apache.gravitino.catalog.lakehouse.hudi.HudiTable;
+import 
org.apache.gravitino.catalog.lakehouse.hudi.backend.hms.kerberos.AuthenticationConfig;
+import 
org.apache.gravitino.catalog.lakehouse.hudi.backend.hms.kerberos.KerberosClient;
 import org.apache.gravitino.catalog.lakehouse.hudi.ops.HudiCatalogBackendOps;
+import org.apache.gravitino.catalog.lakehouse.hudi.utils.CatalogUtils;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NoSuchTableException;
@@ -52,9 +57,11 @@ import 
org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class HudiHMSBackendOps implements HudiCatalogBackendOps {
-
+  private static final Logger LOG = 
LoggerFactory.getLogger(HudiHMSBackendOps.class);
   // Mapping from Gravitino config to Hive config
   private static final Map<String, String> CONFIG_CONVERTER =
       ImmutableMap.of(URI, HiveConf.ConfVars.METASTOREURIS.varname);
@@ -63,9 +70,12 @@ public class HudiHMSBackendOps implements 
HudiCatalogBackendOps {
 
   @VisibleForTesting CachedClientPool clientPool;
 
+  public static final String GRAVITINO_KEYTAB_FORMAT = 
"keytabs/gravitino-lakehouse-hudi-%s-keytab";
+
   @Override
   public void initialize(Map<String, String> properties) {
-    this.clientPool = new CachedClientPool(buildHiveConf(properties), 
properties);
+    HiveConf hiveConf = buildHiveConfAndInitKerberosAuth(properties);
+    this.clientPool = new CachedClientPool(hiveConf, properties);
   }
 
   @Override
@@ -225,7 +235,7 @@ public class HudiHMSBackendOps implements 
HudiCatalogBackendOps {
         && table.getSd().getInputFormat().startsWith(HUDI_PACKAGE_PREFIX);
   }
 
-  private HiveConf buildHiveConf(Map<String, String> properties) {
+  private HiveConf buildHiveConfAndInitKerberosAuth(Map<String, String> 
properties) {
     Configuration hadoopConf = new Configuration();
 
     Map<String, String> byPassConfigs = Maps.newHashMap();
@@ -236,12 +246,29 @@ public class HudiHMSBackendOps implements 
HudiCatalogBackendOps {
             byPassConfigs.put(key.substring(CATALOG_BYPASS_PREFIX.length()), 
value);
           } else if (CONFIG_CONVERTER.containsKey(key)) {
             convertedConfigs.put(CONFIG_CONVERTER.get(key), value);
+          } else {
+            hadoopConf.set(key, value);
           }
         });
     byPassConfigs.forEach(hadoopConf::set);
-    // Gravitino conf has higher priority than bypass conf
     convertedConfigs.forEach(hadoopConf::set);
-
+    initKerberosAuth(properties, hadoopConf);
     return new HiveConf(hadoopConf, HudiHMSBackendOps.class);
   }
+
+  private void initKerberosAuth(Map<String, String> properties, Configuration 
hadoopConf) {
+    AuthenticationConfig authenticationConfig = new 
AuthenticationConfig(properties);
+    if (authenticationConfig.isKerberosAuth()) {
+      try (KerberosClient kerberosClient = new KerberosClient(properties, 
hadoopConf, true)) {
+        String keytabPath =
+            String.format(
+                GRAVITINO_KEYTAB_FORMAT, 
properties.getOrDefault(CatalogUtils.CATALOG_ID_KEY, "0"));
+        File keytabFile = kerberosClient.saveKeyTabFileFromUri(keytabPath);
+        kerberosClient.login(keytabFile.getAbsolutePath());
+        LOG.info("Login with kerberos success");
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to login with kerberos", e);
+      }
+    }
+  }
 }
diff --git 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/kerberos/AuthenticationConfig.java
 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/kerberos/AuthenticationConfig.java
new file mode 100644
index 0000000000..c1ff8b8f40
--- /dev/null
+++ 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/kerberos/AuthenticationConfig.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.catalog.lakehouse.hudi.backend.hms.kerberos;
+
+import org.apache.gravitino.Config;
+import org.apache.gravitino.config.ConfigBuilder;
+import org.apache.gravitino.config.ConfigConstants;
+import org.apache.gravitino.config.ConfigEntry;
+
+public class AuthenticationConfig extends Config {
+
+  // The key for the authentication type, currently we support Kerberos and 
simple
+  public static final String AUTH_TYPE_KEY = "authentication.type";
+
+  public static final String IMPERSONATION_ENABLE_KEY = 
"authentication.impersonation-enable";
+
+  enum AuthenticationType {
+    SIMPLE,
+    KERBEROS;
+  }
+
+  public static final boolean KERBEROS_DEFAULT_IMPERSONATION_ENABLE = false;
+
+  public AuthenticationConfig(java.util.Map<String, String> properties) {
+    super(false);
+    loadFromMap(properties, k -> true);
+  }
+
+  public static final ConfigEntry<String> AUTH_TYPE_ENTRY =
+      new ConfigBuilder(AUTH_TYPE_KEY)
+          .doc(
+              "The type of authentication for Hudi catalog, currently we only 
support simple and Kerberos")
+          .version(ConfigConstants.VERSION_0_10_0)
+          .stringConf()
+          .createWithDefault("simple");
+
+  public static final ConfigEntry<Boolean> ENABLE_IMPERSONATION_ENTRY =
+      new ConfigBuilder(IMPERSONATION_ENABLE_KEY)
+          .doc("Whether to enable impersonation for the Hudi catalog")
+          .version(ConfigConstants.VERSION_0_10_0)
+          .booleanConf()
+          .createWithDefault(KERBEROS_DEFAULT_IMPERSONATION_ENABLE);
+
+  public String getAuthType() {
+    return get(AUTH_TYPE_ENTRY);
+  }
+
+  public boolean isKerberosAuth() {
+    return 
AuthenticationConfig.AuthenticationType.KERBEROS.name().equalsIgnoreCase(getAuthType());
+  }
+}
diff --git 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/kerberos/FetchFileUtils.java
 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/kerberos/FetchFileUtils.java
new file mode 100644
index 0000000000..c94a87bdf2
--- /dev/null
+++ 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/kerberos/FetchFileUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.lakehouse.hudi.backend.hms.kerberos;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class FetchFileUtils {
+
+  private FetchFileUtils() {}
+
+  public static void fetchFileFromUri(
+      String fileUri, File destFile, int timeout, Configuration conf) throws 
java.io.IOException {
+    try {
+      URI uri = new URI(fileUri);
+      String scheme = 
java.util.Optional.ofNullable(uri.getScheme()).orElse("file");
+
+      switch (scheme) {
+        case "http":
+        case "https":
+        case "ftp":
+          FileUtils.copyURLToFile(uri.toURL(), destFile, timeout * 1000, 
timeout * 1000);
+          break;
+
+        case "file":
+          Files.createSymbolicLink(destFile.toPath(), new 
File(uri.getPath()).toPath());
+          break;
+
+        case "hdfs":
+          FileSystem.get(conf).copyToLocalFile(new Path(uri), new 
Path(destFile.toURI()));
+          break;
+
+        default:
+          throw new IllegalArgumentException(
+              String.format("Doesn't support the scheme %s", scheme));
+      }
+    } catch (URISyntaxException ue) {
+      throw new IllegalArgumentException("The uri of file has the wrong 
format", ue);
+    }
+  }
+}
diff --git 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/kerberos/KerberosClient.java
 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/kerberos/KerberosClient.java
new file mode 100644
index 0000000000..9bdbbc7424
--- /dev/null
+++ 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/kerberos/KerberosClient.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.catalog.lakehouse.hudi.backend.hms.kerberos;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KerberosClient implements java.io.Closeable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(KerberosClient.class);
+
+  private ScheduledThreadPoolExecutor checkTgtExecutor;
+  private final java.util.Map<String, String> conf;
+  private final Configuration hadoopConf;
+  private final boolean refreshCredentials;
+
+  public KerberosClient(
+      java.util.Map<String, String> conf, Configuration hadoopConf, boolean 
refreshCredentials) {
+    this.conf = conf;
+    this.hadoopConf = hadoopConf;
+    this.refreshCredentials = refreshCredentials;
+  }
+
+  public String login(String keytabFilePath) throws IOException {
+    KerberosConfig kerberosConfig = new KerberosConfig(conf);
+
+    // Check the principal and keytab file
+    String catalogPrincipal = kerberosConfig.getPrincipalName();
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(catalogPrincipal), "The principal can't be 
blank");
+    @SuppressWarnings("null")
+    java.util.List<String> principalComponents = 
Splitter.on('@').splitToList(catalogPrincipal);
+    Preconditions.checkArgument(
+        principalComponents.size() == 2, "The principal has the wrong format");
+
+    // Login
+    UserGroupInformation.setConfiguration(hadoopConf);
+    UserGroupInformation.loginUserFromKeytab(catalogPrincipal, keytabFilePath);
+    UserGroupInformation kerberosLoginUgi = 
UserGroupInformation.getLoginUser();
+
+    // Refresh the cache if it's out of date.
+    if (refreshCredentials) {
+      this.checkTgtExecutor = new ScheduledThreadPoolExecutor(1, 
getThreadFactory("check-tgt"));
+      int checkInterval = kerberosConfig.getCheckIntervalSec();
+      checkTgtExecutor.scheduleAtFixedRate(
+          () -> {
+            try {
+              kerberosLoginUgi.checkTGTAndReloginFromKeytab();
+            } catch (Exception e) {
+              LOG.error("Fail to refresh ugi token: ", e);
+            }
+          },
+          checkInterval,
+          checkInterval,
+          TimeUnit.SECONDS);
+    }
+
+    return principalComponents.get(1);
+  }
+
+  public File saveKeyTabFileFromUri(String keytabPath) throws IOException {
+    KerberosConfig kerberosConfig = new KerberosConfig(conf);
+
+    String keyTabUri = kerberosConfig.getKeytab();
+    Preconditions.checkArgument(StringUtils.isNotBlank(keyTabUri), "Keytab uri 
can't be blank");
+    Preconditions.checkArgument(
+        !keyTabUri.trim().startsWith("hdfs"), "Keytab uri doesn't support to 
use HDFS");
+
+    File keytabsDir = new File("keytabs");
+    if (!keytabsDir.exists()) {
+      keytabsDir.mkdir();
+    }
+    File keytabFile = new File(keytabPath);
+    keytabFile.deleteOnExit();
+    if (keytabFile.exists() && !keytabFile.delete()) {
+      throw new IllegalStateException(
+          String.format("Fail to delete keytab file %s", 
keytabFile.getAbsolutePath()));
+    }
+    int fetchKeytabFileTimeout = kerberosConfig.getFetchTimeoutSec();
+    FetchFileUtils.fetchFileFromUri(keyTabUri, keytabFile, 
fetchKeytabFileTimeout, hadoopConf);
+    return keytabFile;
+  }
+
+  private static ThreadFactory getThreadFactory(String factoryName) {
+    return new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat(factoryName + 
"-%d").build();
+  }
+
+  @Override
+  public void close() {
+    if (checkTgtExecutor != null) {
+      checkTgtExecutor.shutdown();
+    }
+  }
+}
diff --git 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/kerberos/KerberosConfig.java
 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/kerberos/KerberosConfig.java
new file mode 100644
index 0000000000..333c9df099
--- /dev/null
+++ 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/kerberos/KerberosConfig.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.catalog.lakehouse.hudi.backend.hms.kerberos;
+
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.config.ConfigBuilder;
+import org.apache.gravitino.config.ConfigConstants;
+import org.apache.gravitino.config.ConfigEntry;
+
+public class KerberosConfig extends AuthenticationConfig {
+  public static final String KEY_TAB_URI_KEY = 
"authentication.kerberos.keytab-uri";
+
+  public static final String PRINCIPAL_KEY = 
"authentication.kerberos.principal";
+
+  public static final String CHECK_INTERVAL_SEC_KEY = 
"authentication.kerberos.check-interval-sec";
+
+  public static final String FETCH_TIMEOUT_SEC_KEY =
+      "authentication.kerberos.keytab-fetch-timeout-sec";
+
+  public static final ConfigEntry<String> PRINCIPAL_ENTRY =
+      new ConfigBuilder(PRINCIPAL_KEY)
+          .doc("The principal of the Kerberos connection")
+          .version(ConfigConstants.VERSION_0_10_0)
+          .stringConf()
+          .checkValue(StringUtils::isNotBlank, 
ConfigConstants.NOT_BLANK_ERROR_MSG)
+          .create();
+
+  public static final ConfigEntry<String> KEYTAB_ENTRY =
+      new ConfigBuilder(KEY_TAB_URI_KEY)
+          .doc("The keytab of the Kerberos connection")
+          .version(ConfigConstants.VERSION_0_10_0)
+          .stringConf()
+          .checkValue(StringUtils::isNotBlank, 
ConfigConstants.NOT_BLANK_ERROR_MSG)
+          .create();
+
+  public static final ConfigEntry<Integer> CHECK_INTERVAL_SEC_ENTRY =
+      new ConfigBuilder(CHECK_INTERVAL_SEC_KEY)
+          .doc("The check interval of the Kerberos connection for Hudi 
catalog")
+          .version(ConfigConstants.VERSION_0_10_0)
+          .intConf()
+          .checkValue(value -> value > 0, 
ConfigConstants.POSITIVE_NUMBER_ERROR_MSG)
+          .createWithDefault(60);
+
+  public static final ConfigEntry<Integer> FETCH_TIMEOUT_SEC_ENTRY =
+      new ConfigBuilder(FETCH_TIMEOUT_SEC_KEY)
+          .doc("The fetch timeout of the Kerberos connection")
+          .version(ConfigConstants.VERSION_0_10_0)
+          .intConf()
+          .checkValue(value -> value > 0, 
ConfigConstants.POSITIVE_NUMBER_ERROR_MSG)
+          .createWithDefault(60);
+
+  public KerberosConfig(Map<String, String> properties) {
+    super(properties);
+    loadFromMap(properties, k -> true);
+  }
+
+  public String getPrincipalName() {
+    return get(PRINCIPAL_ENTRY);
+  }
+
+  public String getKeytab() {
+    return get(KEYTAB_ENTRY);
+  }
+
+  public int getCheckIntervalSec() {
+    return get(CHECK_INTERVAL_SEC_ENTRY);
+  }
+
+  public int getFetchTimeoutSec() {
+    return get(FETCH_TIMEOUT_SEC_ENTRY);
+  }
+}
diff --git 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/utils/CatalogUtils.java
 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/utils/CatalogUtils.java
index c629ee55a8..7e6a45081f 100644
--- 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/utils/CatalogUtils.java
+++ 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/utils/CatalogUtils.java
@@ -27,6 +27,9 @@ import 
org.apache.gravitino.catalog.lakehouse.hudi.backend.HudiCatalogBackend;
 import org.apache.gravitino.catalog.lakehouse.hudi.backend.hms.HudiHMSBackend;
 
 public class CatalogUtils {
+
+  public static final String CATALOG_ID_KEY = "catalogId";
+
   private CatalogUtils() {}
 
   public static HudiCatalogBackend loadHudiCatalogBackend(Map<String, String> 
properties) {
diff --git 
a/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/integration/test/HudiCatalogKerberosHiveIT.java
 
b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/integration/test/HudiCatalogKerberosHiveIT.java
new file mode 100644
index 0000000000..9be5a02bee
--- /dev/null
+++ 
b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/integration/test/HudiCatalogKerberosHiveIT.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.lakehouse.hudi.integration.test;
+
+import static org.apache.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import java.io.File;
+import java.nio.file.Files;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Schema;
+import 
org.apache.gravitino.catalog.lakehouse.hudi.HudiCatalogPropertiesMetadata;
+import 
org.apache.gravitino.catalog.lakehouse.hudi.backend.hms.kerberos.AuthenticationConfig;
+import 
org.apache.gravitino.catalog.lakehouse.hudi.backend.hms.kerberos.KerberosConfig;
+import org.apache.gravitino.client.GravitinoAdminClient;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.client.KerberosTokenProvider;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.integration.test.container.ContainerSuite;
+import org.apache.gravitino.integration.test.container.HiveContainer;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.rel.TableCatalog;
+import org.apache.hadoop.security.authentication.util.KerberosName;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Tag("gravitino-docker-test")
+public class HudiCatalogKerberosHiveIT extends BaseIT {
+  private static final Logger LOG = 
LoggerFactory.getLogger(HudiCatalogKerberosHiveIT.class);
+
+  // Shared suite used to start and look up the Kerberos-enabled Hive container.
+  private static final ContainerSuite containerSuite = 
ContainerSuite.getInstance();
+
+  // Keys under which the SDK client's principal and keytab path are put into customConfigs.
+  private static final String SDK_KERBEROS_PRINCIPAL_KEY = 
"client.kerberos.principal";
+  private static final String SDK_KERBEROS_KEYTAB_KEY = 
"client.kerberos.keytab";
+
+  // Principal of the Gravitino client; the keytab value is a file name (with leading slash)
+  // that gets appended to TMP_DIR to form the local keytab path.
+  private static final String GRAVITINO_CLIENT_PRINCIPAL = 
"gravitino_client@HADOOPKRB";
+  private static final String GRAVITINO_CLIENT_KEYTAB = 
"/gravitino_client.keytab";
+
+  // Principal and keytab file name used by the Gravitino server's Kerberos authenticator.
+  private static final String GRAVITINO_SERVER_PRINCIPAL = 
"HTTP/localhost@HADOOPKRB";
+  private static final String GRAVITINO_SERVER_KEYTAB = 
"/gravitino_server.keytab";
+
+  // Principal and keytab file name passed as catalog properties for the connection to the
+  // Hive Metastore; the keytab is copied from /etc/admin.keytab inside the container.
+  private static final String HIVE_METASTORE_CLIENT_PRINCIPAL = 
"cli@HADOOPKRB";
+  private static final String HIVE_METASTORE_CLIENT_KEYTAB = "/client.keytab";
+
+  // Absolute path of the temporary directory created in startIntegrationTest().
+  private static String TMP_DIR;
+
+  // thrift:// URI of the containerized Hive Metastore, built in startIntegrationTest().
+  protected static String HIVE_METASTORE_URI;
+
+  // Kerberos-authenticated admin client, created inside the test method.
+  private static GravitinoAdminClient adminClient;
+
+  protected static HiveContainer kerberosHiveContainer;
+
+  // Names of the entities used by the test; all but the schema name are randomly generated.
+  static String METALAKE_NAME = 
GravitinoITUtils.genRandomName("test_metalake");
+  static String CATALOG_NAME = GravitinoITUtils.genRandomName("test_catalog");
+  static String SCHEMA_NAME = "default";
+  static String TABLE_NAME = GravitinoITUtils.genRandomName("test_table");
+
+  /**
+   * Prepares the Kerberos test environment and then starts the Gravitino server.
+   *
+   * <p>Steps, in order: start the Kerberos Hive container, create a local temporary directory,
+   * build the metastore URI from the container IP, copy keytabs and krb5.conf out of the
+   * container, register the server's Kerberos settings, create the test table in Hive, and
+   * finally delegate to the superclass to start the server.
+   *
+   * @throws Exception if any preparation step or the server start fails
+   */
+  @BeforeAll
+  public void startIntegrationTest() throws Exception {
+    containerSuite.startKerberosHiveContainer();
+    kerberosHiveContainer = containerSuite.getKerberosHiveContainer();
+
+    File baseDir = new File(System.getProperty("java.io.tmpdir"));
+    File file = Files.createTempDirectory(baseDir.toPath(), "test").toFile();
+    // NOTE(review): File deletion only removes empty directories, and keytabs are copied into
+    // this directory later, so it is probably left behind on exit - confirm.
+    file.deleteOnExit();
+    TMP_DIR = file.getAbsolutePath();
+
+    HIVE_METASTORE_URI =
+        String.format(
+            "thrift://%s:%d",
+            kerberosHiveContainer.getContainerIpAddress(), 
HiveContainer.HIVE_METASTORE_PORT);
+
+    // Copy keytabs and krb5.conf from the container and point the JVM at them
+    prepareKerberosConfig();
+
+    // Register the Kerberos settings for the Gravitino server before it is started
+    addKerberosConfig();
+
+    // Create the test table through the hive CLI inside the container
+    createHudiTables();
+
+    // Start Gravitino server
+    super.startIntegrationTest();
+  }
+
+  /**
+   * Resets process-wide Kerberos state after all tests in this class have run.
+   *
+   * <p>Resets the Hadoop UGI, clears the two krb5 system properties that
+   * {@code prepareKerberosConfig()} set, and sets the {@code client} field to null.
+   */
+  @AfterAll
+  public void stop() {
+    // Reset the UGI
+    org.apache.hadoop.security.UserGroupInformation.reset();
+
+    // Log the krb5.conf path before the property is cleared below
+    LOG.info("krb5 path: {}", System.getProperty("java.security.krb5.conf"));
+    // Clean up the kerberos configuration
+    System.clearProperty("java.security.krb5.conf");
+    System.clearProperty("sun.security.krb5.debug");
+
+    // NOTE(review): client is only set to null here, not closed, and adminClient is left
+    // untouched - confirm that this is intended.
+    client = null;
+  }
+
+  /**
+   * Copies the Kerberos material out of the Hive container and activates it for this JVM.
+   *
+   * <p>Three keytabs and the container's krb5.conf are copied into {@code TMP_DIR}. The krb5.conf
+   * is rewritten so that the kdc and admin_server entries use the container IP instead of
+   * localhost, then it is installed through the {@code java.security.krb5.conf} system property
+   * and the cached Kerberos configuration is refreshed.
+   *
+   * @throws Exception if copying, reading or writing any of the files fails
+   */
+  private static void prepareKerberosConfig() throws Exception {
+    // Keytab of the Gravitino SDK client
+    kerberosHiveContainer
+        .getContainer()
+        .copyFileFromContainer("/gravitino_client.keytab", TMP_DIR + 
GRAVITINO_CLIENT_KEYTAB);
+
+    // Keytab of the Gravitino server
+    kerberosHiveContainer
+        .getContainer()
+        .copyFileFromContainer("/gravitino_server.keytab", TMP_DIR + 
GRAVITINO_SERVER_KEYTAB);
+
+    // Keytab the Gravitino server uses to connect to Hive
+    kerberosHiveContainer
+        .getContainer()
+        .copyFileFromContainer("/etc/admin.keytab", TMP_DIR + 
HIVE_METASTORE_CLIENT_KEYTAB);
+
+    // The container's krb5.conf is copied to a scratch file first; the patched copy below is
+    // the one the JVM is pointed at.
+    String tmpKrb5Path = TMP_DIR + "/krb5.conf_tmp";
+    String krb5Path = TMP_DIR + "/krb5.conf";
+    
kerberosHiveContainer.getContainer().copyFileFromContainer("/etc/krb5.conf", 
tmpKrb5Path);
+
+    // Modify the krb5.conf and change the kdc and admin_server to the container IP
+    String ip = 
containerSuite.getKerberosHiveContainer().getContainerIpAddress();
+    String content =
+        FileUtils.readFileToString(new File(tmpKrb5Path), 
java.nio.charset.StandardCharsets.UTF_8);
+    content = content.replace("kdc = localhost:88", "kdc = " + ip + ":88");
+    content = content.replace("admin_server = localhost", "admin_server = " + 
ip + ":749");
+    FileUtils.write(new File(krb5Path), content, 
java.nio.charset.StandardCharsets.UTF_8);
+
+    LOG.info("Kerberos kdc config:\n{}, path: {}", content, krb5Path);
+    System.setProperty("java.security.krb5.conf", krb5Path);
+    System.setProperty("sun.security.krb5.debug", "true");
+
+    // Re-read the configuration so the new krb5.conf and its default realm take effect
+    refreshKerberosConfig();
+    KerberosName.resetDefaultRealm();
+
+    LOG.info("Kerberos default realm: {}", KerberosUtil.getDefaultRealm());
+  }
+
+  /**
+   * Forces the JVM's Kerberos implementation to re-read its configuration.
+   *
+   * <p>Reflectively invokes the {@code refresh()} method of the vendor-specific Kerberos
+   * {@code Config} class: {@code com.ibm.security.krb5.internal.Config} when the
+   * {@code java.vendor} system property contains "IBM", {@code sun.security.krb5.Config}
+   * otherwise.
+   *
+   * @throws RuntimeException if the class or method cannot be resolved or the invocation fails
+   */
+  private static void refreshKerberosConfig() {
+    String configClassName =
+        System.getProperty("java.vendor").contains("IBM")
+            ? "com.ibm.security.krb5.internal.Config"
+            : "sun.security.krb5.Config";
+    try {
+      java.lang.reflect.Method refreshMethod =
+          Class.forName(configClassName).getMethod("refresh");
+      // refresh() is invoked without a receiver object, i.e. as a static method.
+      refreshMethod.invoke(null);
+    } catch (ReflectiveOperationException e) {
+      // Only reflection failures are wrapped; keep the cause and name the class that failed.
+      throw new RuntimeException("Failed to refresh Kerberos config via " + configClassName, e);
+    }
+  }
+
+  /**
+   * Registers the Kerberos authenticator settings of the Gravitino server, together with the SDK
+   * client's principal and keytab path, in {@code customConfigs}.
+   */
+  private void addKerberosConfig() {
+    String serverKeytabPath = TMP_DIR + GRAVITINO_SERVER_KEYTAB;
+    String clientKeytabPath = TMP_DIR + GRAVITINO_CLIENT_KEYTAB;
+
+    // Authenticator settings of the server
+    customConfigs.put(Configs.AUTHENTICATORS.getKey(), "kerberos");
+    customConfigs.put("gravitino.authenticator.kerberos.principal", GRAVITINO_SERVER_PRINCIPAL);
+    customConfigs.put("gravitino.authenticator.kerberos.keytab", serverKeytabPath);
+
+    // Credentials of the SDK client
+    customConfigs.put(SDK_KERBEROS_PRINCIPAL_KEY, GRAVITINO_CLIENT_PRINCIPAL);
+    customConfigs.put(SDK_KERBEROS_KEYTAB_KEY, clientKeytabPath);
+  }
+
+  /**
+   * Creates a Hudi catalog backed by the Kerberos-enabled Hive Metastore and checks that it can
+   * be used.
+   *
+   * <p>The test authenticates against the Gravitino server with the client keytab, creates a
+   * metalake and a {@code lakehouse-hudi} catalog with Kerberos properties, loads the
+   * {@code default} schema, and expects loading the table created by
+   * {@code createHudiTables()} to fail with {@link NoSuchTableException}.
+   */
+  @Test
+  public void testHudiCatalogWithKerberos() {
+    KerberosTokenProvider provider =
+        KerberosTokenProvider.builder()
+            .withClientPrincipal(GRAVITINO_CLIENT_PRINCIPAL)
+            .withKeyTabFile(new File(TMP_DIR + GRAVITINO_CLIENT_KEYTAB))
+            .build();
+    adminClient = 
GravitinoAdminClient.builder(serverUri).withKerberosAuth(provider).build();
+    GravitinoMetalake gravitinoMetalake =
+        adminClient.createMetalake(METALAKE_NAME, null, ImmutableMap.of());
+
+    // Create a catalog with kerberos
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(AuthenticationConfig.IMPERSONATION_ENABLE_KEY, "true");
+    properties.put(AuthenticationConfig.AUTH_TYPE_KEY, "kerberos");
+    properties.put(KerberosConfig.KEY_TAB_URI_KEY, TMP_DIR + 
HIVE_METASTORE_CLIENT_KEYTAB);
+    properties.put(KerberosConfig.PRINCIPAL_KEY, 
HIVE_METASTORE_CLIENT_PRINCIPAL);
+    properties.put("list-all-tables", "false");
+    // Settings with the bypass prefix are handed to the underlying metastore client; _HOST in
+    // the service principal is replaced with the container's host name.
+    properties.put(
+        CATALOG_BYPASS_PREFIX + "hive.metastore.kerberos.principal",
+        "hive/_HOST@HADOOPKRB".replace("_HOST", 
kerberosHiveContainer.getHostName()));
+    properties.put(CATALOG_BYPASS_PREFIX + "hive.metastore.sasl.enabled", 
"true");
+    properties.put(CATALOG_BYPASS_PREFIX + "hadoop.security.authentication", 
"kerberos");
+
+    // Backend selection and location of the metastore and warehouse
+    properties.put(HudiCatalogPropertiesMetadata.CATALOG_BACKEND, "hms");
+    properties.put(HudiCatalogPropertiesMetadata.URI, HIVE_METASTORE_URI);
+    properties.put(
+        "warehouse",
+        String.format(
+            "hdfs://%s:%d/user/hive/warehouse-catalog-hudi",
+            kerberosHiveContainer.getContainerIpAddress(), 
HiveContainer.HDFS_DEFAULTFS_PORT));
+    Catalog catalog =
+        gravitinoMetalake.createCatalog(
+            CATALOG_NAME, Catalog.Type.RELATIONAL, "lakehouse-hudi", 
"comment", properties);
+    LOG.info("create catalog: {} sucess", catalog.name());
+    Assertions.assertEquals(CATALOG_NAME, catalog.name());
+
+    Schema schema = catalog.asSchemas().loadSchema(SCHEMA_NAME);
+    Assertions.assertEquals(SCHEMA_NAME, schema.name());
+
+    // The table exists in Hive but the catalog is expected to report it as missing -
+    // presumably because it was created as a plain Hive table rather than a Hudi table.
+    TableCatalog tableOps = catalog.asTableCatalog();
+    NoSuchTableException exception =
+        Assertions.assertThrows(
+            NoSuchTableException.class,
+            () -> tableOps.loadTable(NameIdentifier.of(SCHEMA_NAME, 
TABLE_NAME)));
+
+    Assertions.assertTrue(
+        exception
+            .getMessage()
+            .contains("Hudi table does not exist: " + TABLE_NAME + " in Hive 
Metastore"),
+        "Unexpected exception message: " + exception.getMessage());
+  }
+
+  /**
+   * Creates the test table and inserts one row by running the {@code hive} CLI inside the
+   * Kerberos Hive container.
+   *
+   * <p>The table has the columns {@code id}, {@code name} and {@code age} and is created as
+   * {@code SCHEMA_NAME.TABLE_NAME} with an explicit HDFS location.
+   */
+  private static void createHudiTables() {
+    String createTableSql =
+        String.format(
+            "CREATE TABLE %s.%s (\n"
+                + "  id STRING,\n"
+                + "  name STRING,\n"
+                + "  age INT\n"
+                + ") \n"
+                + "LOCATION 
'hdfs://localhost:9000/user/hive/warehouse-catalog-hudi' ",
+            SCHEMA_NAME, TABLE_NAME);
+    // The statement is wrapped in single quotes and terminated with ';' for "hive -e".
+    kerberosHiveContainer.executeInContainer("hive", "-e", "'" + 
createTableSql + ";'");
+    // NOTE(review): the result of executeInContainer is not inspected, so "success" is logged
+    // regardless of the command's outcome - confirm whether a failure would throw.
+    LOG.info("create table {} success[{}]", TABLE_NAME, createTableSql);
+
+    String insertTableSql =
+        String.format("INSERT INTO %s.%s" + " VALUES " + "(1,'Cyber',26)", 
SCHEMA_NAME, TABLE_NAME);
+    kerberosHiveContainer.executeInContainer("hive", "-e", "'" + 
insertTableSql + ";'");
+    LOG.info("insert table {} data success[{}]", TABLE_NAME, insertTableSql);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/config/ConfigConstants.java 
b/core/src/main/java/org/apache/gravitino/config/ConfigConstants.java
index 633fd90928..af1087c519 100644
--- a/core/src/main/java/org/apache/gravitino/config/ConfigConstants.java
+++ b/core/src/main/java/org/apache/gravitino/config/ConfigConstants.java
@@ -74,6 +74,9 @@ public final class ConfigConstants {
   /** The version number for the 0.9.0 release. */
   public static final String VERSION_0_9_0 = "0.9.0";
 
+  /** The version number for the 0.10.0 release. */
+  public static final String VERSION_0_10_0 = "0.10.0";
+
   /** The current version of backend storage initialization script. */
   public static final String CURRENT_SCRIPT_VERSION = VERSION_0_9_0;
 }
diff --git a/docs/lakehouse-hudi-catalog.md b/docs/lakehouse-hudi-catalog.md
index be6d328bfb..b52ffb40ec 100644
--- a/docs/lakehouse-hudi-catalog.md
+++ b/docs/lakehouse-hudi-catalog.md
@@ -39,6 +39,22 @@ Tested and verified with Apache Hudi `0.15.0`.
 | `client.pool-cache.eviction-interval-ms` | For HMS backend. The cache pool 
eviction interval.                                                              
                                                                                
                                       | 300000        | No       | 
0.7.0-incubating |
 | `gravitino.bypass.`                      | Property name with this prefix 
passed down to the underlying backend client for use. Such as 
`gravitino.bypass.hive.metastore.failure.retries = 3` indicate 3 times of 
retries upon failure of Thrift metastore calls for HMS backend. | (none)        
| No       | 0.7.0-incubating |
 
+#### Catalog backend security
+
+Use the following properties to configure the security of the catalog 
backend if needed. For example, if the catalog backend is a Kerberos-enabled 
Hive Metastore, you must set `authentication.type` to `kerberos` and provide 
`authentication.kerberos.principal` and `authentication.kerberos.keytab-uri`.
+
+| Property name                                      | Description             
                                                                                
                                                       | Default value | 
Required                                                    | Since Version     
|
+|----------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|-------------------------------------------------------------|-------------------|
+| `authentication.type`                              | The type of 
authentication for the Hudi catalog backend. This configuration is only applicable 
to the HMS backend, and currently only supports `kerberos` and `simple`. | `simple`   
   | No                                                          | 
0.10.0-incubating |
+| `authentication.impersonation-enable`              | Whether to enable 
impersonation for the hudi catalog                                              
                                                             | `false`       | 
No                                                          | 0.10.0-incubating 
|
+| `authentication.kerberos.principal`                | The principal of the 
Kerberos authentication                                                         
                                                          | (none)        | 
required if the value of `authentication.type` is kerberos. | 0.10.0-incubating 
|
+| `authentication.kerberos.keytab-uri`               | The URI of the keytab 
for the Kerberos authentication.                                                
                                                         | (none)        | 
required if the value of `authentication.type` is kerberos. | 0.10.0-incubating 
|
+| `authentication.kerberos.check-interval-sec`       | The check interval of 
Kerberos credential for hudi catalog.                                           
                                                         | 60            | No   
                                                       | 0.10.0-incubating |
+| `authentication.kerberos.keytab-fetch-timeout-sec` | The fetch timeout of 
retrieving Kerberos keytab from `authentication.kerberos.keytab-uri`.           
                                                          | 60            | No  
                                                        | 0.10.0-incubating |
+
+Properties whose names start with `gravitino.bypass.` are passed down to the 
underlying backend client, for example 
`gravitino.bypass.hive.metastore.kerberos.principal=XXXX`, `gravitino.bypass.hadoop.security.authentication=kerberos`, and `gravitino.bypass.hive.metastore.sasl.enabled=true`.
+
+
 ### Catalog operations
 
 Please refer to [Manage Relational Metadata Using 
Gravitino](./manage-relational-metadata-using-gravitino.md#catalog-operations) 
for more details.


Reply via email to