This is an automated email from the ASF dual-hosted git repository.
yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new f14cb1633d [#8177] fix(iceberg-common): Fix the impersonation problem
in creating iceberg table (#8189)
f14cb1633d is described below
commit f14cb1633d1548a96b9ff52bf0558f5667823027
Author: Mini Yu <[email protected]>
AuthorDate: Fri Dec 5 09:27:51 2025 +0800
[#8177] fix(iceberg-common): Fix the impersonation problem in creating
iceberg table (#8189)
### What changes were proposed in this pull request?
Handle impersonation problem for Iceberg table using Hive backend.
### Why are the changes needed?
Impersonation does not work for iceberg catalog with Hive backend
Fix: #8177
### Does this PR introduce _any_ user-facing change?
N/A.
### How was this patch tested?
IT
---
.../catalog-lakehouse-iceberg/build.gradle.kts | 6 +-
.../iceberg/IcebergCatalogOperations.java | 11 +-
.../test/CatalogIcebergKerberosHiveIT.java | 27 +++
gradle/libs.versions.toml | 6 +-
iceberg/iceberg-common/build.gradle.kts | 6 +-
.../iceberg/common/ClosableHiveCatalog.java | 194 ++++++++++++++++++++-
.../common/authentication/SupportsKerberos.java | 42 +++++
.../authentication/kerberos/HiveBackendProxy.java | 187 --------------------
.../authentication/kerberos/KerberosClient.java | 6 +-
.../iceberg/common/ops/IcebergCatalogWrapper.java | 2 +
.../ops/KerberosAwareIcebergCatalogProxy.java | 81 +++++++++
.../iceberg/common/utils/IcebergCatalogUtil.java | 32 +---
iceberg/iceberg-rest-server/build.gradle.kts | 9 +
.../service/IcebergCatalogWrapperManager.java | 13 +-
.../integration/test/IcebergRESTServiceBaseIT.java | 6 +-
.../test/IcebergRestKerberosHiveCatalogIT.java | 10 +-
...KerberosHiveWithUserImpersonationCatalogIT.java | 133 ++++++++++++++
17 files changed, 536 insertions(+), 235 deletions(-)
diff --git a/catalogs/catalog-lakehouse-iceberg/build.gradle.kts
b/catalogs/catalog-lakehouse-iceberg/build.gradle.kts
index 4a143321f9..315b86c7e2 100644
--- a/catalogs/catalog-lakehouse-iceberg/build.gradle.kts
+++ b/catalogs/catalog-lakehouse-iceberg/build.gradle.kts
@@ -45,7 +45,11 @@ dependencies {
implementation(libs.bundles.iceberg)
implementation(libs.bundles.log4j)
- implementation(libs.cglib)
+ implementation(libs.cglib) {
+ // The version of build-in asm is 7.1, which is not compatible with Java
17 well
+ exclude("org.ow2.asm")
+ }
+ implementation(libs.asm)
implementation(libs.commons.collections4)
implementation(libs.commons.io)
implementation(libs.commons.lang3)
diff --git
a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java
b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java
index 043905c7da..492a69bd29 100644
---
a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java
+++
b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java
@@ -50,8 +50,11 @@ import
org.apache.gravitino.exceptions.NonEmptySchemaException;
import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
import org.apache.gravitino.exceptions.TableAlreadyExistsException;
import org.apache.gravitino.iceberg.common.IcebergConfig;
+import org.apache.gravitino.iceberg.common.authentication.AuthenticationConfig;
+import org.apache.gravitino.iceberg.common.authentication.SupportsKerberos;
import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
import
org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper.IcebergTableChange;
+import
org.apache.gravitino.iceberg.common.ops.KerberosAwareIcebergCatalogProxy;
import org.apache.gravitino.meta.AuditInfo;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.Table;
@@ -113,7 +116,13 @@ public class IcebergCatalogOperations implements
CatalogOperations, SupportsSche
resultConf.put("catalog_uuid", info.id().toString());
IcebergConfig icebergConfig = new IcebergConfig(resultConf);
- this.icebergCatalogWrapper = new IcebergCatalogWrapper(icebergConfig);
+ IcebergCatalogWrapper rawWrapper = new
IcebergCatalogWrapper(icebergConfig);
+
+ AuthenticationConfig authenticationConfig = new
AuthenticationConfig(resultConf);
+ this.icebergCatalogWrapper =
+ authenticationConfig.isKerberosAuth() && rawWrapper.getCatalog()
instanceof SupportsKerberos
+ ? new
KerberosAwareIcebergCatalogProxy(rawWrapper).getProxy(icebergConfig)
+ : rawWrapper;
this.icebergCatalogWrapperHelper =
new IcebergCatalogWrapperHelper(icebergCatalogWrapper.getCatalog());
}
diff --git
a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergKerberosHiveIT.java
b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergKerberosHiveIT.java
index 15493d9c47..65cf8c6bd6 100644
---
a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergKerberosHiveIT.java
+++
b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergKerberosHiveIT.java
@@ -262,6 +262,33 @@ public class CatalogIcebergKerberosHiveIT extends BaseIT {
Assertions.assertDoesNotThrow(
() -> catalog.asSchemas().createSchema(SCHEMA_NAME, "comment",
ImmutableMap.of()));
+ kerberosHiveContainer.executeInContainer(
+ "hadoop", "fs", "-chmod", "-R", "000",
"/user/hive/warehouse-catalog-iceberg");
+
+ // Create a table, and the user has no EXECUTE permission to create table
+ NameIdentifier tableNameIdentifier1 = NameIdentifier.of(SCHEMA_NAME,
TABLE_NAME);
+ exception =
+ Assertions.assertThrows(
+ Exception.class,
+ () ->
+ catalog
+ .asTableCatalog()
+ .createTable(
+ tableNameIdentifier1,
+ createColumns(),
+ "",
+ ImmutableMap.of(),
+ Transforms.EMPTY_TRANSFORM,
+ Distributions.NONE,
+ SortOrders.NONE));
+ exceptionMessage = Throwables.getStackTraceAsString(exception);
+ // Make sure the real user is 'gravitino_client'
+ Assertions.assertTrue(
+ exceptionMessage.contains("Permission denied: user=gravitino_client,
access=EXECUTE"));
+
+ // Now try to permit the user to create the table again
+ kerberosHiveContainer.executeInContainer(
+ "hadoop", "fs", "-chmod", "-R", "777",
"/user/hive/warehouse-catalog-iceberg");
// Create table
NameIdentifier tableNameIdentifier = NameIdentifier.of(SCHEMA_NAME,
TABLE_NAME);
catalog
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index f0fd23e4e5..6384a36bc4 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -92,7 +92,10 @@ servlet = "3.1.0"
jodd = "3.5.2"
flink = "1.18.0"
flinkjdbc = "3.2.0-1.18"
-cglib = "2.2"
+# cglib upgraded from 2. to 3.3.0 for Java 17 compatibility.
+# Explicit ASM dependency (9.8) is required due to cglib's internal use and
Java 17 module system.
+cglib = "3.3.0"
+asm = "9.8"
ranger = "2.4.0"
javax-jaxb-api = "2.3.1"
javax-ws-rs-api = "2.1.1"
@@ -276,6 +279,7 @@ kafka-clients = { group = "org.apache.kafka", name =
"kafka-clients", version.re
kafka = { group = "org.apache.kafka", name = "kafka_2.12", version.ref =
"kafka" }
curator-test = { group = "org.apache.curator", name = "curator-test",
version.ref = "curator"}
cglib = { group = "cglib", name = "cglib", version.ref = "cglib"}
+asm = { group = "org.ow2.asm", name = "asm", version.ref = "asm"}
woodstox-core = { group = "com.fasterxml.woodstox", name = "woodstox-core",
version.ref = "woodstox-core"}
thrift = { group = "org.apache.thrift", name = "libthrift", version.ref =
"thrift"}
derby = { group = "org.apache.derby", name = "derby", version.ref = "derby"}
diff --git a/iceberg/iceberg-common/build.gradle.kts
b/iceberg/iceberg-common/build.gradle.kts
index f9dc336976..50c052d59c 100644
--- a/iceberg/iceberg-common/build.gradle.kts
+++ b/iceberg/iceberg-common/build.gradle.kts
@@ -39,7 +39,11 @@ dependencies {
exclude("org.jline")
}
implementation(libs.caffeine)
- implementation(libs.cglib)
+ implementation(libs.cglib) {
+ // The version of build-in asm is 7.1, which is not compatible with Java
17 well
+ exclude("org.ow2.asm")
+ }
+ implementation(libs.asm)
implementation(libs.commons.lang3)
implementation(libs.guava)
implementation(libs.iceberg.aliyun)
diff --git
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ClosableHiveCatalog.java
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ClosableHiveCatalog.java
index 88db666d83..9b708acc68 100644
---
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ClosableHiveCatalog.java
+++
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ClosableHiveCatalog.java
@@ -19,13 +19,31 @@
package org.apache.gravitino.iceberg.common;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import java.io.Closeable;
+import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
+import java.security.PrivilegedExceptionAction;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import lombok.Getter;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.iceberg.common.authentication.AuthenticationConfig;
+import org.apache.gravitino.iceberg.common.authentication.SupportsKerberos;
+import
org.apache.gravitino.iceberg.common.authentication.kerberos.KerberosClient;
+import
org.apache.gravitino.iceberg.common.utils.CaffeineSchedulerExtractorUtils;
import org.apache.gravitino.iceberg.common.utils.IcebergHiveCachedClientPool;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.iceberg.ClientPool;
import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,11 +51,13 @@ import org.slf4j.LoggerFactory;
* ClosableHiveCatalog is a wrapper class to wrap Iceberg HiveCatalog to do
some clean-up work like
* closing resources.
*/
-public class ClosableHiveCatalog extends HiveCatalog implements Closeable {
+public class ClosableHiveCatalog extends HiveCatalog implements Closeable,
SupportsKerberos {
private static final Logger LOGGER =
LoggerFactory.getLogger(ClosableHiveCatalog.class);
- private final List<Closeable> resources = Lists.newArrayList();
+ @Getter private final List<Closeable> resources = Lists.newArrayList();
+
+ private KerberosClient kerberosClient;
public ClosableHiveCatalog() {
super();
@@ -47,8 +67,40 @@ public class ClosableHiveCatalog extends HiveCatalog
implements Closeable {
resources.add(resource);
}
+ /**
+ * Initialize the ClosableHiveCatalog with the given input name and
properties.
+ *
+ * <p>Note: This method can only be called once as it will create new client
pools.
+ *
+ * @param inputName name of the catalog
+ * @param properties properties for the catalog
+ */
+ @Override
+ public void initialize(String inputName, Map<String, String> properties) {
+ super.initialize(inputName, properties);
+
+ AuthenticationConfig authenticationConfig = new
AuthenticationConfig(properties);
+ if (authenticationConfig.isKerberosAuth()) {
+ this.kerberosClient = initKerberosClient();
+ }
+
+ try {
+ resetIcebergHiveClientPool();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to reset IcebergHiveClientPool", e);
+ }
+ }
+
@Override
public void close() throws IOException {
+ if (kerberosClient != null) {
+ try {
+ kerberosClient.close();
+ } catch (Exception e) {
+ LOGGER.warn("Failed to close KerberosClient", e);
+ }
+ }
+
// Do clean up work here. We need a mechanism to close the HiveCatalog;
however, HiveCatalog
// doesn't implement the Closeable interface.
@@ -68,6 +120,131 @@ public class ClosableHiveCatalog extends HiveCatalog
implements Closeable {
});
}
+ @Override
+ public <R> R doKerberosOperations(Executable<R> executable) throws Throwable
{
+ Map<String, String> properties = this.properties();
+ AuthenticationConfig authenticationConfig = new
AuthenticationConfig(properties);
+
+ final String finalPrincipalName;
+ String proxyKerberosPrincipalName =
PrincipalUtils.getCurrentPrincipal().getName();
+
+ if (!proxyKerberosPrincipalName.contains("@")) {
+ finalPrincipalName =
+ String.format("%s@%s", proxyKerberosPrincipalName,
kerberosClient.getRealm());
+ } else {
+ finalPrincipalName = proxyKerberosPrincipalName;
+ }
+
+ UserGroupInformation realUser =
+ authenticationConfig.isImpersonationEnabled()
+ ? UserGroupInformation.createProxyUser(
+ finalPrincipalName, kerberosClient.getLoginUser())
+ : kerberosClient.getLoginUser();
+ try {
+ ClientPool<IMetaStoreClient, TException> newClientPool =
+ (ClientPool<IMetaStoreClient, TException>)
FieldUtils.readField(this, "clients", true);
+ kerberosClient
+ .getLoginUser()
+ .doAs(
+ (PrivilegedExceptionAction<Void>)
+ () -> {
+ String token =
+ newClientPool.run(
+ client ->
+ client.getDelegationToken(
+ finalPrincipalName,
+
kerberosClient.getLoginUser().getShortUserName()));
+
+ Token<DelegationTokenIdentifier> delegationToken = new
Token<>();
+ delegationToken.decodeFromUrlString(token);
+ realUser.addToken(delegationToken);
+ return null;
+ });
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to get delegation token for principal: " +
finalPrincipalName, e);
+ }
+ return realUser.doAs(
+ (PrivilegedExceptionAction<R>)
+ () -> {
+ try {
+ return executable.execute();
+ } catch (Throwable e) {
+ if (RuntimeException.class.isAssignableFrom(e.getClass())) {
+ throw (RuntimeException) e;
+ }
+ throw new RuntimeException("Failed to invoke method", e);
+ }
+ });
+ }
+
+ private ClientPool<IMetaStoreClient, TException> resetIcebergHiveClientPool()
+ throws IllegalAccessException {
+ // Get the old client pool before replacing it
+ Object oldPool = FieldUtils.readField(this, "clients", true);
+
+ // Create and set the new client pool first
+ IcebergHiveCachedClientPool newClientPool =
+ new IcebergHiveCachedClientPool(this.getConf(), this.properties());
+ FieldUtils.writeField(this, "clients", newClientPool, true);
+
+ // Then try to close the old pool to release resources
+ if (oldPool != null) {
+ // Try standard close method if available
+ if (oldPool instanceof AutoCloseable) {
+ try {
+ ((AutoCloseable) oldPool).close();
+ LOGGER.info("Successfully closed old Hive client pool");
+ } catch (Exception e) {
+ LOGGER.warn("Failed to close old Hive client pool", e);
+ }
+ }
+
+ // Additionally, try to shutdown the internal scheduler thread pool in
Iceberg's
+ // CachedClientPool to prevent memory leak. This is necessary because
Iceberg's
+ // CachedClientPool does not implement Closeable and does not properly
clean up
+ // its internal scheduler.
+ try {
+ shutdownIcebergCachedClientPoolScheduler(oldPool);
+ } catch (Exception e) {
+ LOGGER.warn(
+ "Failed to shutdown scheduler in old CachedClientPool, may cause
minor resource leak",
+ e);
+ }
+ }
+
+ return newClientPool;
+ }
+
+ /**
+ * Shuts down the scheduler thread pool in Iceberg's CachedClientPool.
+ *
+ * <p>Required because CachedClientPool doesn't provide cleanup, causing
thread pool leaks.
+ *
+ * @param clientPool The old CachedClientPool instance
+ */
+ @VisibleForTesting
+ void shutdownIcebergCachedClientPoolScheduler(Object clientPool) {
+ try {
+ Object cache = FieldUtils.readField(clientPool, "clientPoolCache", true);
+ if (cache == null) {
+ LOGGER.debug("clientPoolCache is null, no scheduler to shutdown");
+ return;
+ }
+
+ ScheduledExecutorService executor =
+ CaffeineSchedulerExtractorUtils.extractSchedulerExecutor(cache);
+ if (executor != null) {
+ LOGGER.info("Shutting down scheduler thread pool from old
CachedClientPool");
+ executor.shutdownNow();
+ } else {
+ LOGGER.debug("Could not extract scheduler executor from cache");
+ }
+ } catch (IllegalAccessException e) {
+ LOGGER.debug("Failed to access clientPoolCache field", e);
+ }
+ }
+
/**
* Close the internal HiveCatalog client pool using reflection. This is
necessary because
* HiveCatalog doesn't provide a public API to close its client pool. We
need to avoid closing
@@ -93,4 +270,17 @@ public class ClosableHiveCatalog extends HiveCatalog
implements Closeable {
LOGGER.warn("Failed to close HiveCatalog internal client pool", e);
}
}
+
+ private KerberosClient initKerberosClient() {
+ try {
+ KerberosClient kerberosClient = new KerberosClient(this.properties(),
this.getConf());
+ // catalog_uuid always exists for Gravitino managed catalogs, `0` is
just a fallback value.
+ String catalogUUID = properties().getOrDefault("catalog_uuid", "0");
+ File keytabFile =
kerberosClient.saveKeyTabFileFromUri(Long.parseLong(catalogUUID));
+ kerberosClient.login(keytabFile.getAbsolutePath());
+ return kerberosClient;
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to login with kerberos", e);
+ }
+ }
}
diff --git
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/authentication/SupportsKerberos.java
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/authentication/SupportsKerberos.java
new file mode 100644
index 0000000000..78d070e39a
--- /dev/null
+++
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/authentication/SupportsKerberos.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.iceberg.common.authentication;
+
+/**
+ * An interface to indicate that the implementing class supports Kerberos
authentication and can do
+ * operations with Kerberos.
+ */
+public interface SupportsKerberos {
+
+ /**
+ * Perform operations with Kerberos authentication.
+ *
+ * @param executable the operations to be performed
+ * @return the result of the operations
+ * @param <R> the return type of the operations
+ * @throws Throwable if any error occurs during the operations
+ */
+ <R> R doKerberosOperations(Executable<R> executable) throws Throwable;
+
+ @FunctionalInterface
+ interface Executable<R> {
+ R execute() throws Throwable;
+ }
+}
diff --git
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/authentication/kerberos/HiveBackendProxy.java
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/authentication/kerberos/HiveBackendProxy.java
deleted file mode 100644
index 2da071f08f..0000000000
---
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/authentication/kerberos/HiveBackendProxy.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.gravitino.iceberg.common.authentication.kerberos;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.security.PrivilegedExceptionAction;
-import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
-import net.sf.cglib.proxy.Enhancer;
-import net.sf.cglib.proxy.MethodInterceptor;
-import net.sf.cglib.proxy.MethodProxy;
-import org.apache.commons.lang3.reflect.FieldUtils;
-import
org.apache.gravitino.iceberg.common.utils.CaffeineSchedulerExtractorUtils;
-import org.apache.gravitino.iceberg.common.utils.IcebergHiveCachedClientPool;
-import org.apache.gravitino.utils.PrincipalUtils;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.iceberg.ClientPool;
-import org.apache.iceberg.hive.HiveCatalog;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Proxy class for HiveCatalog to support kerberos authentication. We can also
make HiveCatalog as a
- * generic type and pass it as a parameter to the constructor.
- */
-public class HiveBackendProxy implements MethodInterceptor {
- private static final Logger LOG =
LoggerFactory.getLogger(HiveBackendProxy.class);
- private final HiveCatalog target;
- private final String kerberosRealm;
- private final UserGroupInformation proxyUser;
- private final Map<String, String> properties;
- private final ClientPool<IMetaStoreClient, TException> newClientPool;
-
- public HiveBackendProxy(
- Map<String, String> properties, HiveCatalog target, String
kerberosRealm) {
- this.target = target;
- this.properties = properties;
- this.kerberosRealm = kerberosRealm;
- try {
- proxyUser = UserGroupInformation.getCurrentUser();
-
- // Replace the original client pool with IcebergHiveCachedClientPool.
Why do we need to do
- // this? Because the original client pool in Iceberg uses a fixed
username to create the
- // client pool, and it will not work with kerberos authentication. We
need to create a new
- // client pool with the current user. For more, please see
CachedClientPool#clientPool and
- // notice the value of `key`
- this.newClientPool = resetIcebergHiveClientPool();
- } catch (IOException e) {
- throw new RuntimeException("Failed to get current user", e);
- } catch (IllegalAccessException e) {
- throw new RuntimeException("Failed to reset IcebergHiveClientPool", e);
- }
- }
-
- @Override
- public Object intercept(Object o, Method method, Object[] objects,
MethodProxy methodProxy)
- throws Throwable {
- final String finalPrincipalName;
- String proxyKerberosPrincipalName =
PrincipalUtils.getCurrentPrincipal().getName();
- if (!proxyKerberosPrincipalName.contains("@")) {
- finalPrincipalName = String.format("%s@%s", proxyKerberosPrincipalName,
kerberosRealm);
- } else {
- finalPrincipalName = proxyKerberosPrincipalName;
- }
- UserGroupInformation realUser =
- UserGroupInformation.createProxyUser(finalPrincipalName, proxyUser);
-
- String token =
- newClientPool.run(
- client -> client.getDelegationToken(finalPrincipalName,
proxyUser.getShortUserName()));
-
- Token<DelegationTokenIdentifier> delegationToken = new Token<>();
- delegationToken.decodeFromUrlString(token);
- realUser.addToken(delegationToken);
-
- return realUser.doAs(
- (PrivilegedExceptionAction<Object>)
- () -> {
- try {
- return methodProxy.invoke(target, objects);
- } catch (Throwable e) {
- if (RuntimeException.class.isAssignableFrom(e.getClass())) {
- throw (RuntimeException) e;
- }
- throw new RuntimeException("Failed to invoke method", e);
- }
- });
- }
-
- private ClientPool<IMetaStoreClient, TException> resetIcebergHiveClientPool()
- throws IllegalAccessException {
- // Get the old client pool before replacing it
- Object oldPool = FieldUtils.readField(target, "clients", true);
-
- // Create and set the new client pool first
- IcebergHiveCachedClientPool newClientPool =
- new IcebergHiveCachedClientPool(target.getConf(), properties);
- FieldUtils.writeField(target, "clients", newClientPool, true);
-
- // Then try to close the old pool to release resources
- if (oldPool != null) {
- // Try standard close method if available
- if (oldPool instanceof AutoCloseable) {
- try {
- ((AutoCloseable) oldPool).close();
- LOG.info("Successfully closed old Hive client pool");
- } catch (Exception e) {
- LOG.warn("Failed to close old Hive client pool", e);
- }
- }
-
- // Additionally, try to shutdown the internal scheduler thread pool in
Iceberg's
- // CachedClientPool to prevent memory leak. This is necessary because
Iceberg's
- // CachedClientPool does not implement Closeable and does not properly
clean up
- // its internal scheduler.
- try {
- shutdownIcebergCachedClientPoolScheduler(oldPool);
- } catch (Exception e) {
- LOG.warn(
- "Failed to shutdown scheduler in old CachedClientPool, may cause
minor resource leak",
- e);
- }
- }
-
- return newClientPool;
- }
-
- /**
- * Shuts down the scheduler thread pool in Iceberg's CachedClientPool.
- *
- * <p>Required because CachedClientPool doesn't provide cleanup, causing
thread pool leaks.
- *
- * @param clientPool The old CachedClientPool instance
- */
- @VisibleForTesting
- void shutdownIcebergCachedClientPoolScheduler(Object clientPool) {
- try {
- Object cache = FieldUtils.readField(clientPool, "clientPoolCache", true);
- if (cache == null) {
- LOG.debug("clientPoolCache is null, no scheduler to shutdown");
- return;
- }
-
- ScheduledExecutorService executor =
- CaffeineSchedulerExtractorUtils.extractSchedulerExecutor(cache);
- if (executor != null) {
- LOG.info("Shutting down scheduler thread pool from old
CachedClientPool");
- executor.shutdownNow();
- } else {
- LOG.debug("Could not extract scheduler executor from cache");
- }
- } catch (IllegalAccessException e) {
- LOG.debug("Failed to access clientPoolCache field", e);
- }
- }
-
- public HiveCatalog getProxy() {
- Enhancer e = new Enhancer();
- e.setClassLoader(target.getClass().getClassLoader());
- e.setSuperclass(target.getClass());
- e.setCallback(this);
- return (HiveCatalog) e.create();
- }
-}
diff --git
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/authentication/kerberos/KerberosClient.java
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/authentication/kerberos/KerberosClient.java
index a7d1f9d079..12b612fa82 100644
---
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/authentication/kerberos/KerberosClient.java
+++
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/authentication/kerberos/KerberosClient.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
@@ -40,6 +41,7 @@ public class KerberosClient implements Closeable {
private final ScheduledThreadPoolExecutor checkTgtExecutor;
private final Map<String, String> conf;
private final Configuration hadoopConf;
+ @Getter private UserGroupInformation loginUser;
private String realm;
public KerberosClient(Map<String, String> conf, Configuration hadoopConf) {
@@ -73,14 +75,14 @@ public class KerberosClient implements Closeable {
// Login
UserGroupInformation.setConfiguration(hadoopConf);
UserGroupInformation.loginUserFromKeytab(catalogPrincipal, keytabFilePath);
- UserGroupInformation kerberosLoginUgi =
UserGroupInformation.getCurrentUser();
+ loginUser = UserGroupInformation.getCurrentUser();
// Refresh the cache if it's out of date.
int checkInterval = kerberosConfig.getCheckIntervalSec();
checkTgtExecutor.scheduleAtFixedRate(
() -> {
try {
- kerberosLoginUgi.checkTGTAndReloginFromKeytab();
+ loginUser.checkTGTAndReloginFromKeytab();
} catch (Exception e) {
LOG.error("Fail to refresh ugi token: ", e);
}
diff --git
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java
index bc9f7c0c09..54513e9a15 100644
---
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java
+++
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java
@@ -73,11 +73,13 @@ public class IcebergCatalogWrapper implements AutoCloseable
{
@Getter protected Catalog catalog;
private SupportsNamespaces asNamespaceCatalog;
private final IcebergCatalogBackend catalogBackend;
+ @Getter private final IcebergConfig icebergConfig;
private String catalogUri = null;
private Map<String, String> catalogPropertiesMap;
private TableMetadataCache metadataCache;
public IcebergCatalogWrapper(IcebergConfig icebergConfig) {
+ this.icebergConfig = icebergConfig;
this.catalogBackend =
IcebergCatalogBackend.valueOf(
icebergConfig.get(IcebergConfig.CATALOG_BACKEND).toUpperCase(Locale.ROOT));
diff --git
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/KerberosAwareIcebergCatalogProxy.java
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/KerberosAwareIcebergCatalogProxy.java
new file mode 100644
index 0000000000..c749c1af28
--- /dev/null
+++
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/KerberosAwareIcebergCatalogProxy.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.iceberg.common.ops;
+
+import java.lang.reflect.Method;
+import net.sf.cglib.proxy.Enhancer;
+import net.sf.cglib.proxy.MethodInterceptor;
+import net.sf.cglib.proxy.MethodProxy;
+import org.apache.gravitino.iceberg.common.IcebergConfig;
+import org.apache.gravitino.iceberg.common.authentication.SupportsKerberos;
+import org.apache.iceberg.catalog.Catalog;
+
+/**
+ * A proxy class for IcebergCatalogWrapper to handle Kerberos authentication
or other cross-cutting
+ * concerns.
+ */
+public class KerberosAwareIcebergCatalogProxy implements MethodInterceptor {
+ private final IcebergCatalogWrapper target;
+ private final Catalog catalog;
+
+ public KerberosAwareIcebergCatalogProxy(IcebergCatalogWrapper target) {
+ this.target = target;
+ this.catalog = target.getCatalog();
+ }
+
+ @Override
+ public Object intercept(Object o, Method method, Object[] objects,
MethodProxy methodProxy)
+ throws Throwable {
+ if (catalog instanceof SupportsKerberos) {
+ SupportsKerberos kerberosCatalog = (SupportsKerberos) catalog;
+ return kerberosCatalog.doKerberosOperations(() ->
methodProxy.invoke(target, objects));
+ }
+
+ return method.invoke(target, objects);
+ }
+
+ public IcebergCatalogWrapper getProxy(IcebergConfig config) {
+ Enhancer e = new Enhancer();
+ e.setClassLoader(target.getClass().getClassLoader());
+ e.setSuperclass(target.getClass());
+ e.setCallback(this);
+
+ Class<?>[] argClass = new Class[] {IcebergConfig.class};
+ return (IcebergCatalogWrapper) e.create(argClass, new Object[] {config});
+ }
+
+ /**
+ * Create a proxy instance with catalogName and config constructor. It's
used for class
+ * CatalogWrapperForREST or its subclass.
+ *
+ * @param catalogName Name of the catalog.
+ * @param config Iceberg configuration.
+ * @return The proxy instance.
+ */
+ public IcebergCatalogWrapper getProxy(String catalogName, IcebergConfig
config) {
+ Enhancer e = new Enhancer();
+ e.setClassLoader(target.getClass().getClassLoader());
+ e.setSuperclass(target.getClass());
+ e.setCallback(this);
+
+ Class<?>[] argClass = new Class[] {String.class, IcebergConfig.class};
+ return (IcebergCatalogWrapper) e.create(argClass, new Object[]
{catalogName, config});
+ }
+}
diff --git
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/utils/IcebergCatalogUtil.java
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/utils/IcebergCatalogUtil.java
index 3ed1ad3ec0..38f2643353 100644
---
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/utils/IcebergCatalogUtil.java
+++
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/utils/IcebergCatalogUtil.java
@@ -22,8 +22,6 @@ import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY
import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
import com.google.common.annotations.VisibleForTesting;
-import java.io.File;
-import java.io.IOException;
import java.sql.SQLException;
import java.util.Collections;
import java.util.HashMap;
@@ -35,9 +33,6 @@ import
org.apache.gravitino.exceptions.ConnectionFailedException;
import org.apache.gravitino.iceberg.common.ClosableHiveCatalog;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.common.authentication.AuthenticationConfig;
-import
org.apache.gravitino.iceberg.common.authentication.kerberos.HiveBackendProxy;
-import
org.apache.gravitino.iceberg.common.authentication.kerberos.KerberosClient;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
@@ -86,16 +81,7 @@ public class IcebergCatalogUtil {
hdfsConfiguration.set(HADOOP_SECURITY_AUTHORIZATION, "true");
hdfsConfiguration.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
hiveCatalog.setConf(hdfsConfiguration);
- hiveCatalog.initialize(icebergCatalogName, properties);
-
- KerberosClient kerberosClient = initKerberosAndReturnClient(properties,
hdfsConfiguration);
- hiveCatalog.addResource(kerberosClient);
- if (authenticationConfig.isImpersonationEnabled()) {
- HiveBackendProxy proxyHiveCatalog =
- new HiveBackendProxy(resultProperties, hiveCatalog,
kerberosClient.getRealm());
- return proxyHiveCatalog.getProxy();
- }
-
+ hiveCatalog.initialize(icebergCatalogName, resultProperties);
return hiveCatalog;
} else {
throw new UnsupportedOperationException(
@@ -103,22 +89,6 @@ public class IcebergCatalogUtil {
}
}
- private static KerberosClient initKerberosAndReturnClient(
- Map<String, String> properties, Configuration conf) {
- try {
- KerberosClient kerberosClient = new KerberosClient(properties, conf);
-
- // For Iceberg rest server, we haven't set the catalog_uuid, so we set
it to 0 as there is
- // only one catalog in the rest server, so it's okay to set it to 0.
- String catalogUUID = properties.getOrDefault("catalog_uuid", "0");
- File keytabFile =
kerberosClient.saveKeyTabFileFromUri(Long.valueOf(catalogUUID));
- kerberosClient.login(keytabFile.getAbsolutePath());
- return kerberosClient;
- } catch (IOException e) {
- throw new RuntimeException("Failed to login with kerberos", e);
- }
- }
-
@SuppressWarnings("FormatStringAnnotation")
private static JdbcCatalog loadJdbcCatalog(IcebergConfig icebergConfig) {
String driverClassName = icebergConfig.getJdbcDriver();
diff --git a/iceberg/iceberg-rest-server/build.gradle.kts
b/iceberg/iceberg-rest-server/build.gradle.kts
index 25db4d9db8..e74945ff3e 100644
--- a/iceberg/iceberg-rest-server/build.gradle.kts
+++ b/iceberg/iceberg-rest-server/build.gradle.kts
@@ -56,6 +56,10 @@ dependencies {
implementation(libs.bundles.metrics)
implementation(libs.bundles.prometheus)
implementation(libs.caffeine)
+ implementation(libs.cglib) {
+ // The version of build-in asm is 7.1, which is not compatible with Java
17 well
+ exclude("org.ow2.asm")
+ }
implementation(libs.commons.dbcp2)
implementation(libs.commons.lang3)
implementation(libs.concurrent.trees)
@@ -108,6 +112,11 @@ dependencies {
testImplementation(libs.sqlite.jdbc)
testImplementation(libs.slf4j.api)
testImplementation(libs.testcontainers)
+ testImplementation(libs.cglib) {
+ // The version of build-in asm is 7.1, which is not compatible with Java
17 well
+ exclude("org.ow2.asm")
+ }
+ testImplementation(libs.asm)
testImplementation(libs.testcontainers.postgresql)
// Add Hadoop 3.3+ dependencies since Spark's Hadoop is excluded
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java
index d827e601ed..58733d7847 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java
@@ -29,7 +29,10 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.gravitino.iceberg.common.IcebergConfig;
+import org.apache.gravitino.iceberg.common.authentication.AuthenticationConfig;
+import org.apache.gravitino.iceberg.common.authentication.SupportsKerberos;
import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
+import
org.apache.gravitino.iceberg.common.ops.KerberosAwareIcebergCatalogProxy;
import org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -100,7 +103,15 @@ public class IcebergCatalogWrapperManager implements
AutoCloseable {
@VisibleForTesting
protected CatalogWrapperForREST createCatalogWrapper(
String catalogName, IcebergConfig icebergConfig) {
- return new CatalogWrapperForREST(catalogName, icebergConfig);
+ CatalogWrapperForREST rest = new CatalogWrapperForREST(catalogName,
icebergConfig);
+ AuthenticationConfig authenticationConfig =
+ new AuthenticationConfig(icebergConfig.getAllConfig());
+ if (rest.getCatalog() instanceof SupportsKerberos &&
authenticationConfig.isKerberosAuth()) {
+ return (CatalogWrapperForREST)
+ new KerberosAwareIcebergCatalogProxy(rest).getProxy(catalogName,
icebergConfig);
+ }
+
+ return rest;
}
private void closeIcebergCatalogWrapper(IcebergCatalogWrapper
catalogWrapper) {
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRESTServiceBaseIT.java
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRESTServiceBaseIT.java
index 1cd1d0f63c..ab32e165d6 100644
---
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRESTServiceBaseIT.java
+++
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRESTServiceBaseIT.java
@@ -54,7 +54,7 @@ import org.slf4j.LoggerFactory;
public abstract class IcebergRESTServiceBaseIT {
public static final Logger LOG =
LoggerFactory.getLogger(IcebergRESTServiceBaseIT.class);
- private SparkSession sparkSession;
+ protected SparkSession sparkSession;
protected IcebergCatalogBackend catalogType = IcebergCatalogBackend.MEMORY;
private IcebergRESTServerManager icebergRESTServerManager;
@@ -125,14 +125,14 @@ public abstract class IcebergRESTServiceBaseIT {
LOG.info("Iceberg REST service config registered, {}",
StringUtils.join(icebergConfigs));
}
- private int getServerPort() {
+ protected int getServerPort() {
JettyServerConfig jettyServerConfig =
JettyServerConfig.fromConfig(
icebergRESTServerManager.getServerConfig(),
IcebergConfig.ICEBERG_CONFIG_PREFIX);
return jettyServerConfig.getHttpPort();
}
- private void initSparkEnv() {
+ protected void initSparkEnv() {
int port = getServerPort();
LOG.info("Iceberg REST server port:{}", port);
String icebergRESTUri = String.format("http://127.0.0.1:%d/iceberg/",
port);
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRestKerberosHiveCatalogIT.java
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRestKerberosHiveCatalogIT.java
index 90d6907d67..36969f0e31 100644
---
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRestKerberosHiveCatalogIT.java
+++
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRestKerberosHiveCatalogIT.java
@@ -41,10 +41,10 @@ import org.junit.jupiter.api.condition.EnabledIf;
@EnabledIf("isEmbedded")
public class IcebergRestKerberosHiveCatalogIT extends IcebergRESTHiveCatalogIT
{
- private static final String HIVE_METASTORE_CLIENT_PRINCIPAL =
"cli@HADOOPKRB";
- private static final String HIVE_METASTORE_CLIENT_KEYTAB = "/client.keytab";
+ protected static final String HIVE_METASTORE_CLIENT_PRINCIPAL =
"cli@HADOOPKRB";
+ protected static final String HIVE_METASTORE_CLIENT_KEYTAB =
"/client.keytab";
- private static String tempDir;
+ protected static String tempDir;
public IcebergRestKerberosHiveCatalogIT() {
super();
@@ -150,7 +150,7 @@ public class IcebergRestKerberosHiveCatalogIT extends
IcebergRESTHiveCatalogIT {
return configMap;
}
- private static boolean isEmbedded() {
+ protected static boolean isEmbedded() {
String mode =
System.getProperty(ITUtils.TEST_MODE) == null
? ITUtils.EMBEDDED_TEST_MODE
@@ -159,7 +159,7 @@ public class IcebergRestKerberosHiveCatalogIT extends
IcebergRESTHiveCatalogIT {
return Objects.equals(mode, ITUtils.EMBEDDED_TEST_MODE);
}
- private static void refreshKerberosConfig() {
+ protected static void refreshKerberosConfig() {
Class<?> classRef;
try {
if (System.getProperty("java.vendor").contains("IBM")) {
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRestKerberosHiveWithUserImpersonationCatalogIT.java
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRestKerberosHiveWithUserImpersonationCatalogIT.java
new file mode 100644
index 0000000000..28a7ee4dcc
--- /dev/null
+++
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRestKerberosHiveWithUserImpersonationCatalogIT.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.iceberg.integration.test;
+
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestInstance.Lifecycle;
+import org.junit.jupiter.api.condition.EnabledIf;
+
+@Tag("gravitino-docker-test")
+@TestInstance(Lifecycle.PER_CLASS)
+@EnabledIf("isEmbedded")
+public class IcebergRestKerberosHiveWithUserImpersonationCatalogIT
+ extends IcebergRestKerberosHiveCatalogIT {
+
+ private static final String NORMAL_USER = "normal";
+
+ public IcebergRestKerberosHiveWithUserImpersonationCatalogIT() {
+ super();
+ }
+
+ @BeforeAll
+ void prepareSQLContext() {
+
+ // Change the ownership of /user/hive to normal user for user
impersonation test. If we do not
+ // change the ownership, the normal user will not have the permission to
create table in Hive
+ // as the /user/hive is owned by user `cli`, please see what's done in
`initEnv` method in
+ // superclass.
+ containerSuite
+ .getKerberosHiveContainer()
+ .executeInContainer("hadoop", "fs", "-chown", "-R", NORMAL_USER,
"/user/hive/");
+
+ super.prepareSQLContext();
+ }
+
+ @Override
+ Map<String, String> getCatalogConfig() {
+ Map<String, String> superConfig = super.getCatalogConfig();
+ Map<String, String> configMap = new HashMap<>(superConfig);
+
+ // Enable user impersonation in Iceberg REST server side, so the user
passed to
+ // HDFS is `normal` instead of `cli`.
+
configMap.put("gravitino.iceberg-rest.authentication.impersonation-enable",
"true");
+ return configMap;
+ }
+
+ @Override
+ protected void initSparkEnv() {
+ int port = getServerPort();
+ LOG.info("Iceberg REST server port:{}", port);
+ String icebergRESTUri = String.format("http://127.0.0.1:%d/iceberg/",
port);
+ SparkConf sparkConf =
+ new SparkConf()
+ .set(
+ "spark.sql.extensions",
+
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
+ .set("spark.sql.catalog.rest",
"org.apache.iceberg.spark.SparkCatalog")
+ .set("spark.sql.catalog.rest.type", "rest")
+ .set("spark.sql.catalog.rest.uri", icebergRESTUri)
+
+ // Add basic auth to connect to Iceberg REST server, so the user
name is `normal` and
+ // the final real user is `normal` via user impersonation.
+ .set("spark.sql.catalog.rest.rest.auth.type", "basic")
+ .set("spark.sql.catalog.rest.rest.auth.basic.username",
NORMAL_USER)
+ .set("spark.sql.catalog.rest.rest.auth.basic.password", "mock")
+ // drop Iceberg table purge may hang in spark local mode
+ .set("spark.locality.wait.node", "0");
+
+ if (supportsCredentialVending()) {
+ sparkConf.set(
+ "spark.sql.catalog.rest.header.X-Iceberg-Access-Delegation",
"vended-credentials");
+ }
+
+ sparkSession =
SparkSession.builder().master("local[1]").config(sparkConf).getOrCreate();
+ }
+
+ protected String getTestNamespace(@Nullable String childNamespace) {
+ String separator;
+ String parentNamespace;
+
+ if (supportsNestedNamespaces()) {
+ parentNamespace = "iceberg_rest.nested.table_test";
+ separator = ".";
+ } else {
+ parentNamespace = "iceberg_rest_with_kerberos_impersonation_table_test";
+ separator = "_";
+ }
+
+ if (childNamespace != null) {
+ return parentNamespace + separator + childNamespace;
+ } else {
+ return parentNamespace;
+ }
+ }
+
+ // Disable the following three tests as they contain data insert operations
and which are not
+ // controlled by the Gravitino Iceberg REST server currently.
+ @Test
+ @Disabled
+ void testDML() {}
+
+ @Test
+ @Disabled
+ void testRegisterTable() {}
+
+ @Test
+ @Disabled
+ void testSnapshot() {}
+}