This is an automated email from the ASF dual-hosted git repository.

yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new d7a58088f [#3462] improvement(hadoop-catalog): Support different 
level(catalog, schema,fileset) of authentication for hadoop catalog. (#3852)
d7a58088f is described below

commit d7a58088f480dc739bde8cc92d5dd928e557da06
Author: Qi Yu <[email protected]>
AuthorDate: Tue Jul 23 14:09:48 2024 +0800

    [#3462] improvement(hadoop-catalog): Support different level(catalog, 
schema,fileset) of authentication for hadoop catalog. (#3852)
    
    ### What changes were proposed in this pull request?
    
    Support setting authentication at the schema and fileset level for the
    Hadoop catalog.
    
    ### Why are the changes needed?
    
    
    The Hadoop catalog may require fine-grained access control and we can
    check the authentication status in the catalog, schema, or fileset
    level. For instance, when considering Kerberos authentication, we can
    authenticate filesets at the catalog level, schema level, and fileset
    level, and fileset-level authentication configuration has the highest
    priority.
    
    Fix: #3462
    
    ### Does this PR introduce _any_ user-facing change?
    
    N/A
    
    ### How was this patch tested?
    
    IT
---
 build.gradle.kts                                   |   2 +-
 .../gravitino/catalog/hadoop/HadoopCatalog.java    |  14 +-
 .../catalog/hadoop/HadoopCatalogOperations.java    |  59 +--
 .../hadoop/HadoopFilesetPropertiesMetadata.java    |   9 +-
 .../catalog/hadoop/HadoopProxyPlugin.java          |   8 +-
 .../hadoop/HadoopSchemaPropertiesMetadata.java     |   4 +
 .../hadoop/SecureHadoopCatalogOperations.java      | 456 +++++++++++++++++++++
 .../authentication/AuthenticationConfig.java       |  13 +
 .../authentication/kerberos/KerberosClient.java    |  57 +--
 .../hadoop/TestHadoopCatalogOperations.java        |  95 +++--
 .../test/HadoopUserAuthenticationIT.java           | 319 ++++++++++++++
 .../hive/integration/test/CatalogHiveIT.java       |   2 +-
 .../tests/integration/base_hadoop_env.py           |   2 +-
 .../tests/integration/hdfs_container.py            |   2 +-
 .../tests/integration/integration_test_env.py      |   2 +-
 .../org/apache/gravitino/utils/PrincipalUtils.java |   2 +
 .../integration/test/container/ContainerSuite.java |   4 +-
 integration-test/trino-it/docker-compose.yaml      |   4 +-
 .../connector/integration/test/SparkCommonIT.java  |   4 +-
 19 files changed, 930 insertions(+), 128 deletions(-)

diff --git a/build.gradle.kts b/build.gradle.kts
index feb62f828..edf9bcbb5 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -163,7 +163,7 @@ allprojects {
       // Default use MiniGravitino to run integration tests
       param.environment("GRAVITINO_ROOT_DIR", project.rootDir.path)
       param.environment("IT_PROJECT_DIR", project.buildDir.path)
-      param.environment("HADOOP_USER_NAME", "datastrato")
+      param.environment("HADOOP_USER_NAME", "anonymous")
       param.environment("HADOOP_HOME", "/tmp")
       param.environment("PROJECT_VERSION", project.version)
 
diff --git 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalog.java
 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalog.java
index 02fad267e..8774705cf 100644
--- 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalog.java
+++ 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalog.java
@@ -19,12 +19,9 @@
 package org.apache.gravitino.catalog.hadoop;
 
 import java.util.Map;
-import java.util.Optional;
-import 
org.apache.gravitino.catalog.hadoop.authentication.kerberos.KerberosConfig;
 import org.apache.gravitino.connector.BaseCatalog;
 import org.apache.gravitino.connector.CatalogOperations;
 import org.apache.gravitino.connector.PropertiesMetadata;
-import org.apache.gravitino.connector.ProxyPlugin;
 import org.apache.gravitino.connector.capability.Capability;
 
 /**
@@ -50,7 +47,7 @@ public class HadoopCatalog extends BaseCatalog<HadoopCatalog> 
{
 
   @Override
   protected CatalogOperations newOps(Map<String, String> config) {
-    HadoopCatalogOperations ops = new HadoopCatalogOperations();
+    CatalogOperations ops = new SecureHadoopCatalogOperations();
     return ops;
   }
 
@@ -59,15 +56,6 @@ public class HadoopCatalog extends 
BaseCatalog<HadoopCatalog> {
     return new HadoopCatalogCapability();
   }
 
-  @Override
-  protected Optional<ProxyPlugin> newProxyPlugin(Map<String, String> config) {
-    boolean impersonationEnabled = new 
KerberosConfig(config).isImpersonationEnabled();
-    if (!impersonationEnabled) {
-      return Optional.empty();
-    }
-    return Optional.of(new HadoopProxyPlugin());
-  }
-
   @Override
   public PropertiesMetadata catalogPropertiesMetadata() throws 
UnsupportedOperationException {
     return CATALOG_PROPERTIES_META;
diff --git 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
index 6b49c1310..eed8da9a1 100644
--- 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
+++ 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
@@ -19,17 +19,14 @@
 package org.apache.gravitino.catalog.hadoop;
 
 import static org.apache.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
-import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
-import java.io.File;
 import java.io.IOException;
 import java.time.Instant;
 import java.util.Collections;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -43,12 +40,9 @@ import org.apache.gravitino.Namespace;
 import org.apache.gravitino.Schema;
 import org.apache.gravitino.SchemaChange;
 import org.apache.gravitino.StringIdentifier;
-import org.apache.gravitino.catalog.hadoop.authentication.AuthenticationConfig;
-import 
org.apache.gravitino.catalog.hadoop.authentication.kerberos.KerberosClient;
 import org.apache.gravitino.connector.CatalogInfo;
 import org.apache.gravitino.connector.CatalogOperations;
 import org.apache.gravitino.connector.HasPropertyMetadata;
-import org.apache.gravitino.connector.ProxyPlugin;
 import org.apache.gravitino.connector.SupportsSchemas;
 import org.apache.gravitino.exceptions.AlreadyExistsException;
 import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
@@ -68,8 +62,6 @@ import org.apache.gravitino.utils.PrincipalUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -90,11 +82,6 @@ public class HadoopCatalogOperations implements 
CatalogOperations, SupportsSchem
 
   private Map<String, String> conf;
 
-  @SuppressWarnings("unused")
-  private ProxyPlugin proxyPlugin;
-
-  private String kerberosRealm;
-
   private CatalogInfo catalogInfo;
 
   HadoopCatalogOperations(EntityStore store) {
@@ -105,8 +92,20 @@ public class HadoopCatalogOperations implements 
CatalogOperations, SupportsSchem
     this(GravitinoEnv.getInstance().entityStore());
   }
 
-  public String getKerberosRealm() {
-    return kerberosRealm;
+  public EntityStore getStore() {
+    return store;
+  }
+
+  public CatalogInfo getCatalogInfo() {
+    return catalogInfo;
+  }
+
+  public Configuration getHadoopConf() {
+    return hadoopConf;
+  }
+
+  public Map<String, String> getConf() {
+    return conf;
   }
 
   @Override
@@ -134,32 +133,12 @@ public class HadoopCatalogOperations implements 
CatalogOperations, SupportsSchem
                 .getOrDefault(config, 
HadoopCatalogPropertiesMetadata.LOCATION);
     conf.forEach(hadoopConf::set);
 
-    initAuthentication(conf, hadoopConf);
     this.catalogStorageLocation =
         StringUtils.isNotBlank(catalogLocation)
             ? Optional.of(catalogLocation).map(Path::new)
             : Optional.empty();
   }
 
-  private void initAuthentication(Map<String, String> conf, Configuration 
hadoopConf) {
-    AuthenticationConfig config = new AuthenticationConfig(conf);
-    String authType = config.getAuthType();
-
-    if (StringUtils.equalsIgnoreCase(authType, 
AuthenticationMethod.KERBEROS.name())) {
-      hadoopConf.set(
-          HADOOP_SECURITY_AUTHENTICATION,
-          AuthenticationMethod.KERBEROS.name().toLowerCase(Locale.ROOT));
-      UserGroupInformation.setConfiguration(hadoopConf);
-      try {
-        KerberosClient kerberosClient = new KerberosClient(conf, hadoopConf);
-        File keytabFile = 
kerberosClient.saveKeyTabFileFromUri(catalogInfo.id());
-        this.kerberosRealm = 
kerberosClient.login(keytabFile.getAbsolutePath());
-      } catch (IOException e) {
-        throw new RuntimeException("Failed to login with Kerberos", e);
-      }
-    }
-  }
-
   @Override
   public NameIdentifier[] listFilesets(Namespace namespace) throws 
NoSuchSchemaException {
     try {
@@ -278,9 +257,9 @@ public class HadoopCatalogOperations implements 
CatalogOperations, SupportsSchem
             .withNamespace(ident.namespace())
             .withComment(comment)
             .withFilesetType(type)
-            // Store the storageLocation to the store. If the 
"storageLocation" is null for
-            // managed fileset, Gravitino will get and store the location 
based on the
-            // catalog/schema's location and store it to the store.
+            // Store the storageLocation to the store. If the 
"storageLocation" is null for managed
+            // fileset, Gravitino will get and store the location based on the 
catalog/schema's
+            // location and store it to the store.
             .withStorageLocation(filesetPath.toString())
             .withProperties(properties)
             .withAuditInfo(
@@ -664,8 +643,4 @@ public class HadoopCatalogOperations implements 
CatalogOperations, SupportsSchem
     FileSystem defaultFs = FileSystem.get(configuration);
     return path.makeQualified(defaultFs.getUri(), 
defaultFs.getWorkingDirectory());
   }
-
-  void setProxyPlugin(HadoopProxyPlugin hadoopProxyPlugin) {
-    this.proxyPlugin = hadoopProxyPlugin;
-  }
 }
diff --git 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopFilesetPropertiesMetadata.java
 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopFilesetPropertiesMetadata.java
index 76bcc5318..250a48d29 100644
--- 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopFilesetPropertiesMetadata.java
+++ 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopFilesetPropertiesMetadata.java
@@ -18,8 +18,10 @@
  */
 package org.apache.gravitino.catalog.hadoop;
 
-import java.util.Collections;
+import com.google.common.collect.ImmutableMap;
 import java.util.Map;
+import org.apache.gravitino.catalog.hadoop.authentication.AuthenticationConfig;
+import 
org.apache.gravitino.catalog.hadoop.authentication.kerberos.KerberosConfig;
 import org.apache.gravitino.connector.BasePropertiesMetadata;
 import org.apache.gravitino.connector.PropertyEntry;
 
@@ -27,6 +29,9 @@ public class HadoopFilesetPropertiesMetadata extends 
BasePropertiesMetadata {
 
   @Override
   protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
-    return Collections.emptyMap();
+    ImmutableMap.Builder<String, PropertyEntry<?>> builder = 
ImmutableMap.builder();
+    builder.putAll(KerberosConfig.KERBEROS_PROPERTY_ENTRIES);
+    builder.putAll(AuthenticationConfig.AUTHENTICATION_PROPERTY_ENTRIES);
+    return builder.build();
   }
 }
diff --git 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopProxyPlugin.java
 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopProxyPlugin.java
index 9287501c3..4421177bd 100644
--- 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopProxyPlugin.java
+++ 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopProxyPlugin.java
@@ -31,9 +31,10 @@ import org.apache.gravitino.connector.ProxyPlugin;
 import org.apache.gravitino.utils.Executable;
 import org.apache.hadoop.security.UserGroupInformation;
 
+@Deprecated
 public class HadoopProxyPlugin implements ProxyPlugin {
-  private HadoopCatalogOperations ops;
-  private UserGroupInformation realUser;
+  private SecureHadoopCatalogOperations ops;
+  private final UserGroupInformation realUser;
 
   public HadoopProxyPlugin() {
     try {
@@ -82,7 +83,6 @@ public class HadoopProxyPlugin implements ProxyPlugin {
 
   @Override
   public void bindCatalogOperation(CatalogOperations ops) {
-    this.ops = ((HadoopCatalogOperations) ops);
-    this.ops.setProxyPlugin(this);
+    this.ops = ((SecureHadoopCatalogOperations) ops);
   }
 }
diff --git 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopSchemaPropertiesMetadata.java
 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopSchemaPropertiesMetadata.java
index addd67754..8892433ac 100644
--- 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopSchemaPropertiesMetadata.java
+++ 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopSchemaPropertiesMetadata.java
@@ -20,6 +20,8 @@ package org.apache.gravitino.catalog.hadoop;
 
 import com.google.common.collect.ImmutableMap;
 import java.util.Map;
+import org.apache.gravitino.catalog.hadoop.authentication.AuthenticationConfig;
+import 
org.apache.gravitino.catalog.hadoop.authentication.kerberos.KerberosConfig;
 import org.apache.gravitino.connector.BasePropertiesMetadata;
 import org.apache.gravitino.connector.PropertyEntry;
 
@@ -45,6 +47,8 @@ public class HadoopSchemaPropertiesMetadata extends 
BasePropertiesMetadata {
                   true /* immutable */,
                   null,
                   false /* hidden */))
+          .putAll(KerberosConfig.KERBEROS_PROPERTY_ENTRIES)
+          .putAll(AuthenticationConfig.AUTHENTICATION_PROPERTY_ENTRIES)
           .build();
 
   @Override
diff --git 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java
 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java
new file mode 100644
index 000000000..30cc53f32
--- /dev/null
+++ 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.catalog.hadoop;
+
+import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import javax.security.auth.Subject;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.Schema;
+import org.apache.gravitino.SchemaChange;
+import org.apache.gravitino.UserPrincipal;
+import org.apache.gravitino.catalog.hadoop.authentication.AuthenticationConfig;
+import 
org.apache.gravitino.catalog.hadoop.authentication.kerberos.KerberosClient;
+import 
org.apache.gravitino.catalog.hadoop.authentication.kerberos.KerberosConfig;
+import org.apache.gravitino.connector.CatalogInfo;
+import org.apache.gravitino.connector.CatalogOperations;
+import org.apache.gravitino.connector.HasPropertyMetadata;
+import org.apache.gravitino.connector.SupportsSchemas;
+import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchCatalogException;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.exceptions.NoSuchFilesetException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NonEmptySchemaException;
+import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.file.Fileset;
+import org.apache.gravitino.file.FilesetCatalog;
+import org.apache.gravitino.file.FilesetChange;
+import org.apache.gravitino.meta.FilesetEntity;
+import org.apache.gravitino.meta.SchemaEntity;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("removal")
+public class SecureHadoopCatalogOperations
+    implements CatalogOperations, SupportsSchemas, FilesetCatalog {
+
+  public static final Logger LOG = 
LoggerFactory.getLogger(SecureHadoopCatalogOperations.class);
+
+  private final HadoopCatalogOperations hadoopCatalogOperations;
+
+  private final List<Closeable> closeables = Lists.newArrayList();
+
+  private final Map<NameIdentifier, UserInfo> userInfoMap = 
Maps.newConcurrentMap();
+
+  public static final String GRAVITINO_KEYTAB_FORMAT = "keytabs/gravitino-%s";
+
+  private String kerberosRealm;
+
+  public SecureHadoopCatalogOperations() {
+    this.hadoopCatalogOperations = new HadoopCatalogOperations();
+  }
+
+  public SecureHadoopCatalogOperations(EntityStore store) {
+    this.hadoopCatalogOperations = new HadoopCatalogOperations(store);
+  }
+
+  @VisibleForTesting
+  public HadoopCatalogOperations getBaseHadoopCatalogOperations() {
+    return hadoopCatalogOperations;
+  }
+
+  public String getKerberosRealm() {
+    return kerberosRealm;
+  }
+
+  static class UserInfo {
+    UserGroupInformation loginUser;
+    boolean enableUserImpersonation;
+    String keytabPath;
+    String realm;
+
+    static UserInfo of(
+        UserGroupInformation loginUser,
+        boolean enableUserImpersonation,
+        String keytabPath,
+        String kerberosRealm) {
+      UserInfo userInfo = new UserInfo();
+      userInfo.loginUser = loginUser;
+      userInfo.enableUserImpersonation = enableUserImpersonation;
+      userInfo.keytabPath = keytabPath;
+      userInfo.realm = kerberosRealm;
+      return userInfo;
+    }
+  }
+
+  // We have overridden the createFileset, dropFileset, createSchema, 
dropSchema method to reset
+  // the current user based on the name identifier.
+
+  @Override
+  public Fileset createFileset(
+      NameIdentifier ident,
+      String comment,
+      Fileset.Type type,
+      String storageLocation,
+      Map<String, String> properties)
+      throws NoSuchSchemaException, FilesetAlreadyExistsException {
+    UserGroupInformation currentUser = getUGIByIdent(properties, ident);
+    String apiUser = PrincipalUtils.getCurrentUserName();
+    return doAs(
+        currentUser,
+        () -> {
+          setUser(apiUser);
+          return hadoopCatalogOperations.createFileset(
+              ident, comment, type, storageLocation, properties);
+        },
+        ident);
+  }
+
+  @Override
+  public boolean dropFileset(NameIdentifier ident) {
+    FilesetEntity filesetEntity;
+    try {
+      filesetEntity =
+          hadoopCatalogOperations
+              .getStore()
+              .get(ident, Entity.EntityType.FILESET, FilesetEntity.class);
+    } catch (NoSuchEntityException e) {
+      LOG.warn("Fileset {} does not exist", ident);
+      return false;
+    } catch (IOException ioe) {
+      throw new RuntimeException("Failed to delete fileset " + ident, ioe);
+    }
+
+    // Reset the current user based on the name identifier.
+    UserGroupInformation currentUser = 
getUGIByIdent(filesetEntity.properties(), ident);
+
+    boolean r = doAs(currentUser, () -> 
hadoopCatalogOperations.dropFileset(ident), ident);
+    cleanUserInfo(ident);
+    return r;
+  }
+
+  @Override
+  public Schema createSchema(NameIdentifier ident, String comment, Map<String, 
String> properties)
+      throws NoSuchCatalogException, SchemaAlreadyExistsException {
+    // Reset the current user based on the name identifier and properties.
+    UserGroupInformation currentUser = getUGIByIdent(properties, ident);
+    String apiUser = PrincipalUtils.getCurrentUserName();
+
+    return doAs(
+        currentUser,
+        () -> {
+          setUser(apiUser);
+          return hadoopCatalogOperations.createSchema(ident, comment, 
properties);
+        },
+        ident);
+  }
+
+  @Override
+  public boolean dropSchema(NameIdentifier ident, boolean cascade) throws 
NonEmptySchemaException {
+    try {
+      SchemaEntity schemaEntity =
+          hadoopCatalogOperations
+              .getStore()
+              .get(ident, Entity.EntityType.SCHEMA, SchemaEntity.class);
+      Map<String, String> properties =
+          
Optional.ofNullable(schemaEntity.properties()).orElse(Collections.emptyMap());
+
+      // Reset the current user based on the name identifier.
+      UserGroupInformation user = getUGIByIdent(properties, ident);
+
+      boolean r = doAs(user, () -> hadoopCatalogOperations.dropSchema(ident, 
cascade), ident);
+      cleanUserInfo(ident);
+      return r;
+    } catch (IOException ioe) {
+      throw new RuntimeException("Failed to delete schema " + ident, ioe);
+    }
+  }
+
+  @Override
+  public void initialize(
+      Map<String, String> config, CatalogInfo info, HasPropertyMetadata 
propertiesMetadata)
+      throws RuntimeException {
+    hadoopCatalogOperations.initialize(config, info, propertiesMetadata);
+    initAuthentication(hadoopCatalogOperations.getConf(), 
hadoopCatalogOperations.getHadoopConf());
+  }
+
+  @Override
+  public Fileset alterFileset(NameIdentifier ident, FilesetChange... changes)
+      throws NoSuchFilesetException, IllegalArgumentException {
+    Fileset fileset = hadoopCatalogOperations.alterFileset(ident, changes);
+
+    String finalName = ident.name();
+    for (FilesetChange change : changes) {
+      if (change instanceof FilesetChange.RenameFileset) {
+        finalName = ((FilesetChange.RenameFileset) change).getNewName();
+      }
+    }
+    if (!ident.name().equals(finalName)) {
+      UserInfo userInfo = userInfoMap.remove(ident);
+      if (userInfo != null) {
+        userInfoMap.put(NameIdentifier.of(ident.namespace(), finalName), 
userInfo);
+      }
+    }
+
+    return fileset;
+  }
+
+  @Override
+  public NameIdentifier[] listSchemas(Namespace namespace) throws 
NoSuchCatalogException {
+    return hadoopCatalogOperations.listSchemas(namespace);
+  }
+
+  @Override
+  public Schema loadSchema(NameIdentifier ident) throws NoSuchSchemaException {
+    return hadoopCatalogOperations.loadSchema(ident);
+  }
+
+  @Override
+  public Schema alterSchema(NameIdentifier ident, SchemaChange... changes)
+      throws NoSuchSchemaException {
+    return hadoopCatalogOperations.alterSchema(ident, changes);
+  }
+
+  @Override
+  public NameIdentifier[] listFilesets(Namespace namespace) throws 
NoSuchSchemaException {
+    return hadoopCatalogOperations.listFilesets(namespace);
+  }
+
+  @Override
+  public Fileset loadFileset(NameIdentifier ident) throws 
NoSuchFilesetException {
+    return hadoopCatalogOperations.loadFileset(ident);
+  }
+
+  @Override
+  public void close() throws IOException {
+    hadoopCatalogOperations.close();
+
+    userInfoMap.clear();
+    closeables.forEach(
+        c -> {
+          try {
+            c.close();
+          } catch (Exception e) {
+            LOG.error("Failed to close resource", e);
+          }
+        });
+  }
+
+  @Override
+  public void testConnection(
+      NameIdentifier catalogIdent,
+      Catalog.Type type,
+      String provider,
+      String comment,
+      Map<String, String> properties)
+      throws Exception {
+    hadoopCatalogOperations.testConnection(catalogIdent, type, provider, 
comment, properties);
+  }
+
+  private void initAuthentication(Map<String, String> conf, Configuration 
hadoopConf) {
+    AuthenticationConfig config = new AuthenticationConfig(conf);
+    CatalogInfo catalogInfo = hadoopCatalogOperations.getCatalogInfo();
+    if (config.isKerberosAuth()) {
+      initKerberos(
+          conf, hadoopConf, NameIdentifier.of(catalogInfo.namespace(), 
catalogInfo.name()), true);
+    } else if (config.isSimpleAuth()) {
+      try {
+        // Use service login user.
+        UserGroupInformation u = UserGroupInformation.getCurrentUser();
+        userInfoMap.put(
+            NameIdentifier.of(catalogInfo.namespace(), catalogInfo.name()),
+            UserInfo.of(u, config.isImpersonationEnabled(), null, null));
+      } catch (Exception e) {
+        throw new RuntimeException("Can't get service user for Hadoop 
catalog", e);
+      }
+    }
+  }
+
+  /**
+   * Get the UserGroupInformation based on the NameIdentifier and properties.
+   *
+   * <p>Note: As UserGroupInformation is a static class, to avoid the thread 
safety issue, we need
+   * to use synchronized to ensure the thread safety: Make login and 
getLoginUser atomic.
+   */
+  public synchronized String initKerberos(
+      Map<String, String> properties,
+      Configuration configuration,
+      NameIdentifier ident,
+      boolean refreshCredentials) {
+    // Init schema level kerberos authentication.
+
+    CatalogInfo catalogInfo = hadoopCatalogOperations.getCatalogInfo();
+    String keytabPath =
+        String.format(
+            GRAVITINO_KEYTAB_FORMAT, catalogInfo.id() + "-" + 
ident.toString().replace(".", "-"));
+    KerberosConfig kerberosConfig = new KerberosConfig(properties);
+    if (kerberosConfig.isKerberosAuth()) {
+      configuration.set(
+          HADOOP_SECURITY_AUTHENTICATION,
+          AuthenticationMethod.KERBEROS.name().toLowerCase(Locale.ROOT));
+      try {
+        UserGroupInformation.setConfiguration(configuration);
+        KerberosClient kerberosClient =
+            new KerberosClient(properties, configuration, refreshCredentials);
+        // Add the kerberos client to the closable to close resources.
+        closeables.add(kerberosClient);
+
+        File keytabFile = kerberosClient.saveKeyTabFileFromUri(keytabPath);
+        String kerberosRealm = 
kerberosClient.login(keytabFile.getAbsolutePath());
+        // Should this kerberosRealm need to be equals to the realm in the 
principal?
+        userInfoMap.put(
+            ident,
+            UserInfo.of(
+                UserGroupInformation.getLoginUser(),
+                kerberosConfig.isImpersonationEnabled(),
+                keytabPath,
+                kerberosRealm));
+        return kerberosRealm;
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to login with Kerberos", e);
+      }
+    }
+
+    return null;
+  }
+
+  private UserGroupInformation getUGIByIdent(Map<String, String> properties, 
NameIdentifier ident) {
+    KerberosConfig kerberosConfig = new KerberosConfig(properties);
+    if (kerberosConfig.isKerberosAuth()) {
+      // We assume that the realm of catalog is the same as the realm of the 
schema and table.
+      initKerberos(properties, new Configuration(), ident, false);
+    }
+    // If the kerberos is not enabled (simple mode), we will use the current 
user
+    return getUserBaseOnNameIdentifier(ident);
+  }
+
+  private UserGroupInformation getUserBaseOnNameIdentifier(NameIdentifier 
nameIdentifier) {
+    UserInfo userInfo = getNearestUserGroupInformation(nameIdentifier);
+    if (userInfo == null) {
+      throw new RuntimeException("Failed to get user information for " + 
nameIdentifier);
+    }
+
+    UserGroupInformation ugi = userInfo.loginUser;
+    boolean userImpersonation = userInfo.enableUserImpersonation;
+    if (userImpersonation) {
+      String proxyKerberosPrincipalName = PrincipalUtils.getCurrentUserName();
+      if (!proxyKerberosPrincipalName.contains("@")) {
+        proxyKerberosPrincipalName =
+            String.format("%s@%s", proxyKerberosPrincipalName, userInfo.realm);
+      }
+
+      ugi = UserGroupInformation.createProxyUser(proxyKerberosPrincipalName, 
ugi);
+    }
+
+    return ugi;
+  }
+
+  private UserInfo getNearestUserGroupInformation(NameIdentifier 
nameIdentifier) {
+    NameIdentifier currentNameIdentifier = nameIdentifier;
+    while (currentNameIdentifier != null) {
+      if (userInfoMap.containsKey(currentNameIdentifier)) {
+        return userInfoMap.get(currentNameIdentifier);
+      }
+
+      String[] levels = currentNameIdentifier.namespace().levels();
+      // The ident is catalog level.
+      if (levels.length <= 1) {
+        return null;
+      }
+      currentNameIdentifier = 
NameIdentifier.of(currentNameIdentifier.namespace().levels());
+    }
+    return null;
+  }
+
+  private void cleanUserInfo(NameIdentifier identifier) {
+    UserInfo userInfo = userInfoMap.remove(identifier);
+    if (userInfo != null) {
+      removeFile(userInfo.keytabPath);
+    }
+  }
+
+  private void removeFile(String filePath) {
+    if (filePath == null) {
+      return;
+    }
+
+    File file = new File(filePath);
+    if (file.exists()) {
+      file.delete();
+    }
+  }
+
+  private <T> T doAs(
+      UserGroupInformation userGroupInformation,
+      PrivilegedExceptionAction<T> action,
+      NameIdentifier ident) {
+    try {
+      return userGroupInformation.doAs(action);
+    } catch (IOException | InterruptedException ioe) {
+      throw new RuntimeException("Failed to operation on fileset " + ident, 
ioe);
+    } catch (UndeclaredThrowableException e) {
+      Throwable innerException = e.getCause();
+      if (innerException instanceof PrivilegedActionException) {
+        throw new RuntimeException(innerException.getCause());
+      } else if (innerException instanceof InvocationTargetException) {
+        throw new RuntimeException(innerException.getCause());
+      } else {
+        throw new RuntimeException(innerException);
+      }
+    }
+  }
+
+  /**
+   * Add the user to the subject so that we can get the last user in the 
subject. Hadoop catalog
+   * uses this method to pass api user from the client side, so that we can 
get the user in the
+   * subject. Please do not mix it with UserGroupInformation.getCurrentUser().
+   *
+   * @param apiUser the username to set.
+   */
+  private void setUser(String apiUser) {
+    java.security.AccessControlContext context = 
java.security.AccessController.getContext();
+    Subject subject = Subject.getSubject(context);
+    subject.getPrincipals().add(new UserPrincipal(apiUser));
+  }
+}
diff --git 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/authentication/AuthenticationConfig.java
 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/authentication/AuthenticationConfig.java
index 88e70ffdc..384716ce1 100644
--- 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/authentication/AuthenticationConfig.java
+++ 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/authentication/AuthenticationConfig.java
@@ -34,6 +34,11 @@ public class AuthenticationConfig extends Config {
 
   public static final String IMPERSONATION_ENABLE_KEY = 
"authentication.impersonation-enable";
 
+  enum AuthenticationType {
+    SIMPLE,
+    KERBEROS
+  }
+
   public static final boolean KERBEROS_DEFAULT_IMPERSONATION_ENABLE = false;
 
   public AuthenticationConfig(Map<String, String> properties) {
@@ -64,6 +69,14 @@ public class AuthenticationConfig extends Config {
     return get(ENABLE_IMPERSONATION_ENTRY);
   }
 
+  public boolean isSimpleAuth() {
+    return AuthenticationType.SIMPLE.name().equalsIgnoreCase(getAuthType());
+  }
+
+  public boolean isKerberosAuth() {
+    return AuthenticationType.KERBEROS.name().equalsIgnoreCase(getAuthType());
+  }
+
   public static final Map<String, PropertyEntry<?>> 
AUTHENTICATION_PROPERTY_ENTRIES =
       new ImmutableMap.Builder<String, PropertyEntry<?>>()
           .put(
diff --git 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/authentication/kerberos/KerberosClient.java
 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/authentication/kerberos/KerberosClient.java
index b0965729b..dd746b4da 100644
--- 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/authentication/kerberos/KerberosClient.java
+++ 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/authentication/kerberos/KerberosClient.java
@@ -22,6 +22,7 @@ package 
org.apache.gravitino.catalog.hadoop.authentication.kerberos;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
@@ -36,19 +37,19 @@ import 
org.apache.hadoop.security.authentication.util.KerberosName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class KerberosClient {
+public class KerberosClient implements Closeable {
   private static final Logger LOG = 
LoggerFactory.getLogger(KerberosClient.class);
 
-  public static final String GRAVITINO_KEYTAB_FORMAT = 
"keytabs/gravitino-hadoop-%s-keytab";
-
-  private final ScheduledThreadPoolExecutor checkTgtExecutor;
+  private ScheduledThreadPoolExecutor checkTgtExecutor;
   private final Map<String, String> conf;
   private final Configuration hadoopConf;
+  private final boolean refreshCredentials;
 
-  public KerberosClient(Map<String, String> conf, Configuration hadoopConf) {
+  public KerberosClient(
+      Map<String, String> conf, Configuration hadoopConf, boolean 
refreshCredentials) {
     this.conf = conf;
     this.hadoopConf = hadoopConf;
-    this.checkTgtExecutor = new ScheduledThreadPoolExecutor(1, 
getThreadFactory("check-tgt"));
+    this.refreshCredentials = refreshCredentials;
   }
 
   public String login(String keytabFilePath) throws IOException {
@@ -66,28 +67,31 @@ public class KerberosClient {
     // Login
     UserGroupInformation.setConfiguration(hadoopConf);
     KerberosName.resetDefaultRealm();
-    UserGroupInformation.loginUserFromKeytab(catalogPrincipal, keytabFilePath);
-    UserGroupInformation kerberosLoginUgi = 
UserGroupInformation.getCurrentUser();
+    UserGroupInformation kerberosLoginUgi =
+        UserGroupInformation.loginUserFromKeytabAndReturnUGI(catalogPrincipal, 
keytabFilePath);
+    UserGroupInformation.setLoginUser(kerberosLoginUgi);
 
     // Refresh the cache if it's out of date.
-    int checkInterval = kerberosConfig.getCheckIntervalSec();
-    checkTgtExecutor.scheduleAtFixedRate(
-        () -> {
-          try {
-            kerberosLoginUgi.checkTGTAndReloginFromKeytab();
-          } catch (Exception e) {
-            LOG.error("Fail to refresh ugi token: ", e);
-          }
-        },
-        checkInterval,
-        checkInterval,
-        TimeUnit.SECONDS);
+    if (refreshCredentials) {
+      this.checkTgtExecutor = new ScheduledThreadPoolExecutor(1, 
getThreadFactory("check-tgt"));
+      int checkInterval = kerberosConfig.getCheckIntervalSec();
+      checkTgtExecutor.scheduleAtFixedRate(
+          () -> {
+            try {
+              kerberosLoginUgi.checkTGTAndReloginFromKeytab();
+            } catch (Exception e) {
+              LOG.error("Fail to refresh ugi token: ", e);
+            }
+          },
+          checkInterval,
+          checkInterval,
+          TimeUnit.SECONDS);
+    }
 
     return principalComponents.get(1);
   }
 
-  public File saveKeyTabFileFromUri(Long catalogId) throws IOException {
-
+  public File saveKeyTabFileFromUri(String keytabPath) throws IOException {
     KerberosConfig kerberosConfig = new KerberosConfig(conf);
 
     String keyTabUri = kerberosConfig.getKeytab();
@@ -103,7 +107,7 @@ public class KerberosClient {
       keytabsDir.mkdir();
     }
 
-    File keytabFile = new File(String.format(GRAVITINO_KEYTAB_FORMAT, 
catalogId));
+    File keytabFile = new File(keytabPath);
     keytabFile.deleteOnExit();
     if (keytabFile.exists() && !keytabFile.delete()) {
       throw new IllegalStateException(
@@ -119,4 +123,11 @@ public class KerberosClient {
   private static ThreadFactory getThreadFactory(String factoryName) {
     return new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat(factoryName + 
"-%d").build();
   }
+
+  @Override
+  public void close() throws IOException {
+    if (checkTgtExecutor != null) {
+      checkTgtExecutor.shutdown();
+    }
+  }
 }
diff --git 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
index 3c8a4d463..3b9ca0d7f 100644
--- 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
+++ 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
@@ -59,6 +59,7 @@ import org.apache.gravitino.Namespace;
 import org.apache.gravitino.Schema;
 import org.apache.gravitino.SchemaChange;
 import org.apache.gravitino.StringIdentifier;
+import org.apache.gravitino.connector.CatalogInfo;
 import org.apache.gravitino.connector.HasPropertyMetadata;
 import org.apache.gravitino.connector.PropertiesMetadata;
 import org.apache.gravitino.exceptions.NoSuchFilesetException;
@@ -128,6 +129,30 @@ public class TestHadoopCatalogOperations {
 
   private static IdGenerator idGenerator;
 
+  private static CatalogInfo randomCatalogInfo() {
+    return new CatalogInfo(
+        idGenerator.nextId(),
+        "catalog1",
+        CatalogInfo.Type.FILESET,
+        "provider1",
+        "comment1",
+        Maps.newHashMap(),
+        null,
+        Namespace.of("m1", "c1"));
+  }
+
+  private static CatalogInfo randomCatalogInfo(String metalakeName, String 
catalogName) {
+    return new CatalogInfo(
+        idGenerator.nextId(),
+        catalogName,
+        CatalogInfo.Type.FILESET,
+        "hadoop",
+        "comment1",
+        Maps.newHashMap(),
+        null,
+        Namespace.of(metalakeName));
+  }
+
   @BeforeAll
   public static void setUp() {
     Config config = Mockito.mock(Config.class);
@@ -195,14 +220,18 @@ public class TestHadoopCatalogOperations {
   @Test
   public void testHadoopCatalogConfiguration() {
     Map<String, String> emptyProps = Maps.newHashMap();
-    HadoopCatalogOperations ops = new HadoopCatalogOperations(store);
-    ops.initialize(emptyProps, null, HADOOP_PROPERTIES_METADATA);
+    SecureHadoopCatalogOperations secOps = new 
SecureHadoopCatalogOperations(store);
+
+    HadoopCatalogOperations ops = secOps.getBaseHadoopCatalogOperations();
+
+    CatalogInfo catalogInfo = randomCatalogInfo();
+    ops.initialize(emptyProps, catalogInfo, HADOOP_PROPERTIES_METADATA);
     Configuration conf = ops.hadoopConf;
     String value = conf.get("fs.defaultFS");
     Assertions.assertEquals("file:///", value);
 
     emptyProps.put(CATALOG_BYPASS_PREFIX + "fs.defaultFS", 
"hdfs://localhost:9000");
-    ops.initialize(emptyProps, null, HADOOP_PROPERTIES_METADATA);
+    ops.initialize(emptyProps, catalogInfo, HADOOP_PROPERTIES_METADATA);
     Configuration conf1 = ops.hadoopConf;
     String value1 = conf1.get("fs.defaultFS");
     Assertions.assertEquals("hdfs://localhost:9000", value1);
@@ -210,7 +239,7 @@ public class TestHadoopCatalogOperations {
     Assertions.assertFalse(ops.catalogStorageLocation.isPresent());
 
     emptyProps.put(HadoopCatalogPropertiesMetadata.LOCATION, 
"file:///tmp/catalog");
-    ops.initialize(emptyProps, null, HADOOP_PROPERTIES_METADATA);
+    ops.initialize(emptyProps, catalogInfo, HADOOP_PROPERTIES_METADATA);
     Assertions.assertTrue(ops.catalogStorageLocation.isPresent());
     Path expectedPath = new Path("file:///tmp/catalog");
     Assertions.assertEquals(expectedPath, ops.catalogStorageLocation.get());
@@ -306,8 +335,8 @@ public class TestHadoopCatalogOperations {
 
     Assertions.assertEquals(name, schema.name());
 
-    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
-      ops.initialize(Maps.newHashMap(), null, HADOOP_PROPERTIES_METADATA);
+    try (SecureHadoopCatalogOperations ops = new 
SecureHadoopCatalogOperations(store)) {
+      ops.initialize(Maps.newHashMap(), randomCatalogInfo(), 
HADOOP_PROPERTIES_METADATA);
       Schema schema1 = ops.loadSchema(NameIdentifierUtil.ofSchema("m1", "c1", 
name));
       Assertions.assertEquals(name, schema1.name());
       Assertions.assertEquals(comment, schema1.comment());
@@ -330,8 +359,8 @@ public class TestHadoopCatalogOperations {
     createSchema(name, comment, null, null);
     createSchema(name1, comment1, null, null);
 
-    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
-      ops.initialize(Maps.newHashMap(), null, HADOOP_PROPERTIES_METADATA);
+    try (SecureHadoopCatalogOperations ops = new 
SecureHadoopCatalogOperations(store)) {
+      ops.initialize(Maps.newHashMap(), randomCatalogInfo(), 
HADOOP_PROPERTIES_METADATA);
       Set<NameIdentifier> idents =
           Arrays.stream(ops.listSchemas(Namespace.of("m1", 
"c1"))).collect(Collectors.toSet());
       Assertions.assertTrue(idents.size() >= 2);
@@ -348,8 +377,8 @@ public class TestHadoopCatalogOperations {
     Schema schema = createSchema(name, comment, catalogPath, null);
     Assertions.assertEquals(name, schema.name());
 
-    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
-      ops.initialize(Maps.newHashMap(), null, HADOOP_PROPERTIES_METADATA);
+    try (SecureHadoopCatalogOperations ops = new 
SecureHadoopCatalogOperations(store)) {
+      ops.initialize(Maps.newHashMap(), randomCatalogInfo(), 
HADOOP_PROPERTIES_METADATA);
       Schema schema1 = ops.loadSchema(NameIdentifierUtil.ofSchema("m1", "c1", 
name));
       Assertions.assertEquals(name, schema1.name());
       Assertions.assertEquals(comment, schema1.comment());
@@ -395,10 +424,10 @@ public class TestHadoopCatalogOperations {
     Assertions.assertEquals(name, schema.name());
     NameIdentifier id = NameIdentifierUtil.ofSchema("m1", "c1", name);
 
-    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
+    try (SecureHadoopCatalogOperations ops = new 
SecureHadoopCatalogOperations(store)) {
       ops.initialize(
           ImmutableMap.of(HadoopCatalogPropertiesMetadata.LOCATION, 
catalogPath),
-          null,
+          randomCatalogInfo("m1", "c1"),
           HADOOP_PROPERTIES_METADATA);
       Schema schema1 = ops.loadSchema(id);
       Assertions.assertEquals(name, schema1.name());
@@ -452,8 +481,8 @@ public class TestHadoopCatalogOperations {
     }
 
     NameIdentifier schemaIdent = NameIdentifierUtil.ofSchema("m1", "c1", 
schemaName);
-    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
-      ops.initialize(catalogProps, null, HADOOP_PROPERTIES_METADATA);
+    try (SecureHadoopCatalogOperations ops = new 
SecureHadoopCatalogOperations(store)) {
+      ops.initialize(catalogProps, randomCatalogInfo("m1", "c1"), 
HADOOP_PROPERTIES_METADATA);
       if (!ops.schemaExists(schemaIdent)) {
         createSchema(schemaName, comment, catalogPath, schemaPath);
       }
@@ -507,8 +536,8 @@ public class TestHadoopCatalogOperations {
             + " when it's catalog and schema "
             + "location are not set",
         exception.getMessage());
-    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
-      ops.initialize(Maps.newHashMap(), null, HADOOP_PROPERTIES_METADATA);
+    try (SecureHadoopCatalogOperations ops = new 
SecureHadoopCatalogOperations(store)) {
+      ops.initialize(Maps.newHashMap(), randomCatalogInfo(), 
HADOOP_PROPERTIES_METADATA);
       Throwable e =
           Assertions.assertThrows(
               NoSuchFilesetException.class, () -> 
ops.loadFileset(filesetIdent));
@@ -523,8 +552,8 @@ public class TestHadoopCatalogOperations {
     Assertions.assertEquals(
         "Storage location must be set for external fileset " + filesetIdent,
         exception1.getMessage());
-    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
-      ops.initialize(Maps.newHashMap(), null, HADOOP_PROPERTIES_METADATA);
+    try (SecureHadoopCatalogOperations ops = new 
SecureHadoopCatalogOperations(store)) {
+      ops.initialize(Maps.newHashMap(), randomCatalogInfo(), 
HADOOP_PROPERTIES_METADATA);
       Throwable e =
           Assertions.assertThrows(
               NoSuchFilesetException.class, () -> 
ops.loadFileset(filesetIdent));
@@ -543,8 +572,8 @@ public class TestHadoopCatalogOperations {
       createFileset(fileset, schemaName, comment, Fileset.Type.MANAGED, null, 
null);
     }
 
-    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
-      ops.initialize(Maps.newHashMap(), null, HADOOP_PROPERTIES_METADATA);
+    try (SecureHadoopCatalogOperations ops = new 
SecureHadoopCatalogOperations(store)) {
+      ops.initialize(Maps.newHashMap(), randomCatalogInfo(), 
HADOOP_PROPERTIES_METADATA);
       Set<NameIdentifier> idents =
           Arrays.stream(ops.listFilesets(Namespace.of("m1", "c1", schemaName)))
               .collect(Collectors.toSet());
@@ -574,8 +603,8 @@ public class TestHadoopCatalogOperations {
     }
 
     NameIdentifier schemaIdent = NameIdentifierUtil.ofSchema("m1", "c1", 
schemaName);
-    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
-      ops.initialize(catalogProps, null, HADOOP_PROPERTIES_METADATA);
+    try (SecureHadoopCatalogOperations ops = new 
SecureHadoopCatalogOperations(store)) {
+      ops.initialize(catalogProps, randomCatalogInfo("m1", "c1"), 
HADOOP_PROPERTIES_METADATA);
       if (!ops.schemaExists(schemaIdent)) {
         createSchema(schemaName, comment, catalogPath, schemaPath);
       }
@@ -620,8 +649,8 @@ public class TestHadoopCatalogOperations {
     FilesetChange change1 = FilesetChange.setProperty("k1", "v1");
     FilesetChange change2 = FilesetChange.removeProperty("k1");
 
-    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
-      ops.initialize(Maps.newHashMap(), null, HADOOP_PROPERTIES_METADATA);
+    try (SecureHadoopCatalogOperations ops = new 
SecureHadoopCatalogOperations(store)) {
+      ops.initialize(Maps.newHashMap(), randomCatalogInfo(), 
HADOOP_PROPERTIES_METADATA);
       NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, 
name);
 
       Fileset fileset1 = ops.alterFileset(filesetIdent, change1);
@@ -685,8 +714,8 @@ public class TestHadoopCatalogOperations {
     Fileset fileset = createFileset(name, schemaName, comment, 
Fileset.Type.MANAGED, null, null);
 
     FilesetChange change1 = FilesetChange.updateComment("comment26_new");
-    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
-      ops.initialize(Maps.newHashMap(), null, HADOOP_PROPERTIES_METADATA);
+    try (SecureHadoopCatalogOperations ops = new 
SecureHadoopCatalogOperations(store)) {
+      ops.initialize(Maps.newHashMap(), randomCatalogInfo(), 
HADOOP_PROPERTIES_METADATA);
       NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, 
name);
 
       Fileset fileset1 = ops.alterFileset(filesetIdent, change1);
@@ -708,8 +737,8 @@ public class TestHadoopCatalogOperations {
     Fileset fileset = createFileset(name, schemaName, comment, 
Fileset.Type.MANAGED, null, null);
 
     FilesetChange change1 = FilesetChange.removeComment();
-    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
-      ops.initialize(Maps.newHashMap(), null, HADOOP_PROPERTIES_METADATA);
+    try (SecureHadoopCatalogOperations ops = new 
SecureHadoopCatalogOperations(store)) {
+      ops.initialize(Maps.newHashMap(), randomCatalogInfo(), 
HADOOP_PROPERTIES_METADATA);
       NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, 
name);
 
       Fileset fileset1 = ops.alterFileset(filesetIdent, change1);
@@ -722,7 +751,7 @@ public class TestHadoopCatalogOperations {
 
   @Test
   public void testTestConnection() {
-    HadoopCatalogOperations catalogOperations = new 
HadoopCatalogOperations(store);
+    SecureHadoopCatalogOperations catalogOperations = new 
SecureHadoopCatalogOperations(store);
     Assertions.assertDoesNotThrow(
         () ->
             catalogOperations.testConnection(
@@ -973,8 +1002,8 @@ public class TestHadoopCatalogOperations {
       props.put(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath);
     }
 
-    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
-      ops.initialize(props, null, HADOOP_PROPERTIES_METADATA);
+    try (SecureHadoopCatalogOperations ops = new 
SecureHadoopCatalogOperations(store)) {
+      ops.initialize(props, randomCatalogInfo("m1", "c1"), 
HADOOP_PROPERTIES_METADATA);
 
       NameIdentifier schemaIdent = NameIdentifierUtil.ofSchema("m1", "c1", 
name);
       Map<String, String> schemaProps = Maps.newHashMap();
@@ -1002,8 +1031,8 @@ public class TestHadoopCatalogOperations {
       props.put(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath);
     }
 
-    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
-      ops.initialize(props, null, HADOOP_PROPERTIES_METADATA);
+    try (SecureHadoopCatalogOperations ops = new 
SecureHadoopCatalogOperations(store)) {
+      ops.initialize(props, randomCatalogInfo("m1", "c1"), 
HADOOP_PROPERTIES_METADATA);
 
       NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, 
name);
       Map<String, String> filesetProps = Maps.newHashMap();
diff --git 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopUserAuthenticationIT.java
 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopUserAuthenticationIT.java
index 5b79b5302..6c5fc1ae6 100644
--- 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopUserAuthenticationIT.java
+++ 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopUserAuthenticationIT.java
@@ -31,6 +31,8 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
 import java.util.Map;
 import org.apache.commons.io.FileUtils;
 import org.apache.gravitino.Catalog;
@@ -52,6 +54,7 @@ import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testcontainers.utility.MountableFile;
 import sun.security.krb5.KrbException;
 
 @Tag("gravitino-docker-test")
@@ -72,6 +75,15 @@ public class HadoopUserAuthenticationIT extends AbstractIT {
   private static final String HADOOP_CLIENT_PRINCIPAL = "cli@HADOOPKRB";
   private static final String HADOOP_CLIENT_KEYTAB = "/client.keytab";
 
+  private static final String HADOOP_SCHEMA_PRINCIPAL = "cli_schema";
+  private static final String HADOOP_FILESET_PRINCIPAL = "cli_fileset";
+
+  private static final String HADOOP_SCHEMA_KEYTAB = "/cli_schema.keytab";
+  private static final String HADOOP_FILESET_KEYTAB = "/cli_fileset.keytab";
+
+  private static final String REALM = "HADOOPKRB";
+  private static final String ADMIN_PASSWORD = "Admin12!";
+
   private static String TMP_DIR;
 
   private static String HDFS_URL;
@@ -127,6 +139,8 @@ public class HadoopUserAuthenticationIT extends AbstractIT {
         .getContainer()
         .copyFileFromContainer("/gravitino_client.keytab", TMP_DIR + 
GRAVITINO_CLIENT_KEYTAB);
 
+    createKeyTableForSchemaAndFileset();
+
     // Keytab of the Gravitino server
     kerberosHiveContainer
         .getContainer()
@@ -153,6 +167,59 @@ public class HadoopUserAuthenticationIT extends AbstractIT 
{
     System.setProperty("sun.security.krb5.debug", "true");
   }
 
+  private static void createKeyTableForSchemaAndFileset() throws IOException {
+    String shellContent =
+        "echo -e \"%s\\n%s\" | kadmin.local -q \"addprinc %s@%s\""
+            + "\n"
+            + "kadmin.local -q \"xst -k /%s.keytab -norandkey %s@%s\"";
+
+    String createSchemaShellFile = String.format("/%s.sh", 
HADOOP_SCHEMA_PRINCIPAL);
+    String createFileSetShellFile = String.format("/%s.sh", 
HADOOP_FILESET_PRINCIPAL);
+
+    FileUtils.writeStringToFile(
+        Paths.get(TMP_DIR + createSchemaShellFile).toFile(),
+        String.format(
+            shellContent,
+            ADMIN_PASSWORD,
+            ADMIN_PASSWORD,
+            HADOOP_SCHEMA_PRINCIPAL,
+            REALM,
+            HADOOP_SCHEMA_PRINCIPAL,
+            HADOOP_SCHEMA_PRINCIPAL,
+            REALM),
+        StandardCharsets.UTF_8);
+    kerberosHiveContainer
+        .getContainer()
+        .copyFileToContainer(
+            MountableFile.forHostPath(TMP_DIR + createSchemaShellFile), 
createSchemaShellFile);
+    kerberosHiveContainer.executeInContainer("bash", createSchemaShellFile);
+
+    FileUtils.writeStringToFile(
+        Paths.get(TMP_DIR + createFileSetShellFile).toFile(),
+        String.format(
+            shellContent,
+            ADMIN_PASSWORD,
+            ADMIN_PASSWORD,
+            HADOOP_FILESET_PRINCIPAL,
+            REALM,
+            HADOOP_FILESET_PRINCIPAL,
+            HADOOP_FILESET_PRINCIPAL,
+            REALM),
+        StandardCharsets.UTF_8);
+    kerberosHiveContainer
+        .getContainer()
+        .copyFileToContainer(
+            MountableFile.forHostPath(TMP_DIR + createFileSetShellFile), 
createFileSetShellFile);
+    kerberosHiveContainer.executeInContainer("bash", createFileSetShellFile);
+
+    kerberosHiveContainer
+        .getContainer()
+        .copyFileFromContainer(HADOOP_SCHEMA_KEYTAB, TMP_DIR + 
HADOOP_SCHEMA_KEYTAB);
+    kerberosHiveContainer
+        .getContainer()
+        .copyFileFromContainer(HADOOP_FILESET_KEYTAB, TMP_DIR + 
HADOOP_FILESET_KEYTAB);
+  }
+
   private static void addKerberosConfig() {
     AbstractIT.customConfigs.put("gravitino.authenticator", "kerberos");
     AbstractIT.customConfigs.put(
@@ -223,4 +290,256 @@ public class HadoopUserAuthenticationIT extends 
AbstractIT {
 
     catalog.asSchemas().dropSchema(SCHEMA_NAME, true);
   }
+
+  @Test
+  void testCreateSchemaWithKerberos() {
+    KerberosTokenProvider provider =
+        KerberosTokenProvider.builder()
+            .withClientPrincipal(GRAVITINO_CLIENT_PRINCIPAL)
+            .withKeyTabFile(new File(TMP_DIR + GRAVITINO_CLIENT_KEYTAB))
+            .build();
+    adminClient = 
GravitinoAdminClient.builder(serverUri).withKerberosAuth(provider).build();
+
+    String metalakeName = GravitinoITUtils.genRandomName("metalake");
+    String catalogName = GravitinoITUtils.genRandomName("catalog");
+    GravitinoMetalake gravitinoMetalake =
+        adminClient.createMetalake(metalakeName, null, ImmutableMap.of());
+
+    // Create a catalog
+    Map<String, String> properties = Maps.newHashMap();
+    String location = HDFS_URL + "/user/hadoop/" + catalogName;
+
+    properties.put(AUTH_TYPE_KEY, "kerberos");
+    properties.put(IMPERSONATION_ENABLE_KEY, "true");
+    properties.put(KEY_TAB_URI_KEY, TMP_DIR + HADOOP_CLIENT_KEYTAB);
+    properties.put(PRINCIPAL_KEY, HADOOP_CLIENT_PRINCIPAL);
+    properties.put("location", location);
+
+    kerberosHiveContainer.executeInContainer(
+        "hadoop", "fs", "-mkdir", "-p", "/user/hadoop/" + catalogName);
+
+    Catalog catalog =
+        gravitinoMetalake.createCatalog(
+            catalogName, Catalog.Type.FILESET, "hadoop", "comment", 
properties);
+
+    // Test create schema
+    Exception exception =
+        Assertions.assertThrows(
+            Exception.class,
+            () -> catalog.asSchemas().createSchema(SCHEMA_NAME, "comment", 
ImmutableMap.of()));
+    String exceptionMessage = Throwables.getStackTraceAsString(exception);
+    // Make sure real user is 'gravitino_client'
+    Assertions.assertTrue(
+        exceptionMessage.contains("Permission denied: user=gravitino_client, 
access=WRITE"));
+
+    Map<String, String> schemaProperty = new HashMap<>();
+    schemaProperty.put(AUTH_TYPE_KEY, "kerberos");
+    // Disable impersonation here, so the user is the same as the principal
+    schemaProperty.put(IMPERSONATION_ENABLE_KEY, "false");
+    schemaProperty.put(KEY_TAB_URI_KEY, TMP_DIR + HADOOP_SCHEMA_KEYTAB);
+    schemaProperty.put(PRINCIPAL_KEY, HADOOP_SCHEMA_PRINCIPAL + "@" + REALM);
+
+    exception =
+        Assertions.assertThrows(
+            Exception.class,
+            () -> catalog.asSchemas().createSchema(SCHEMA_NAME, "comment", 
schemaProperty));
+    exceptionMessage = Throwables.getStackTraceAsString(exception);
+    // Make sure real user is 'cli_schema'
+    Assertions.assertTrue(
+        exceptionMessage.contains("Permission denied: user=cli_schema, 
access=WRITE"));
+
+    // Enable user impersonation, so the real user is 'gravitino_client'.
+    schemaProperty.put(IMPERSONATION_ENABLE_KEY, "true");
+    exception =
+        Assertions.assertThrows(
+            Exception.class,
+            () -> catalog.asSchemas().createSchema(SCHEMA_NAME, "comment", 
schemaProperty));
+    exceptionMessage = Throwables.getStackTraceAsString(exception);
+    // Make sure the real user is 'gravitino_client' when user impersonation is enabled.
+    Assertions.assertTrue(
+        exceptionMessage.contains("Permission denied: user=gravitino_client, 
access=WRITE"));
+
+    // Now grant the user permission so that creating the schema succeeds.
+    kerberosHiveContainer.executeInContainer(
+        "hadoop", "fs", "-chown", "gravitino_client", "/user/hadoop/" + 
catalogName);
+    Assertions.assertDoesNotThrow(
+        () -> catalog.asSchemas().createSchema(SCHEMA_NAME, "comment", 
schemaProperty));
+
+    // Disable impersonation here, so the user is the same as the principal 
'cli_schema'
+    schemaProperty.put(IMPERSONATION_ENABLE_KEY, "false");
+    exception =
+        Assertions.assertThrows(
+            Exception.class,
+            () ->
+                catalog.asSchemas().createSchema(SCHEMA_NAME + "_new", 
"comment", schemaProperty));
+    exceptionMessage = Throwables.getStackTraceAsString(exception);
+    Assertions.assertTrue(
+        exceptionMessage.contains("Permission denied: user=cli_schema, 
access=WRITE"));
+
+    // END of test schema creation
+    Assertions.assertDoesNotThrow(() -> 
catalog.asSchemas().dropSchema(SCHEMA_NAME, true));
+
+    kerberosHiveContainer.executeInContainer(
+        "hadoop", "fs", "-chown", "cli_schema", "/user/hadoop/" + catalogName);
+    Assertions.assertDoesNotThrow(
+        () -> catalog.asSchemas().createSchema(SCHEMA_NAME, "comment", 
schemaProperty));
+
+    catalog
+        .asFilesetCatalog()
+        .createFileset(
+            NameIdentifier.of(SCHEMA_NAME, TABLE_NAME),
+            "comment",
+            Fileset.Type.MANAGED,
+            null,
+            ImmutableMap.of());
+  }
+
+  @Test
+  void createFilesetWithKerberos() {
+    KerberosTokenProvider provider =
+        KerberosTokenProvider.builder()
+            .withClientPrincipal(GRAVITINO_CLIENT_PRINCIPAL)
+            .withKeyTabFile(new File(TMP_DIR + GRAVITINO_CLIENT_KEYTAB))
+            .build();
+    adminClient = 
GravitinoAdminClient.builder(serverUri).withKerberosAuth(provider).build();
+
+    String metalakeName = GravitinoITUtils.genRandomName("metalake");
+    String catalogName = GravitinoITUtils.genRandomName("catalog");
+    GravitinoMetalake gravitinoMetalake =
+        adminClient.createMetalake(metalakeName, null, ImmutableMap.of());
+
+    // Create a catalog
+    Map<String, String> properties = Maps.newHashMap();
+    String localtion = HDFS_URL + "/user/hadoop/" + catalogName;
+
+    properties.put(AUTH_TYPE_KEY, "kerberos");
+    properties.put(IMPERSONATION_ENABLE_KEY, "true");
+    properties.put(KEY_TAB_URI_KEY, TMP_DIR + HADOOP_CLIENT_KEYTAB);
+    properties.put(PRINCIPAL_KEY, HADOOP_CLIENT_PRINCIPAL);
+    properties.put("location", localtion);
+
+    kerberosHiveContainer.executeInContainer(
+        "hadoop", "fs", "-mkdir", "-p", "/user/hadoop/" + catalogName);
+
+    Catalog catalog =
+        gravitinoMetalake.createCatalog(
+            catalogName, Catalog.Type.FILESET, "hadoop", "comment", 
properties);
+
+    Map<String, String> schemaProperty = new HashMap<>();
+    schemaProperty.put(AUTH_TYPE_KEY, "kerberos");
+    // Disable impersonation here, so the user is the same as the principal 'cli_schema'.
+    schemaProperty.put(IMPERSONATION_ENABLE_KEY, "false");
+    schemaProperty.put(KEY_TAB_URI_KEY, TMP_DIR + HADOOP_SCHEMA_KEYTAB);
+    schemaProperty.put(PRINCIPAL_KEY, HADOOP_SCHEMA_PRINCIPAL + "@" + REALM);
+
+    kerberosHiveContainer.executeInContainer(
+        "hadoop", "fs", "-chown", "-R", "cli_schema", "/user/hadoop/" + 
catalogName);
+    Assertions.assertDoesNotThrow(
+        () -> catalog.asSchemas().createSchema(SCHEMA_NAME, "comment", 
schemaProperty));
+
+    catalog
+        .asFilesetCatalog()
+        .createFileset(
+            NameIdentifier.of(SCHEMA_NAME, TABLE_NAME),
+            "comment",
+            Fileset.Type.MANAGED,
+            null,
+            ImmutableMap.of());
+
+    Map<String, String> tableProperty = Maps.newHashMap();
+    tableProperty.put(AUTH_TYPE_KEY, "kerberos");
+    // Disable impersonation here, so the user is the same as the principal 'cli_fileset'.
+    tableProperty.put(IMPERSONATION_ENABLE_KEY, "false");
+    tableProperty.put(KEY_TAB_URI_KEY, TMP_DIR + HADOOP_FILESET_KEYTAB);
+    tableProperty.put(PRINCIPAL_KEY, HADOOP_FILESET_PRINCIPAL + "@" + REALM);
+
+    String fileset1 = GravitinoITUtils.genRandomName("fileset1");
+    Exception exception =
+        Assertions.assertThrows(
+            Exception.class,
+            () ->
+                catalog
+                    .asFilesetCatalog()
+                    .createFileset(
+                        NameIdentifier.of(SCHEMA_NAME, fileset1),
+                        "comment",
+                        Fileset.Type.MANAGED,
+                        null,
+                        tableProperty));
+    String exceptionMessage = Throwables.getStackTraceAsString(exception);
+    Assertions.assertTrue(
+        exceptionMessage.contains("Permission denied: user=cli_fileset, 
access=WRITE"));
+
+    // Now change the owner of schema directory to 'cli_fileset'
+    kerberosHiveContainer.executeInContainer(
+        "hadoop", "fs", "-chown", "-R", "cli_fileset", "/user/hadoop/" + 
catalogName);
+    Assertions.assertDoesNotThrow(
+        () ->
+            catalog
+                .asFilesetCatalog()
+                .createFileset(
+                    NameIdentifier.of(SCHEMA_NAME, fileset1),
+                    "comment",
+                    Fileset.Type.MANAGED,
+                    null,
+                    tableProperty));
+
+    // Enable user impersonation, so the real user is 'gravitino_client'.
+    tableProperty.put(IMPERSONATION_ENABLE_KEY, "true");
+    String fileset2 = GravitinoITUtils.genRandomName("fileset2");
+    exception =
+        Assertions.assertThrows(
+            Exception.class,
+            () ->
+                catalog
+                    .asFilesetCatalog()
+                    .createFileset(
+                        NameIdentifier.of(SCHEMA_NAME, fileset2),
+                        "comment",
+                        Fileset.Type.MANAGED,
+                        null,
+                        tableProperty));
+    exceptionMessage = Throwables.getStackTraceAsString(exception);
+    Assertions.assertTrue(
+        exceptionMessage.contains("Permission denied: user=gravitino_client, 
access=WRITE"));
+
+    // Now change the owner of schema directory to 'gravitino_client'
+    kerberosHiveContainer.executeInContainer(
+        "hadoop", "fs", "-chown", "-R", "gravitino_client", "/user/hadoop/" + 
catalogName);
+    Assertions.assertDoesNotThrow(
+        () ->
+            catalog
+                .asFilesetCatalog()
+                .createFileset(
+                    NameIdentifier.of(SCHEMA_NAME, fileset2),
+                    "comment",
+                    Fileset.Type.MANAGED,
+                    null,
+                    tableProperty));
+
+    // The schema directory is currently owned by 'gravitino_client', while fileset1
+    // authenticates as 'cli_fileset', so dropping fileset1 fails until ownership is changed.
+    Assertions.assertThrows(
+        Exception.class,
+        () -> 
catalog.asFilesetCatalog().dropFileset(NameIdentifier.of(SCHEMA_NAME, 
fileset1)));
+    kerberosHiveContainer.executeInContainer(
+        "hadoop", "fs", "-chown", "-R", "cli_fileset", "/user/hadoop/" + 
catalogName);
+    // Now try to drop fileset
+    Assertions.assertDoesNotThrow(
+        () -> 
catalog.asFilesetCatalog().dropFileset(NameIdentifier.of(SCHEMA_NAME, 
fileset1)));
+
+    Assertions.assertThrows(
+        Exception.class,
+        () -> 
catalog.asFilesetCatalog().dropFileset(NameIdentifier.of(SCHEMA_NAME, 
fileset2)));
+    kerberosHiveContainer.executeInContainer(
+        "hadoop", "fs", "-chown", "-R", "gravitino_client", "/user/hadoop/" + 
catalogName);
+    Assertions.assertDoesNotThrow(
+        () -> 
catalog.asFilesetCatalog().dropFileset(NameIdentifier.of(SCHEMA_NAME, 
fileset2)));
+
+    Assertions.assertThrows(
+        Exception.class, () -> catalog.asSchemas().dropSchema(SCHEMA_NAME, 
true));
+    kerberosHiveContainer.executeInContainer(
+        "hadoop", "fs", "-chown", "-R", "cli_schema", "/user/hadoop/" + 
catalogName);
+    Assertions.assertDoesNotThrow(() -> 
catalog.asSchemas().dropSchema(SCHEMA_NAME, true));
+  }
 }
diff --git 
a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveIT.java
 
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveIT.java
index a42043466..a2d2e9995 100644
--- 
a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveIT.java
+++ 
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveIT.java
@@ -344,7 +344,7 @@ public class CatalogHiveIT extends AbstractIT {
     }
     Assertions.assertTrue(fileStatuses.length > 0);
     for (FileStatus fileStatus : fileStatuses) {
-      Assertions.assertEquals("datastrato", fileStatus.getOwner());
+      Assertions.assertEquals("anonymous", fileStatus.getOwner());
     }
   }
 
diff --git a/clients/client-python/tests/integration/base_hadoop_env.py 
b/clients/client-python/tests/integration/base_hadoop_env.py
index 9099ed13f..ea4bc2e6a 100644
--- a/clients/client-python/tests/integration/base_hadoop_env.py
+++ b/clients/client-python/tests/integration/base_hadoop_env.py
@@ -72,7 +72,7 @@ class BaseHadoopEnvironment:
     @classmethod
     def _configure_hadoop_environment(cls):
         logger.info("Configure hadoop environment.")
-        os.putenv("HADOOP_USER_NAME", "datastrato")
+        os.putenv("HADOOP_USER_NAME", "anonymous")
         os.putenv("HADOOP_HOME", 
f"{PYTHON_BUILD_PATH}/hadoop/hadoop-{HADOOP_VERSION}")
         os.putenv(
             "HADOOP_CONF_DIR",
diff --git a/clients/client-python/tests/integration/hdfs_container.py 
b/clients/client-python/tests/integration/hdfs_container.py
index 1c3f1c21d..b9e1887fc 100644
--- a/clients/client-python/tests/integration/hdfs_container.py
+++ b/clients/client-python/tests/integration/hdfs_container.py
@@ -99,7 +99,7 @@ class HDFSContainer:
                 image=image_name,
                 name=self._container_name,
                 detach=True,
-                environment={"HADOOP_USER_NAME": "datastrato"},
+                environment={"HADOOP_USER_NAME": "anonymous"},
                 network=self._network_name,
             )
         asyncio.run(check_hdfs_container_status(self._container))
diff --git a/clients/client-python/tests/integration/integration_test_env.py 
b/clients/client-python/tests/integration/integration_test_env.py
index 8f1a5a2c8..7b8c05f53 100644
--- a/clients/client-python/tests/integration/integration_test_env.py
+++ b/clients/client-python/tests/integration/integration_test_env.py
@@ -149,7 +149,7 @@ class IntegrationTestEnv(unittest.TestCase):
 
         # Restart Gravitino Server
         env_vars = os.environ.copy()
-        env_vars["HADOOP_USER_NAME"] = "datastrato"
+        env_vars["HADOOP_USER_NAME"] = "anonymous"
         result = subprocess.run(
             [gravitino_startup_script, "restart"],
             env=env_vars,
diff --git a/core/src/main/java/org/apache/gravitino/utils/PrincipalUtils.java 
b/core/src/main/java/org/apache/gravitino/utils/PrincipalUtils.java
index a4b834698..ee8b0a0ee 100644
--- a/core/src/main/java/org/apache/gravitino/utils/PrincipalUtils.java
+++ b/core/src/main/java/org/apache/gravitino/utils/PrincipalUtils.java
@@ -44,12 +44,14 @@ public class PrincipalUtils {
     }
   }
 
+  // This method can't be used in nested `Subject#doAs` block.
   public static Principal getCurrentPrincipal() {
     java.security.AccessControlContext context = 
java.security.AccessController.getContext();
     Subject subject = Subject.getSubject(context);
     if (subject == null || 
subject.getPrincipals(UserPrincipal.class).isEmpty()) {
       return new UserPrincipal(AuthConstants.ANONYMOUS_USER);
     }
+
     return subject.getPrincipals(UserPrincipal.class).iterator().next();
   }
 
diff --git 
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ContainerSuite.java
 
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ContainerSuite.java
index 4b3b5fdc4..4afaaeffd 100644
--- 
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ContainerSuite.java
+++ 
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ContainerSuite.java
@@ -99,7 +99,7 @@ public class ContainerSuite implements Closeable {
 
   public void startHiveContainer() {
     startHiveContainer(
-        ImmutableMap.<String, String>builder().put("HADOOP_USER_NAME", 
"datastrato").build());
+        ImmutableMap.<String, String>builder().put("HADOOP_USER_NAME", 
"anonymous").build());
   }
 
   /**
@@ -158,7 +158,7 @@ public class ContainerSuite implements Closeable {
               TrinoContainer.builder()
                   .withEnvVars(
                       ImmutableMap.<String, String>builder()
-                          .put("HADOOP_USER_NAME", "datastrato")
+                          .put("HADOOP_USER_NAME", "anonymous")
                           .put("GRAVITINO_HOST_IP", "host.docker.internal")
                           .put("GRAVITINO_HOST_PORT", 
String.valueOf(gravitinoServerPort))
                           .put("GRAVITINO_METALAKE_NAME", metalakeName)
diff --git a/integration-test/trino-it/docker-compose.yaml 
b/integration-test/trino-it/docker-compose.yaml
index a4568b077..04f9264ed 100644
--- a/integration-test/trino-it/docker-compose.yaml
+++ b/integration-test/trino-it/docker-compose.yaml
@@ -26,7 +26,7 @@ services:
       - trino-net
     container_name: trino-ci-hive
     environment:
-      - HADOOP_USER_NAME=root
+      - HADOOP_USER_NAME=anonymous
     entrypoint:  /bin/bash /tmp/hive/init.sh
     volumes:
       - ./init/hive:/tmp/hive
@@ -86,7 +86,7 @@ services:
       - trino-net
     container_name: trino-ci-trino
     environment:
-      - HADOOP_USER_NAME=root
+      - HADOOP_USER_NAME=anonymous
       - GRAVITINO_HOST_IP=host.docker.internal
       - GRAVITINO_HOST_PORT=${GRAVITINO_SERVER_PORT:-8090}
       - GRAVITINO_METALAKE_NAME=test
diff --git 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
index fa1166511..a67fcd218 100644
--- 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
+++ 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
@@ -192,7 +192,7 @@ public abstract class SparkCommonIT extends SparkEnvIT {
     Map<String, String> databaseMeta = getDatabaseMetadata(testDatabaseName);
     Assertions.assertFalse(databaseMeta.containsKey("Comment"));
     Assertions.assertTrue(databaseMeta.containsKey("Location"));
-    Assertions.assertEquals("datastrato", databaseMeta.get("Owner"));
+    Assertions.assertEquals("anonymous", databaseMeta.get("Owner"));
     String properties = databaseMeta.get("Properties");
     Assertions.assertTrue(properties.contains("(ID,001)"));
 
@@ -206,7 +206,7 @@ public abstract class SparkCommonIT extends SparkEnvIT {
     databaseMeta = getDatabaseMetadata(testDatabaseName);
     String comment = databaseMeta.get("Comment");
     Assertions.assertEquals("comment", comment);
-    Assertions.assertEquals("datastrato", databaseMeta.get("Owner"));
+    Assertions.assertEquals("anonymous", databaseMeta.get("Owner"));
     // underlying catalog may change /tmp/t_create2 to file:/tmp/t_create2
     
Assertions.assertTrue(databaseMeta.get("Location").contains(testDatabaseLocation));
     properties = databaseMeta.get("Properties");

Reply via email to