This is an automated email from the ASF dual-hosted git repository.

yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new f14cb1633d [#8177] fix(iceberg-common): Fix the impersonation problem 
in creating iceberg table (#8189)
f14cb1633d is described below

commit f14cb1633d1548a96b9ff52bf0558f5667823027
Author: Mini Yu <[email protected]>
AuthorDate: Fri Dec 5 09:27:51 2025 +0800

    [#8177] fix(iceberg-common): Fix the impersonation problem in creating 
iceberg table (#8189)
    
    ### What changes were proposed in this pull request?
    
    Handle impersonation problem for Iceberg table using Hive backend.
    
    ### Why are the changes needed?
    
    Impersonation does not work for the Iceberg catalog with the Hive backend.
    
    Fix: #8177
    
    ### Does this PR introduce _any_ user-facing change?
    
    N/A.
    
    ### How was this patch tested?
    
    IT
---
 .../catalog-lakehouse-iceberg/build.gradle.kts     |   6 +-
 .../iceberg/IcebergCatalogOperations.java          |  11 +-
 .../test/CatalogIcebergKerberosHiveIT.java         |  27 +++
 gradle/libs.versions.toml                          |   6 +-
 iceberg/iceberg-common/build.gradle.kts            |   6 +-
 .../iceberg/common/ClosableHiveCatalog.java        | 194 ++++++++++++++++++++-
 .../common/authentication/SupportsKerberos.java    |  42 +++++
 .../authentication/kerberos/HiveBackendProxy.java  | 187 --------------------
 .../authentication/kerberos/KerberosClient.java    |   6 +-
 .../iceberg/common/ops/IcebergCatalogWrapper.java  |   2 +
 .../ops/KerberosAwareIcebergCatalogProxy.java      |  81 +++++++++
 .../iceberg/common/utils/IcebergCatalogUtil.java   |  32 +---
 iceberg/iceberg-rest-server/build.gradle.kts       |   9 +
 .../service/IcebergCatalogWrapperManager.java      |  13 +-
 .../integration/test/IcebergRESTServiceBaseIT.java |   6 +-
 .../test/IcebergRestKerberosHiveCatalogIT.java     |  10 +-
 ...KerberosHiveWithUserImpersonationCatalogIT.java | 133 ++++++++++++++
 17 files changed, 536 insertions(+), 235 deletions(-)

diff --git a/catalogs/catalog-lakehouse-iceberg/build.gradle.kts 
b/catalogs/catalog-lakehouse-iceberg/build.gradle.kts
index 4a143321f9..315b86c7e2 100644
--- a/catalogs/catalog-lakehouse-iceberg/build.gradle.kts
+++ b/catalogs/catalog-lakehouse-iceberg/build.gradle.kts
@@ -45,7 +45,11 @@ dependencies {
   implementation(libs.bundles.iceberg)
 
   implementation(libs.bundles.log4j)
-  implementation(libs.cglib)
+  implementation(libs.cglib) {
+    // The built-in asm version is 7.1, which is not fully compatible with Java 17
+    exclude("org.ow2.asm")
+  }
+  implementation(libs.asm)
   implementation(libs.commons.collections4)
   implementation(libs.commons.io)
   implementation(libs.commons.lang3)
diff --git 
a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java
 
b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java
index 043905c7da..492a69bd29 100644
--- 
a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java
+++ 
b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java
@@ -50,8 +50,11 @@ import 
org.apache.gravitino.exceptions.NonEmptySchemaException;
 import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
 import org.apache.gravitino.exceptions.TableAlreadyExistsException;
 import org.apache.gravitino.iceberg.common.IcebergConfig;
+import org.apache.gravitino.iceberg.common.authentication.AuthenticationConfig;
+import org.apache.gravitino.iceberg.common.authentication.SupportsKerberos;
 import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
 import 
org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper.IcebergTableChange;
+import 
org.apache.gravitino.iceberg.common.ops.KerberosAwareIcebergCatalogProxy;
 import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.Table;
@@ -113,7 +116,13 @@ public class IcebergCatalogOperations implements 
CatalogOperations, SupportsSche
     resultConf.put("catalog_uuid", info.id().toString());
     IcebergConfig icebergConfig = new IcebergConfig(resultConf);
 
-    this.icebergCatalogWrapper = new IcebergCatalogWrapper(icebergConfig);
+    IcebergCatalogWrapper rawWrapper = new 
IcebergCatalogWrapper(icebergConfig);
+
+    AuthenticationConfig authenticationConfig = new 
AuthenticationConfig(resultConf);
+    this.icebergCatalogWrapper =
+        authenticationConfig.isKerberosAuth() && rawWrapper.getCatalog() 
instanceof SupportsKerberos
+            ? new 
KerberosAwareIcebergCatalogProxy(rawWrapper).getProxy(icebergConfig)
+            : rawWrapper;
     this.icebergCatalogWrapperHelper =
         new IcebergCatalogWrapperHelper(icebergCatalogWrapper.getCatalog());
   }
diff --git 
a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergKerberosHiveIT.java
 
b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergKerberosHiveIT.java
index 15493d9c47..65cf8c6bd6 100644
--- 
a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergKerberosHiveIT.java
+++ 
b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergKerberosHiveIT.java
@@ -262,6 +262,33 @@ public class CatalogIcebergKerberosHiveIT extends BaseIT {
     Assertions.assertDoesNotThrow(
         () -> catalog.asSchemas().createSchema(SCHEMA_NAME, "comment", 
ImmutableMap.of()));
 
+    kerberosHiveContainer.executeInContainer(
+        "hadoop", "fs", "-chmod", "-R", "000", 
"/user/hive/warehouse-catalog-iceberg");
+
+    // Try to create a table while the user has no EXECUTE permission on the warehouse path
+    NameIdentifier tableNameIdentifier1 = NameIdentifier.of(SCHEMA_NAME, 
TABLE_NAME);
+    exception =
+        Assertions.assertThrows(
+            Exception.class,
+            () ->
+                catalog
+                    .asTableCatalog()
+                    .createTable(
+                        tableNameIdentifier1,
+                        createColumns(),
+                        "",
+                        ImmutableMap.of(),
+                        Transforms.EMPTY_TRANSFORM,
+                        Distributions.NONE,
+                        SortOrders.NONE));
+    exceptionMessage = Throwables.getStackTraceAsString(exception);
+    // Make sure the real user is 'gravitino_client'
+    Assertions.assertTrue(
+        exceptionMessage.contains("Permission denied: user=gravitino_client, 
access=EXECUTE"));
+
+    // Now restore permissions so the user can create the table again
+    kerberosHiveContainer.executeInContainer(
+        "hadoop", "fs", "-chmod", "-R", "777", 
"/user/hive/warehouse-catalog-iceberg");
     // Create table
     NameIdentifier tableNameIdentifier = NameIdentifier.of(SCHEMA_NAME, 
TABLE_NAME);
     catalog
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index f0fd23e4e5..6384a36bc4 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -92,7 +92,10 @@ servlet = "3.1.0"
 jodd = "3.5.2"
 flink = "1.18.0"
 flinkjdbc = "3.2.0-1.18"
-cglib = "2.2"
+# cglib upgraded from 2.2 to 3.3.0 for Java 17 compatibility.
+# Explicit ASM dependency (9.8) is required due to cglib's internal use and 
Java 17 module system.
+cglib = "3.3.0"
+asm = "9.8"
 ranger = "2.4.0"
 javax-jaxb-api = "2.3.1"
 javax-ws-rs-api = "2.1.1"
@@ -276,6 +279,7 @@ kafka-clients = { group = "org.apache.kafka", name = 
"kafka-clients", version.re
 kafka = { group = "org.apache.kafka", name = "kafka_2.12", version.ref = 
"kafka" }
 curator-test = { group = "org.apache.curator", name = "curator-test", 
version.ref = "curator"}
 cglib = { group = "cglib", name = "cglib", version.ref = "cglib"}
+asm = { group = "org.ow2.asm", name = "asm", version.ref = "asm"}
 woodstox-core = { group = "com.fasterxml.woodstox", name = "woodstox-core", 
version.ref = "woodstox-core"}
 thrift = { group = "org.apache.thrift", name = "libthrift", version.ref = 
"thrift"}
 derby = { group = "org.apache.derby", name = "derby", version.ref = "derby"}
diff --git a/iceberg/iceberg-common/build.gradle.kts 
b/iceberg/iceberg-common/build.gradle.kts
index f9dc336976..50c052d59c 100644
--- a/iceberg/iceberg-common/build.gradle.kts
+++ b/iceberg/iceberg-common/build.gradle.kts
@@ -39,7 +39,11 @@ dependencies {
     exclude("org.jline")
   }
   implementation(libs.caffeine)
-  implementation(libs.cglib)
+  implementation(libs.cglib) {
+    // The built-in asm version is 7.1, which is not fully compatible with Java 17
+    exclude("org.ow2.asm")
+  }
+  implementation(libs.asm)
   implementation(libs.commons.lang3)
   implementation(libs.guava)
   implementation(libs.iceberg.aliyun)
diff --git 
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ClosableHiveCatalog.java
 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ClosableHiveCatalog.java
index 88db666d83..9b708acc68 100644
--- 
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ClosableHiveCatalog.java
+++ 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ClosableHiveCatalog.java
@@ -19,13 +19,31 @@
 
 package org.apache.gravitino.iceberg.common;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import java.io.Closeable;
+import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
+import java.security.PrivilegedExceptionAction;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import lombok.Getter;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.iceberg.common.authentication.AuthenticationConfig;
+import org.apache.gravitino.iceberg.common.authentication.SupportsKerberos;
+import 
org.apache.gravitino.iceberg.common.authentication.kerberos.KerberosClient;
+import 
org.apache.gravitino.iceberg.common.utils.CaffeineSchedulerExtractorUtils;
 import org.apache.gravitino.iceberg.common.utils.IcebergHiveCachedClientPool;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.iceberg.ClientPool;
 import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,11 +51,13 @@ import org.slf4j.LoggerFactory;
  * ClosableHiveCatalog is a wrapper class to wrap Iceberg HiveCatalog to do 
some clean-up work like
  * closing resources.
  */
-public class ClosableHiveCatalog extends HiveCatalog implements Closeable {
+public class ClosableHiveCatalog extends HiveCatalog implements Closeable, 
SupportsKerberos {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ClosableHiveCatalog.class);
 
-  private final List<Closeable> resources = Lists.newArrayList();
+  @Getter private final List<Closeable> resources = Lists.newArrayList();
+
+  private KerberosClient kerberosClient;
 
   public ClosableHiveCatalog() {
     super();
@@ -47,8 +67,40 @@ public class ClosableHiveCatalog extends HiveCatalog 
implements Closeable {
     resources.add(resource);
   }
 
+  /**
+   * Initialize the ClosableHiveCatalog with the given input name and 
properties.
+   *
+   * <p>Note: This method can only be called once as it will create new client 
pools.
+   *
+   * @param inputName name of the catalog
+   * @param properties properties for the catalog
+   */
+  @Override
+  public void initialize(String inputName, Map<String, String> properties) {
+    super.initialize(inputName, properties);
+
+    AuthenticationConfig authenticationConfig = new 
AuthenticationConfig(properties);
+    if (authenticationConfig.isKerberosAuth()) {
+      this.kerberosClient = initKerberosClient();
+    }
+
+    try {
+      resetIcebergHiveClientPool();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to reset IcebergHiveClientPool", e);
+    }
+  }
+
   @Override
   public void close() throws IOException {
+    if (kerberosClient != null) {
+      try {
+        kerberosClient.close();
+      } catch (Exception e) {
+        LOGGER.warn("Failed to close KerberosClient", e);
+      }
+    }
+
     // Do clean up work here. We need a mechanism to close the HiveCatalog; 
however, HiveCatalog
     // doesn't implement the Closeable interface.
 
@@ -68,6 +120,131 @@ public class ClosableHiveCatalog extends HiveCatalog 
implements Closeable {
         });
   }
 
+  @Override
+  public <R> R doKerberosOperations(Executable<R> executable) throws Throwable 
{
+    Map<String, String> properties = this.properties();
+    AuthenticationConfig authenticationConfig = new 
AuthenticationConfig(properties);
+
+    final String finalPrincipalName;
+    String proxyKerberosPrincipalName = 
PrincipalUtils.getCurrentPrincipal().getName();
+
+    if (!proxyKerberosPrincipalName.contains("@")) {
+      finalPrincipalName =
+          String.format("%s@%s", proxyKerberosPrincipalName, 
kerberosClient.getRealm());
+    } else {
+      finalPrincipalName = proxyKerberosPrincipalName;
+    }
+
+    UserGroupInformation realUser =
+        authenticationConfig.isImpersonationEnabled()
+            ? UserGroupInformation.createProxyUser(
+                finalPrincipalName, kerberosClient.getLoginUser())
+            : kerberosClient.getLoginUser();
+    try {
+      ClientPool<IMetaStoreClient, TException> newClientPool =
+          (ClientPool<IMetaStoreClient, TException>) 
FieldUtils.readField(this, "clients", true);
+      kerberosClient
+          .getLoginUser()
+          .doAs(
+              (PrivilegedExceptionAction<Void>)
+                  () -> {
+                    String token =
+                        newClientPool.run(
+                            client ->
+                                client.getDelegationToken(
+                                    finalPrincipalName,
+                                    
kerberosClient.getLoginUser().getShortUserName()));
+
+                    Token<DelegationTokenIdentifier> delegationToken = new 
Token<>();
+                    delegationToken.decodeFromUrlString(token);
+                    realUser.addToken(delegationToken);
+                    return null;
+                  });
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "Failed to get delegation token for principal: " + 
finalPrincipalName, e);
+    }
+    return realUser.doAs(
+        (PrivilegedExceptionAction<R>)
+            () -> {
+              try {
+                return executable.execute();
+              } catch (Throwable e) {
+                if (RuntimeException.class.isAssignableFrom(e.getClass())) {
+                  throw (RuntimeException) e;
+                }
+                throw new RuntimeException("Failed to invoke method", e);
+              }
+            });
+  }
+
+  private ClientPool<IMetaStoreClient, TException> resetIcebergHiveClientPool()
+      throws IllegalAccessException {
+    // Get the old client pool before replacing it
+    Object oldPool = FieldUtils.readField(this, "clients", true);
+
+    // Create and set the new client pool first
+    IcebergHiveCachedClientPool newClientPool =
+        new IcebergHiveCachedClientPool(this.getConf(), this.properties());
+    FieldUtils.writeField(this, "clients", newClientPool, true);
+
+    // Then try to close the old pool to release resources
+    if (oldPool != null) {
+      // Try standard close method if available
+      if (oldPool instanceof AutoCloseable) {
+        try {
+          ((AutoCloseable) oldPool).close();
+          LOGGER.info("Successfully closed old Hive client pool");
+        } catch (Exception e) {
+          LOGGER.warn("Failed to close old Hive client pool", e);
+        }
+      }
+
+      // Additionally, try to shutdown the internal scheduler thread pool in 
Iceberg's
+      // CachedClientPool to prevent memory leak. This is necessary because 
Iceberg's
+      // CachedClientPool does not implement Closeable and does not properly 
clean up
+      // its internal scheduler.
+      try {
+        shutdownIcebergCachedClientPoolScheduler(oldPool);
+      } catch (Exception e) {
+        LOGGER.warn(
+            "Failed to shutdown scheduler in old CachedClientPool, may cause 
minor resource leak",
+            e);
+      }
+    }
+
+    return newClientPool;
+  }
+
+  /**
+   * Shuts down the scheduler thread pool in Iceberg's CachedClientPool.
+   *
+   * <p>Required because CachedClientPool doesn't provide cleanup, causing 
thread pool leaks.
+   *
+   * @param clientPool The old CachedClientPool instance
+   */
+  @VisibleForTesting
+  void shutdownIcebergCachedClientPoolScheduler(Object clientPool) {
+    try {
+      Object cache = FieldUtils.readField(clientPool, "clientPoolCache", true);
+      if (cache == null) {
+        LOGGER.debug("clientPoolCache is null, no scheduler to shutdown");
+        return;
+      }
+
+      ScheduledExecutorService executor =
+          CaffeineSchedulerExtractorUtils.extractSchedulerExecutor(cache);
+      if (executor != null) {
+        LOGGER.info("Shutting down scheduler thread pool from old 
CachedClientPool");
+        executor.shutdownNow();
+      } else {
+        LOGGER.debug("Could not extract scheduler executor from cache");
+      }
+    } catch (IllegalAccessException e) {
+      LOGGER.debug("Failed to access clientPoolCache field", e);
+    }
+  }
+
   /**
    * Close the internal HiveCatalog client pool using reflection. This is 
necessary because
    * HiveCatalog doesn't provide a public API to close its client pool. We 
need to avoid closing
@@ -93,4 +270,17 @@ public class ClosableHiveCatalog extends HiveCatalog 
implements Closeable {
       LOGGER.warn("Failed to close HiveCatalog internal client pool", e);
     }
   }
+
+  private KerberosClient initKerberosClient() {
+    try {
+      KerberosClient kerberosClient = new KerberosClient(this.properties(), 
this.getConf());
+      // catalog_uuid always exists for Gravitino-managed catalogs; `0` is just a fallback value.
+      String catalogUUID = properties().getOrDefault("catalog_uuid", "0");
+      File keytabFile = 
kerberosClient.saveKeyTabFileFromUri(Long.parseLong(catalogUUID));
+      kerberosClient.login(keytabFile.getAbsolutePath());
+      return kerberosClient;
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to login with kerberos", e);
+    }
+  }
 }
diff --git 
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/authentication/SupportsKerberos.java
 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/authentication/SupportsKerberos.java
new file mode 100644
index 0000000000..78d070e39a
--- /dev/null
+++ 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/authentication/SupportsKerberos.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.iceberg.common.authentication;
+
+/**
+ * An interface to indicate that the implementing class supports Kerberos 
authentication and can do
+ * operations with Kerberos.
+ */
+public interface SupportsKerberos {
+
+  /**
+   * Perform operations with Kerberos authentication.
+   *
+   * @param executable the operations to be performed
+   * @return the result of the operations
+   * @param <R> the return type of the operations
+   * @throws Throwable if any error occurs during the operations
+   */
+  <R> R doKerberosOperations(Executable<R> executable) throws Throwable;
+
+  @FunctionalInterface
+  interface Executable<R> {
+    R execute() throws Throwable;
+  }
+}
diff --git 
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/authentication/kerberos/HiveBackendProxy.java
 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/authentication/kerberos/HiveBackendProxy.java
deleted file mode 100644
index 2da071f08f..0000000000
--- 
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/authentication/kerberos/HiveBackendProxy.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.gravitino.iceberg.common.authentication.kerberos;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.security.PrivilegedExceptionAction;
-import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
-import net.sf.cglib.proxy.Enhancer;
-import net.sf.cglib.proxy.MethodInterceptor;
-import net.sf.cglib.proxy.MethodProxy;
-import org.apache.commons.lang3.reflect.FieldUtils;
-import 
org.apache.gravitino.iceberg.common.utils.CaffeineSchedulerExtractorUtils;
-import org.apache.gravitino.iceberg.common.utils.IcebergHiveCachedClientPool;
-import org.apache.gravitino.utils.PrincipalUtils;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.iceberg.ClientPool;
-import org.apache.iceberg.hive.HiveCatalog;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Proxy class for HiveCatalog to support kerberos authentication. We can also 
make HiveCatalog as a
- * generic type and pass it as a parameter to the constructor.
- */
-public class HiveBackendProxy implements MethodInterceptor {
-  private static final Logger LOG = 
LoggerFactory.getLogger(HiveBackendProxy.class);
-  private final HiveCatalog target;
-  private final String kerberosRealm;
-  private final UserGroupInformation proxyUser;
-  private final Map<String, String> properties;
-  private final ClientPool<IMetaStoreClient, TException> newClientPool;
-
-  public HiveBackendProxy(
-      Map<String, String> properties, HiveCatalog target, String 
kerberosRealm) {
-    this.target = target;
-    this.properties = properties;
-    this.kerberosRealm = kerberosRealm;
-    try {
-      proxyUser = UserGroupInformation.getCurrentUser();
-
-      // Replace the original client pool with IcebergHiveCachedClientPool. 
Why do we need to do
-      // this? Because the original client pool in Iceberg uses a fixed 
username to create the
-      // client pool, and it will not work with kerberos authentication. We 
need to create a new
-      // client pool with the current user. For more, please see 
CachedClientPool#clientPool and
-      // notice the value of `key`
-      this.newClientPool = resetIcebergHiveClientPool();
-    } catch (IOException e) {
-      throw new RuntimeException("Failed to get current user", e);
-    } catch (IllegalAccessException e) {
-      throw new RuntimeException("Failed to reset IcebergHiveClientPool", e);
-    }
-  }
-
-  @Override
-  public Object intercept(Object o, Method method, Object[] objects, 
MethodProxy methodProxy)
-      throws Throwable {
-    final String finalPrincipalName;
-    String proxyKerberosPrincipalName = 
PrincipalUtils.getCurrentPrincipal().getName();
-    if (!proxyKerberosPrincipalName.contains("@")) {
-      finalPrincipalName = String.format("%s@%s", proxyKerberosPrincipalName, 
kerberosRealm);
-    } else {
-      finalPrincipalName = proxyKerberosPrincipalName;
-    }
-    UserGroupInformation realUser =
-        UserGroupInformation.createProxyUser(finalPrincipalName, proxyUser);
-
-    String token =
-        newClientPool.run(
-            client -> client.getDelegationToken(finalPrincipalName, 
proxyUser.getShortUserName()));
-
-    Token<DelegationTokenIdentifier> delegationToken = new Token<>();
-    delegationToken.decodeFromUrlString(token);
-    realUser.addToken(delegationToken);
-
-    return realUser.doAs(
-        (PrivilegedExceptionAction<Object>)
-            () -> {
-              try {
-                return methodProxy.invoke(target, objects);
-              } catch (Throwable e) {
-                if (RuntimeException.class.isAssignableFrom(e.getClass())) {
-                  throw (RuntimeException) e;
-                }
-                throw new RuntimeException("Failed to invoke method", e);
-              }
-            });
-  }
-
-  private ClientPool<IMetaStoreClient, TException> resetIcebergHiveClientPool()
-      throws IllegalAccessException {
-    // Get the old client pool before replacing it
-    Object oldPool = FieldUtils.readField(target, "clients", true);
-
-    // Create and set the new client pool first
-    IcebergHiveCachedClientPool newClientPool =
-        new IcebergHiveCachedClientPool(target.getConf(), properties);
-    FieldUtils.writeField(target, "clients", newClientPool, true);
-
-    // Then try to close the old pool to release resources
-    if (oldPool != null) {
-      // Try standard close method if available
-      if (oldPool instanceof AutoCloseable) {
-        try {
-          ((AutoCloseable) oldPool).close();
-          LOG.info("Successfully closed old Hive client pool");
-        } catch (Exception e) {
-          LOG.warn("Failed to close old Hive client pool", e);
-        }
-      }
-
-      // Additionally, try to shutdown the internal scheduler thread pool in 
Iceberg's
-      // CachedClientPool to prevent memory leak. This is necessary because 
Iceberg's
-      // CachedClientPool does not implement Closeable and does not properly 
clean up
-      // its internal scheduler.
-      try {
-        shutdownIcebergCachedClientPoolScheduler(oldPool);
-      } catch (Exception e) {
-        LOG.warn(
-            "Failed to shutdown scheduler in old CachedClientPool, may cause 
minor resource leak",
-            e);
-      }
-    }
-
-    return newClientPool;
-  }
-
-  /**
-   * Shuts down the scheduler thread pool in Iceberg's CachedClientPool.
-   *
-   * <p>Required because CachedClientPool doesn't provide cleanup, causing 
thread pool leaks.
-   *
-   * @param clientPool The old CachedClientPool instance
-   */
-  @VisibleForTesting
-  void shutdownIcebergCachedClientPoolScheduler(Object clientPool) {
-    try {
-      Object cache = FieldUtils.readField(clientPool, "clientPoolCache", true);
-      if (cache == null) {
-        LOG.debug("clientPoolCache is null, no scheduler to shutdown");
-        return;
-      }
-
-      ScheduledExecutorService executor =
-          CaffeineSchedulerExtractorUtils.extractSchedulerExecutor(cache);
-      if (executor != null) {
-        LOG.info("Shutting down scheduler thread pool from old 
CachedClientPool");
-        executor.shutdownNow();
-      } else {
-        LOG.debug("Could not extract scheduler executor from cache");
-      }
-    } catch (IllegalAccessException e) {
-      LOG.debug("Failed to access clientPoolCache field", e);
-    }
-  }
-
-  public HiveCatalog getProxy() {
-    Enhancer e = new Enhancer();
-    e.setClassLoader(target.getClass().getClassLoader());
-    e.setSuperclass(target.getClass());
-    e.setCallback(this);
-    return (HiveCatalog) e.create();
-  }
-}
diff --git 
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/authentication/kerberos/KerberosClient.java
 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/authentication/kerberos/KerberosClient.java
index a7d1f9d079..12b612fa82 100644
--- 
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/authentication/kerberos/KerberosClient.java
+++ 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/authentication/kerberos/KerberosClient.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import lombok.Getter;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -40,6 +41,7 @@ public class KerberosClient implements Closeable {
   private final ScheduledThreadPoolExecutor checkTgtExecutor;
   private final Map<String, String> conf;
   private final Configuration hadoopConf;
+  @Getter private UserGroupInformation loginUser;
   private String realm;
 
   public KerberosClient(Map<String, String> conf, Configuration hadoopConf) {
@@ -73,14 +75,14 @@ public class KerberosClient implements Closeable {
     // Login
     UserGroupInformation.setConfiguration(hadoopConf);
     UserGroupInformation.loginUserFromKeytab(catalogPrincipal, keytabFilePath);
-    UserGroupInformation kerberosLoginUgi = 
UserGroupInformation.getCurrentUser();
+    loginUser = UserGroupInformation.getCurrentUser();
 
     // Refresh the cache if it's out of date.
     int checkInterval = kerberosConfig.getCheckIntervalSec();
     checkTgtExecutor.scheduleAtFixedRate(
         () -> {
           try {
-            kerberosLoginUgi.checkTGTAndReloginFromKeytab();
+            loginUser.checkTGTAndReloginFromKeytab();
           } catch (Exception e) {
             LOG.error("Fail to refresh ugi token: ", e);
           }
diff --git 
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java
 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java
index bc9f7c0c09..54513e9a15 100644
--- 
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java
+++ 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java
@@ -73,11 +73,13 @@ public class IcebergCatalogWrapper implements AutoCloseable 
{
   @Getter protected Catalog catalog;
   private SupportsNamespaces asNamespaceCatalog;
   private final IcebergCatalogBackend catalogBackend;
+  @Getter private final IcebergConfig icebergConfig;
   private String catalogUri = null;
   private Map<String, String> catalogPropertiesMap;
   private TableMetadataCache metadataCache;
 
   public IcebergCatalogWrapper(IcebergConfig icebergConfig) {
+    this.icebergConfig = icebergConfig;
     this.catalogBackend =
         IcebergCatalogBackend.valueOf(
             
icebergConfig.get(IcebergConfig.CATALOG_BACKEND).toUpperCase(Locale.ROOT));
diff --git 
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/KerberosAwareIcebergCatalogProxy.java
 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/KerberosAwareIcebergCatalogProxy.java
new file mode 100644
index 0000000000..c749c1af28
--- /dev/null
+++ 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/KerberosAwareIcebergCatalogProxy.java
@@ -0,0 +1,81 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.iceberg.common.ops;
+
+import java.lang.reflect.Method;
+import net.sf.cglib.proxy.Enhancer;
+import net.sf.cglib.proxy.MethodInterceptor;
+import net.sf.cglib.proxy.MethodProxy;
+import org.apache.gravitino.iceberg.common.IcebergConfig;
+import org.apache.gravitino.iceberg.common.authentication.SupportsKerberos;
+import org.apache.iceberg.catalog.Catalog;
+
+/**
+ * A proxy class for IcebergCatalogWrapper to handle Kerberos authentication 
or other cross-cutting
+ * concerns.
+ */
+public class KerberosAwareIcebergCatalogProxy implements MethodInterceptor {
+  private final IcebergCatalogWrapper target;
+  private final Catalog catalog;
+
+  public KerberosAwareIcebergCatalogProxy(IcebergCatalogWrapper target) {
+    this.target = target;
+    this.catalog = target.getCatalog();
+  }
+
+  @Override
+  public Object intercept(Object o, Method method, Object[] objects, 
MethodProxy methodProxy)
+      throws Throwable {
+    if (catalog instanceof SupportsKerberos) {
+      SupportsKerberos kerberosCatalog = (SupportsKerberos) catalog;
+      return kerberosCatalog.doKerberosOperations(() -> 
methodProxy.invoke(target, objects));
+    }
+
+    return method.invoke(target, objects);
+  }
+
+  public IcebergCatalogWrapper getProxy(IcebergConfig config) {
+    Enhancer e = new Enhancer();
+    e.setClassLoader(target.getClass().getClassLoader());
+    e.setSuperclass(target.getClass());
+    e.setCallback(this);
+
+    Class<?>[] argClass = new Class[] {IcebergConfig.class};
+    return (IcebergCatalogWrapper) e.create(argClass, new Object[] {config});
+  }
+
+  /**
+   * Create a proxy instance with catalogName and config constructor. It's 
used for class
+   * CatalogWrapperForREST or its subclass.
+   *
+   * @param catalogName Name of the catalog.
+   * @param config Iceberg configuration.
+   * @return The proxy instance.
+   */
+  public IcebergCatalogWrapper getProxy(String catalogName, IcebergConfig 
config) {
+    Enhancer e = new Enhancer();
+    e.setClassLoader(target.getClass().getClassLoader());
+    e.setSuperclass(target.getClass());
+    e.setCallback(this);
+
+    Class<?>[] argClass = new Class[] {String.class, IcebergConfig.class};
+    return (IcebergCatalogWrapper) e.create(argClass, new Object[] 
{catalogName, config});
+  }
+}
diff --git 
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/utils/IcebergCatalogUtil.java
 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/utils/IcebergCatalogUtil.java
index 3ed1ad3ec0..38f2643353 100644
--- 
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/utils/IcebergCatalogUtil.java
+++ 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/utils/IcebergCatalogUtil.java
@@ -22,8 +22,6 @@ import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY
 import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.io.File;
-import java.io.IOException;
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.HashMap;
@@ -35,9 +33,6 @@ import 
org.apache.gravitino.exceptions.ConnectionFailedException;
 import org.apache.gravitino.iceberg.common.ClosableHiveCatalog;
 import org.apache.gravitino.iceberg.common.IcebergConfig;
 import org.apache.gravitino.iceberg.common.authentication.AuthenticationConfig;
-import 
org.apache.gravitino.iceberg.common.authentication.kerberos.HiveBackendProxy;
-import 
org.apache.gravitino.iceberg.common.authentication.kerberos.KerberosClient;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
@@ -86,16 +81,7 @@ public class IcebergCatalogUtil {
       hdfsConfiguration.set(HADOOP_SECURITY_AUTHORIZATION, "true");
       hdfsConfiguration.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
       hiveCatalog.setConf(hdfsConfiguration);
-      hiveCatalog.initialize(icebergCatalogName, properties);
-
-      KerberosClient kerberosClient = initKerberosAndReturnClient(properties, 
hdfsConfiguration);
-      hiveCatalog.addResource(kerberosClient);
-      if (authenticationConfig.isImpersonationEnabled()) {
-        HiveBackendProxy proxyHiveCatalog =
-            new HiveBackendProxy(resultProperties, hiveCatalog, 
kerberosClient.getRealm());
-        return proxyHiveCatalog.getProxy();
-      }
-
+      hiveCatalog.initialize(icebergCatalogName, resultProperties);
       return hiveCatalog;
     } else {
       throw new UnsupportedOperationException(
@@ -103,22 +89,6 @@ public class IcebergCatalogUtil {
     }
   }
 
-  private static KerberosClient initKerberosAndReturnClient(
-      Map<String, String> properties, Configuration conf) {
-    try {
-      KerberosClient kerberosClient = new KerberosClient(properties, conf);
-
-      // For Iceberg rest server, we haven't set the catalog_uuid, so we set 
it to 0 as there is
-      // only one catalog in the rest server, so it's okay to set it to 0.
-      String catalogUUID = properties.getOrDefault("catalog_uuid", "0");
-      File keytabFile = 
kerberosClient.saveKeyTabFileFromUri(Long.valueOf(catalogUUID));
-      kerberosClient.login(keytabFile.getAbsolutePath());
-      return kerberosClient;
-    } catch (IOException e) {
-      throw new RuntimeException("Failed to login with kerberos", e);
-    }
-  }
-
   @SuppressWarnings("FormatStringAnnotation")
   private static JdbcCatalog loadJdbcCatalog(IcebergConfig icebergConfig) {
     String driverClassName = icebergConfig.getJdbcDriver();
diff --git a/iceberg/iceberg-rest-server/build.gradle.kts 
b/iceberg/iceberg-rest-server/build.gradle.kts
index 25db4d9db8..e74945ff3e 100644
--- a/iceberg/iceberg-rest-server/build.gradle.kts
+++ b/iceberg/iceberg-rest-server/build.gradle.kts
@@ -56,6 +56,10 @@ dependencies {
   implementation(libs.bundles.metrics)
   implementation(libs.bundles.prometheus)
   implementation(libs.caffeine)
+  implementation(libs.cglib) {
+    // The bundled asm version (7.1) does not work well with Java 17, so exclude it
+    exclude("org.ow2.asm")
+  }
   implementation(libs.commons.dbcp2)
   implementation(libs.commons.lang3)
   implementation(libs.concurrent.trees)
@@ -108,6 +112,11 @@ dependencies {
   testImplementation(libs.sqlite.jdbc)
   testImplementation(libs.slf4j.api)
   testImplementation(libs.testcontainers)
+  testImplementation(libs.cglib) {
+    // The bundled asm version (7.1) does not work well with Java 17, so exclude it
+    exclude("org.ow2.asm")
+  }
+  testImplementation(libs.asm)
   testImplementation(libs.testcontainers.postgresql)
 
   // Add Hadoop 3.3+ dependencies since Spark's Hadoop is excluded
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java
index d827e601ed..58733d7847 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java
@@ -29,7 +29,10 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.iceberg.common.IcebergConfig;
+import org.apache.gravitino.iceberg.common.authentication.AuthenticationConfig;
+import org.apache.gravitino.iceberg.common.authentication.SupportsKerberos;
 import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
+import 
org.apache.gravitino.iceberg.common.ops.KerberosAwareIcebergCatalogProxy;
 import org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -100,7 +103,15 @@ public class IcebergCatalogWrapperManager implements 
AutoCloseable {
   @VisibleForTesting
   protected CatalogWrapperForREST createCatalogWrapper(
       String catalogName, IcebergConfig icebergConfig) {
-    return new CatalogWrapperForREST(catalogName, icebergConfig);
+    CatalogWrapperForREST rest = new CatalogWrapperForREST(catalogName, 
icebergConfig);
+    AuthenticationConfig authenticationConfig =
+        new AuthenticationConfig(icebergConfig.getAllConfig());
+    if (rest.getCatalog() instanceof SupportsKerberos && 
authenticationConfig.isKerberosAuth()) {
+      return (CatalogWrapperForREST)
+          new KerberosAwareIcebergCatalogProxy(rest).getProxy(catalogName, 
icebergConfig);
+    }
+
+    return rest;
   }
 
   private void closeIcebergCatalogWrapper(IcebergCatalogWrapper 
catalogWrapper) {
diff --git 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRESTServiceBaseIT.java
 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRESTServiceBaseIT.java
index 1cd1d0f63c..ab32e165d6 100644
--- 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRESTServiceBaseIT.java
+++ 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRESTServiceBaseIT.java
@@ -54,7 +54,7 @@ import org.slf4j.LoggerFactory;
 public abstract class IcebergRESTServiceBaseIT {
 
   public static final Logger LOG = 
LoggerFactory.getLogger(IcebergRESTServiceBaseIT.class);
-  private SparkSession sparkSession;
+  protected SparkSession sparkSession;
   protected IcebergCatalogBackend catalogType = IcebergCatalogBackend.MEMORY;
   private IcebergRESTServerManager icebergRESTServerManager;
 
@@ -125,14 +125,14 @@ public abstract class IcebergRESTServiceBaseIT {
     LOG.info("Iceberg REST service config registered, {}", 
StringUtils.join(icebergConfigs));
   }
 
-  private int getServerPort() {
+  protected int getServerPort() {
     JettyServerConfig jettyServerConfig =
         JettyServerConfig.fromConfig(
             icebergRESTServerManager.getServerConfig(), 
IcebergConfig.ICEBERG_CONFIG_PREFIX);
     return jettyServerConfig.getHttpPort();
   }
 
-  private void initSparkEnv() {
+  protected void initSparkEnv() {
     int port = getServerPort();
     LOG.info("Iceberg REST server port:{}", port);
     String icebergRESTUri = String.format("http://127.0.0.1:%d/iceberg/";, 
port);
diff --git 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRestKerberosHiveCatalogIT.java
 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRestKerberosHiveCatalogIT.java
index 90d6907d67..36969f0e31 100644
--- 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRestKerberosHiveCatalogIT.java
+++ 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRestKerberosHiveCatalogIT.java
@@ -41,10 +41,10 @@ import org.junit.jupiter.api.condition.EnabledIf;
 @EnabledIf("isEmbedded")
 public class IcebergRestKerberosHiveCatalogIT extends IcebergRESTHiveCatalogIT 
{
 
-  private static final String HIVE_METASTORE_CLIENT_PRINCIPAL = 
"cli@HADOOPKRB";
-  private static final String HIVE_METASTORE_CLIENT_KEYTAB = "/client.keytab";
+  protected static final String HIVE_METASTORE_CLIENT_PRINCIPAL = 
"cli@HADOOPKRB";
+  protected static final String HIVE_METASTORE_CLIENT_KEYTAB = 
"/client.keytab";
 
-  private static String tempDir;
+  protected static String tempDir;
 
   public IcebergRestKerberosHiveCatalogIT() {
     super();
@@ -150,7 +150,7 @@ public class IcebergRestKerberosHiveCatalogIT extends 
IcebergRESTHiveCatalogIT {
     return configMap;
   }
 
-  private static boolean isEmbedded() {
+  protected static boolean isEmbedded() {
     String mode =
         System.getProperty(ITUtils.TEST_MODE) == null
             ? ITUtils.EMBEDDED_TEST_MODE
@@ -159,7 +159,7 @@ public class IcebergRestKerberosHiveCatalogIT extends 
IcebergRESTHiveCatalogIT {
     return Objects.equals(mode, ITUtils.EMBEDDED_TEST_MODE);
   }
 
-  private static void refreshKerberosConfig() {
+  protected static void refreshKerberosConfig() {
     Class<?> classRef;
     try {
       if (System.getProperty("java.vendor").contains("IBM")) {
diff --git 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRestKerberosHiveWithUserImpersonationCatalogIT.java
 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRestKerberosHiveWithUserImpersonationCatalogIT.java
new file mode 100644
index 0000000000..28a7ee4dcc
--- /dev/null
+++ 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRestKerberosHiveWithUserImpersonationCatalogIT.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.iceberg.integration.test;
+
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestInstance.Lifecycle;
+import org.junit.jupiter.api.condition.EnabledIf;
+
+@Tag("gravitino-docker-test")
+@TestInstance(Lifecycle.PER_CLASS)
+@EnabledIf("isEmbedded")
+public class IcebergRestKerberosHiveWithUserImpersonationCatalogIT
+    extends IcebergRestKerberosHiveCatalogIT {
+
+  private static final String NORMAL_USER = "normal";
+
+  public IcebergRestKerberosHiveWithUserImpersonationCatalogIT() {
+    super();
+  }
+
+  @BeforeAll
+  void prepareSQLContext() {
+
+    // Change the ownership of /user/hive to normal user for user 
impersonation test. If we do not
+    // change the ownership, the normal user will not have the permission to 
create table in Hive
+    // as the /user/hive is owned by user `cli`, please see what's done in 
`initEnv` method in
+    // superclass.
+    containerSuite
+        .getKerberosHiveContainer()
+        .executeInContainer("hadoop", "fs", "-chown", "-R", NORMAL_USER, 
"/user/hive/");
+
+    super.prepareSQLContext();
+  }
+
+  @Override
+  Map<String, String> getCatalogConfig() {
+    Map<String, String> superConfig = super.getCatalogConfig();
+    Map<String, String> configMap = new HashMap<>(superConfig);
+
+    // Enable user impersonation in Iceberg REST server side, so the user 
passed to
+    // HDFS is `normal` instead of `cli`.
+    
configMap.put("gravitino.iceberg-rest.authentication.impersonation-enable", 
"true");
+    return configMap;
+  }
+
+  @Override
+  protected void initSparkEnv() {
+    int port = getServerPort();
+    LOG.info("Iceberg REST server port:{}", port);
+    String icebergRESTUri = String.format("http://127.0.0.1:%d/iceberg/";, 
port);
+    SparkConf sparkConf =
+        new SparkConf()
+            .set(
+                "spark.sql.extensions",
+                
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
+            .set("spark.sql.catalog.rest", 
"org.apache.iceberg.spark.SparkCatalog")
+            .set("spark.sql.catalog.rest.type", "rest")
+            .set("spark.sql.catalog.rest.uri", icebergRESTUri)
+
+            // Add basic auth to connect to Iceberg REST server, so the user 
name is `normal` and
+            // the final real user is `normal` via user impersonation.
+            .set("spark.sql.catalog.rest.rest.auth.type", "basic")
+            .set("spark.sql.catalog.rest.rest.auth.basic.username", 
NORMAL_USER)
+            .set("spark.sql.catalog.rest.rest.auth.basic.password", "mock")
+            // drop Iceberg table purge may hang in spark local mode
+            .set("spark.locality.wait.node", "0");
+
+    if (supportsCredentialVending()) {
+      sparkConf.set(
+          "spark.sql.catalog.rest.header.X-Iceberg-Access-Delegation", 
"vended-credentials");
+    }
+
+    sparkSession = 
SparkSession.builder().master("local[1]").config(sparkConf).getOrCreate();
+  }
+
+  protected String getTestNamespace(@Nullable String childNamespace) {
+    String separator;
+    String parentNamespace;
+
+    if (supportsNestedNamespaces()) {
+      parentNamespace = "iceberg_rest.nested.table_test";
+      separator = ".";
+    } else {
+      parentNamespace = "iceberg_rest_with_kerberos_impersonation_table_test";
+      separator = "_";
+    }
+
+    if (childNamespace != null) {
+      return parentNamespace + separator + childNamespace;
+    } else {
+      return parentNamespace;
+    }
+  }
+
+  // Disable the following three tests as they contain data insert operations 
and which are not
+  // controlled by the Gravitino Iceberg REST server currently.
+  @Test
+  @Disabled
+  void testDML() {}
+
+  @Test
+  @Disabled
+  void testRegisterTable() {}
+
+  @Test
+  @Disabled
+  void testSnapshot() {}
+}

Reply via email to