This is an automated email from the ASF dual-hosted git repository.
yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new d7a58088f [#3462] improvement(hadoop-catalog): Support different
level(catalog, schema,fileset) of authentication for hadoop catalog. (#3852)
d7a58088f is described below
commit d7a58088f480dc739bde8cc92d5dd928e557da06
Author: Qi Yu <[email protected]>
AuthorDate: Tue Jul 23 14:09:48 2024 +0800
[#3462] improvement(hadoop-catalog): Support different level(catalog,
schema,fileset) of authentication for hadoop catalog. (#3852)
### What changes were proposed in this pull request?
Support setting authentication at the schema and fileset level for the
Hadoop catalog.
### Why are the changes needed?
The Hadoop catalog may require fine-grained access control, and we can
check the authentication status at the catalog, schema, or fileset
level. For instance, when considering Kerberos authentication, we can
authenticate filesets at the catalog level, schema level, and fileset
level, and the fileset-level authentication configuration has the
highest priority.
Fix: #3462
### Does this PR introduce _any_ user-facing change?
N/A
### How was this patch tested?
IT
---
build.gradle.kts | 2 +-
.../gravitino/catalog/hadoop/HadoopCatalog.java | 14 +-
.../catalog/hadoop/HadoopCatalogOperations.java | 59 +--
.../hadoop/HadoopFilesetPropertiesMetadata.java | 9 +-
.../catalog/hadoop/HadoopProxyPlugin.java | 8 +-
.../hadoop/HadoopSchemaPropertiesMetadata.java | 4 +
.../hadoop/SecureHadoopCatalogOperations.java | 456 +++++++++++++++++++++
.../authentication/AuthenticationConfig.java | 13 +
.../authentication/kerberos/KerberosClient.java | 57 +--
.../hadoop/TestHadoopCatalogOperations.java | 95 +++--
.../test/HadoopUserAuthenticationIT.java | 319 ++++++++++++++
.../hive/integration/test/CatalogHiveIT.java | 2 +-
.../tests/integration/base_hadoop_env.py | 2 +-
.../tests/integration/hdfs_container.py | 2 +-
.../tests/integration/integration_test_env.py | 2 +-
.../org/apache/gravitino/utils/PrincipalUtils.java | 2 +
.../integration/test/container/ContainerSuite.java | 4 +-
integration-test/trino-it/docker-compose.yaml | 4 +-
.../connector/integration/test/SparkCommonIT.java | 4 +-
19 files changed, 930 insertions(+), 128 deletions(-)
diff --git a/build.gradle.kts b/build.gradle.kts
index feb62f828..edf9bcbb5 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -163,7 +163,7 @@ allprojects {
// Default use MiniGravitino to run integration tests
param.environment("GRAVITINO_ROOT_DIR", project.rootDir.path)
param.environment("IT_PROJECT_DIR", project.buildDir.path)
- param.environment("HADOOP_USER_NAME", "datastrato")
+ param.environment("HADOOP_USER_NAME", "anonymous")
param.environment("HADOOP_HOME", "/tmp")
param.environment("PROJECT_VERSION", project.version)
diff --git
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalog.java
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalog.java
index 02fad267e..8774705cf 100644
---
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalog.java
+++
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalog.java
@@ -19,12 +19,9 @@
package org.apache.gravitino.catalog.hadoop;
import java.util.Map;
-import java.util.Optional;
-import
org.apache.gravitino.catalog.hadoop.authentication.kerberos.KerberosConfig;
import org.apache.gravitino.connector.BaseCatalog;
import org.apache.gravitino.connector.CatalogOperations;
import org.apache.gravitino.connector.PropertiesMetadata;
-import org.apache.gravitino.connector.ProxyPlugin;
import org.apache.gravitino.connector.capability.Capability;
/**
@@ -50,7 +47,7 @@ public class HadoopCatalog extends BaseCatalog<HadoopCatalog>
{
@Override
protected CatalogOperations newOps(Map<String, String> config) {
- HadoopCatalogOperations ops = new HadoopCatalogOperations();
+ CatalogOperations ops = new SecureHadoopCatalogOperations();
return ops;
}
@@ -59,15 +56,6 @@ public class HadoopCatalog extends
BaseCatalog<HadoopCatalog> {
return new HadoopCatalogCapability();
}
- @Override
- protected Optional<ProxyPlugin> newProxyPlugin(Map<String, String> config) {
- boolean impersonationEnabled = new
KerberosConfig(config).isImpersonationEnabled();
- if (!impersonationEnabled) {
- return Optional.empty();
- }
- return Optional.of(new HadoopProxyPlugin());
- }
-
@Override
public PropertiesMetadata catalogPropertiesMetadata() throws
UnsupportedOperationException {
return CATALOG_PROPERTIES_META;
diff --git
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
index 6b49c1310..eed8da9a1 100644
---
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
+++
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
@@ -19,17 +19,14 @@
package org.apache.gravitino.catalog.hadoop;
import static org.apache.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
-import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
-import java.io.File;
import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
-import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -43,12 +40,9 @@ import org.apache.gravitino.Namespace;
import org.apache.gravitino.Schema;
import org.apache.gravitino.SchemaChange;
import org.apache.gravitino.StringIdentifier;
-import org.apache.gravitino.catalog.hadoop.authentication.AuthenticationConfig;
-import
org.apache.gravitino.catalog.hadoop.authentication.kerberos.KerberosClient;
import org.apache.gravitino.connector.CatalogInfo;
import org.apache.gravitino.connector.CatalogOperations;
import org.apache.gravitino.connector.HasPropertyMetadata;
-import org.apache.gravitino.connector.ProxyPlugin;
import org.apache.gravitino.connector.SupportsSchemas;
import org.apache.gravitino.exceptions.AlreadyExistsException;
import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
@@ -68,8 +62,6 @@ import org.apache.gravitino.utils.PrincipalUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,11 +82,6 @@ public class HadoopCatalogOperations implements
CatalogOperations, SupportsSchem
private Map<String, String> conf;
- @SuppressWarnings("unused")
- private ProxyPlugin proxyPlugin;
-
- private String kerberosRealm;
-
private CatalogInfo catalogInfo;
HadoopCatalogOperations(EntityStore store) {
@@ -105,8 +92,20 @@ public class HadoopCatalogOperations implements
CatalogOperations, SupportsSchem
this(GravitinoEnv.getInstance().entityStore());
}
- public String getKerberosRealm() {
- return kerberosRealm;
+ public EntityStore getStore() {
+ return store;
+ }
+
+ public CatalogInfo getCatalogInfo() {
+ return catalogInfo;
+ }
+
+ public Configuration getHadoopConf() {
+ return hadoopConf;
+ }
+
+ public Map<String, String> getConf() {
+ return conf;
}
@Override
@@ -134,32 +133,12 @@ public class HadoopCatalogOperations implements
CatalogOperations, SupportsSchem
.getOrDefault(config,
HadoopCatalogPropertiesMetadata.LOCATION);
conf.forEach(hadoopConf::set);
- initAuthentication(conf, hadoopConf);
this.catalogStorageLocation =
StringUtils.isNotBlank(catalogLocation)
? Optional.of(catalogLocation).map(Path::new)
: Optional.empty();
}
- private void initAuthentication(Map<String, String> conf, Configuration
hadoopConf) {
- AuthenticationConfig config = new AuthenticationConfig(conf);
- String authType = config.getAuthType();
-
- if (StringUtils.equalsIgnoreCase(authType,
AuthenticationMethod.KERBEROS.name())) {
- hadoopConf.set(
- HADOOP_SECURITY_AUTHENTICATION,
- AuthenticationMethod.KERBEROS.name().toLowerCase(Locale.ROOT));
- UserGroupInformation.setConfiguration(hadoopConf);
- try {
- KerberosClient kerberosClient = new KerberosClient(conf, hadoopConf);
- File keytabFile =
kerberosClient.saveKeyTabFileFromUri(catalogInfo.id());
- this.kerberosRealm =
kerberosClient.login(keytabFile.getAbsolutePath());
- } catch (IOException e) {
- throw new RuntimeException("Failed to login with Kerberos", e);
- }
- }
- }
-
@Override
public NameIdentifier[] listFilesets(Namespace namespace) throws
NoSuchSchemaException {
try {
@@ -278,9 +257,9 @@ public class HadoopCatalogOperations implements
CatalogOperations, SupportsSchem
.withNamespace(ident.namespace())
.withComment(comment)
.withFilesetType(type)
- // Store the storageLocation to the store. If the
"storageLocation" is null for
- // managed fileset, Gravitino will get and store the location
based on the
- // catalog/schema's location and store it to the store.
+ // Store the storageLocation to the store. If the
"storageLocation" is null for managed
+ // fileset, Gravitino will get and store the location based on the
catalog/schema's
+ // location and store it to the store.
.withStorageLocation(filesetPath.toString())
.withProperties(properties)
.withAuditInfo(
@@ -664,8 +643,4 @@ public class HadoopCatalogOperations implements
CatalogOperations, SupportsSchem
FileSystem defaultFs = FileSystem.get(configuration);
return path.makeQualified(defaultFs.getUri(),
defaultFs.getWorkingDirectory());
}
-
- void setProxyPlugin(HadoopProxyPlugin hadoopProxyPlugin) {
- this.proxyPlugin = hadoopProxyPlugin;
- }
}
diff --git
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopFilesetPropertiesMetadata.java
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopFilesetPropertiesMetadata.java
index 76bcc5318..250a48d29 100644
---
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopFilesetPropertiesMetadata.java
+++
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopFilesetPropertiesMetadata.java
@@ -18,8 +18,10 @@
*/
package org.apache.gravitino.catalog.hadoop;
-import java.util.Collections;
+import com.google.common.collect.ImmutableMap;
import java.util.Map;
+import org.apache.gravitino.catalog.hadoop.authentication.AuthenticationConfig;
+import
org.apache.gravitino.catalog.hadoop.authentication.kerberos.KerberosConfig;
import org.apache.gravitino.connector.BasePropertiesMetadata;
import org.apache.gravitino.connector.PropertyEntry;
@@ -27,6 +29,9 @@ public class HadoopFilesetPropertiesMetadata extends
BasePropertiesMetadata {
@Override
protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
- return Collections.emptyMap();
+ ImmutableMap.Builder<String, PropertyEntry<?>> builder =
ImmutableMap.builder();
+ builder.putAll(KerberosConfig.KERBEROS_PROPERTY_ENTRIES);
+ builder.putAll(AuthenticationConfig.AUTHENTICATION_PROPERTY_ENTRIES);
+ return builder.build();
}
}
diff --git
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopProxyPlugin.java
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopProxyPlugin.java
index 9287501c3..4421177bd 100644
---
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopProxyPlugin.java
+++
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopProxyPlugin.java
@@ -31,9 +31,10 @@ import org.apache.gravitino.connector.ProxyPlugin;
import org.apache.gravitino.utils.Executable;
import org.apache.hadoop.security.UserGroupInformation;
+@Deprecated
public class HadoopProxyPlugin implements ProxyPlugin {
- private HadoopCatalogOperations ops;
- private UserGroupInformation realUser;
+ private SecureHadoopCatalogOperations ops;
+ private final UserGroupInformation realUser;
public HadoopProxyPlugin() {
try {
@@ -82,7 +83,6 @@ public class HadoopProxyPlugin implements ProxyPlugin {
@Override
public void bindCatalogOperation(CatalogOperations ops) {
- this.ops = ((HadoopCatalogOperations) ops);
- this.ops.setProxyPlugin(this);
+ this.ops = ((SecureHadoopCatalogOperations) ops);
}
}
diff --git
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopSchemaPropertiesMetadata.java
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopSchemaPropertiesMetadata.java
index addd67754..8892433ac 100644
---
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopSchemaPropertiesMetadata.java
+++
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopSchemaPropertiesMetadata.java
@@ -20,6 +20,8 @@ package org.apache.gravitino.catalog.hadoop;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
+import org.apache.gravitino.catalog.hadoop.authentication.AuthenticationConfig;
+import
org.apache.gravitino.catalog.hadoop.authentication.kerberos.KerberosConfig;
import org.apache.gravitino.connector.BasePropertiesMetadata;
import org.apache.gravitino.connector.PropertyEntry;
@@ -45,6 +47,8 @@ public class HadoopSchemaPropertiesMetadata extends
BasePropertiesMetadata {
true /* immutable */,
null,
false /* hidden */))
+ .putAll(KerberosConfig.KERBEROS_PROPERTY_ENTRIES)
+ .putAll(AuthenticationConfig.AUTHENTICATION_PROPERTY_ENTRIES)
.build();
@Override
diff --git
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java
new file mode 100644
index 000000000..30cc53f32
--- /dev/null
+++
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.catalog.hadoop;
+
+import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import javax.security.auth.Subject;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.Schema;
+import org.apache.gravitino.SchemaChange;
+import org.apache.gravitino.UserPrincipal;
+import org.apache.gravitino.catalog.hadoop.authentication.AuthenticationConfig;
+import
org.apache.gravitino.catalog.hadoop.authentication.kerberos.KerberosClient;
+import
org.apache.gravitino.catalog.hadoop.authentication.kerberos.KerberosConfig;
+import org.apache.gravitino.connector.CatalogInfo;
+import org.apache.gravitino.connector.CatalogOperations;
+import org.apache.gravitino.connector.HasPropertyMetadata;
+import org.apache.gravitino.connector.SupportsSchemas;
+import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchCatalogException;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.exceptions.NoSuchFilesetException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NonEmptySchemaException;
+import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.file.Fileset;
+import org.apache.gravitino.file.FilesetCatalog;
+import org.apache.gravitino.file.FilesetChange;
+import org.apache.gravitino.meta.FilesetEntity;
+import org.apache.gravitino.meta.SchemaEntity;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("removal")
+public class SecureHadoopCatalogOperations
+ implements CatalogOperations, SupportsSchemas, FilesetCatalog {
+
+ public static final Logger LOG =
LoggerFactory.getLogger(SecureHadoopCatalogOperations.class);
+
+ private final HadoopCatalogOperations hadoopCatalogOperations;
+
+ private final List<Closeable> closeables = Lists.newArrayList();
+
+ private final Map<NameIdentifier, UserInfo> userInfoMap =
Maps.newConcurrentMap();
+
+ public static final String GRAVITINO_KEYTAB_FORMAT = "keytabs/gravitino-%s";
+
+ private String kerberosRealm;
+
+ public SecureHadoopCatalogOperations() {
+ this.hadoopCatalogOperations = new HadoopCatalogOperations();
+ }
+
+ public SecureHadoopCatalogOperations(EntityStore store) {
+ this.hadoopCatalogOperations = new HadoopCatalogOperations(store);
+ }
+
+ @VisibleForTesting
+ public HadoopCatalogOperations getBaseHadoopCatalogOperations() {
+ return hadoopCatalogOperations;
+ }
+
+ public String getKerberosRealm() {
+ return kerberosRealm;
+ }
+
+ static class UserInfo {
+ UserGroupInformation loginUser;
+ boolean enableUserImpersonation;
+ String keytabPath;
+ String realm;
+
+ static UserInfo of(
+ UserGroupInformation loginUser,
+ boolean enableUserImpersonation,
+ String keytabPath,
+ String kerberosRealm) {
+ UserInfo userInfo = new UserInfo();
+ userInfo.loginUser = loginUser;
+ userInfo.enableUserImpersonation = enableUserImpersonation;
+ userInfo.keytabPath = keytabPath;
+ userInfo.realm = kerberosRealm;
+ return userInfo;
+ }
+ }
+
+ // We have overridden the createFileset, dropFileset, createSchema,
dropSchema method to reset
+ // the current user based on the name identifier.
+
+ @Override
+ public Fileset createFileset(
+ NameIdentifier ident,
+ String comment,
+ Fileset.Type type,
+ String storageLocation,
+ Map<String, String> properties)
+ throws NoSuchSchemaException, FilesetAlreadyExistsException {
+ UserGroupInformation currentUser = getUGIByIdent(properties, ident);
+ String apiUser = PrincipalUtils.getCurrentUserName();
+ return doAs(
+ currentUser,
+ () -> {
+ setUser(apiUser);
+ return hadoopCatalogOperations.createFileset(
+ ident, comment, type, storageLocation, properties);
+ },
+ ident);
+ }
+
+ @Override
+ public boolean dropFileset(NameIdentifier ident) {
+ FilesetEntity filesetEntity;
+ try {
+ filesetEntity =
+ hadoopCatalogOperations
+ .getStore()
+ .get(ident, Entity.EntityType.FILESET, FilesetEntity.class);
+ } catch (NoSuchEntityException e) {
+ LOG.warn("Fileset {} does not exist", ident);
+ return false;
+ } catch (IOException ioe) {
+ throw new RuntimeException("Failed to delete fileset " + ident, ioe);
+ }
+
+ // Reset the current user based on the name identifier.
+ UserGroupInformation currentUser =
getUGIByIdent(filesetEntity.properties(), ident);
+
+ boolean r = doAs(currentUser, () ->
hadoopCatalogOperations.dropFileset(ident), ident);
+ cleanUserInfo(ident);
+ return r;
+ }
+
+ @Override
+ public Schema createSchema(NameIdentifier ident, String comment, Map<String,
String> properties)
+ throws NoSuchCatalogException, SchemaAlreadyExistsException {
+ // Reset the current user based on the name identifier and properties.
+ UserGroupInformation currentUser = getUGIByIdent(properties, ident);
+ String apiUser = PrincipalUtils.getCurrentUserName();
+
+ return doAs(
+ currentUser,
+ () -> {
+ setUser(apiUser);
+ return hadoopCatalogOperations.createSchema(ident, comment,
properties);
+ },
+ ident);
+ }
+
+ @Override
+ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws
NonEmptySchemaException {
+ try {
+ SchemaEntity schemaEntity =
+ hadoopCatalogOperations
+ .getStore()
+ .get(ident, Entity.EntityType.SCHEMA, SchemaEntity.class);
+ Map<String, String> properties =
+
Optional.ofNullable(schemaEntity.properties()).orElse(Collections.emptyMap());
+
+ // Reset the current user based on the name identifier.
+ UserGroupInformation user = getUGIByIdent(properties, ident);
+
+ boolean r = doAs(user, () -> hadoopCatalogOperations.dropSchema(ident,
cascade), ident);
+ cleanUserInfo(ident);
+ return r;
+ } catch (IOException ioe) {
+ throw new RuntimeException("Failed to delete schema " + ident, ioe);
+ }
+ }
+
+ @Override
+ public void initialize(
+ Map<String, String> config, CatalogInfo info, HasPropertyMetadata
propertiesMetadata)
+ throws RuntimeException {
+ hadoopCatalogOperations.initialize(config, info, propertiesMetadata);
+ initAuthentication(hadoopCatalogOperations.getConf(),
hadoopCatalogOperations.getHadoopConf());
+ }
+
+ @Override
+ public Fileset alterFileset(NameIdentifier ident, FilesetChange... changes)
+ throws NoSuchFilesetException, IllegalArgumentException {
+ Fileset fileset = hadoopCatalogOperations.alterFileset(ident, changes);
+
+ String finalName = ident.name();
+ for (FilesetChange change : changes) {
+ if (change instanceof FilesetChange.RenameFileset) {
+ finalName = ((FilesetChange.RenameFileset) change).getNewName();
+ }
+ }
+ if (!ident.name().equals(finalName)) {
+ UserInfo userInfo = userInfoMap.remove(ident);
+ if (userInfo != null) {
+ userInfoMap.put(NameIdentifier.of(ident.namespace(), finalName),
userInfo);
+ }
+ }
+
+ return fileset;
+ }
+
+ @Override
+ public NameIdentifier[] listSchemas(Namespace namespace) throws
NoSuchCatalogException {
+ return hadoopCatalogOperations.listSchemas(namespace);
+ }
+
+ @Override
+ public Schema loadSchema(NameIdentifier ident) throws NoSuchSchemaException {
+ return hadoopCatalogOperations.loadSchema(ident);
+ }
+
+ @Override
+ public Schema alterSchema(NameIdentifier ident, SchemaChange... changes)
+ throws NoSuchSchemaException {
+ return hadoopCatalogOperations.alterSchema(ident, changes);
+ }
+
+ @Override
+ public NameIdentifier[] listFilesets(Namespace namespace) throws
NoSuchSchemaException {
+ return hadoopCatalogOperations.listFilesets(namespace);
+ }
+
+ @Override
+ public Fileset loadFileset(NameIdentifier ident) throws
NoSuchFilesetException {
+ return hadoopCatalogOperations.loadFileset(ident);
+ }
+
+ @Override
+ public void close() throws IOException {
+ hadoopCatalogOperations.close();
+
+ userInfoMap.clear();
+ closeables.forEach(
+ c -> {
+ try {
+ c.close();
+ } catch (Exception e) {
+ LOG.error("Failed to close resource", e);
+ }
+ });
+ }
+
+ @Override
+ public void testConnection(
+ NameIdentifier catalogIdent,
+ Catalog.Type type,
+ String provider,
+ String comment,
+ Map<String, String> properties)
+ throws Exception {
+ hadoopCatalogOperations.testConnection(catalogIdent, type, provider,
comment, properties);
+ }
+
+ private void initAuthentication(Map<String, String> conf, Configuration
hadoopConf) {
+ AuthenticationConfig config = new AuthenticationConfig(conf);
+ CatalogInfo catalogInfo = hadoopCatalogOperations.getCatalogInfo();
+ if (config.isKerberosAuth()) {
+ initKerberos(
+ conf, hadoopConf, NameIdentifier.of(catalogInfo.namespace(),
catalogInfo.name()), true);
+ } else if (config.isSimpleAuth()) {
+ try {
+ // Use service login user.
+ UserGroupInformation u = UserGroupInformation.getCurrentUser();
+ userInfoMap.put(
+ NameIdentifier.of(catalogInfo.namespace(), catalogInfo.name()),
+ UserInfo.of(u, config.isImpersonationEnabled(), null, null));
+ } catch (Exception e) {
+ throw new RuntimeException("Can't get service user for Hadoop
catalog", e);
+ }
+ }
+ }
+
+ /**
+ * Get the UserGroupInformation based on the NameIdentifier and properties.
+ *
+ * <p>Note: As UserGroupInformation is a static class, to avoid the thread
safety issue, we need
+ * to use synchronized to ensure the thread safety: Make login and
getLoginUser atomic.
+ */
+ public synchronized String initKerberos(
+ Map<String, String> properties,
+ Configuration configuration,
+ NameIdentifier ident,
+ boolean refreshCredentials) {
+ // Init schema level kerberos authentication.
+
+ CatalogInfo catalogInfo = hadoopCatalogOperations.getCatalogInfo();
+ String keytabPath =
+ String.format(
+ GRAVITINO_KEYTAB_FORMAT, catalogInfo.id() + "-" +
ident.toString().replace(".", "-"));
+ KerberosConfig kerberosConfig = new KerberosConfig(properties);
+ if (kerberosConfig.isKerberosAuth()) {
+ configuration.set(
+ HADOOP_SECURITY_AUTHENTICATION,
+ AuthenticationMethod.KERBEROS.name().toLowerCase(Locale.ROOT));
+ try {
+ UserGroupInformation.setConfiguration(configuration);
+ KerberosClient kerberosClient =
+ new KerberosClient(properties, configuration, refreshCredentials);
+ // Add the kerberos client to the closable to close resources.
+ closeables.add(kerberosClient);
+
+ File keytabFile = kerberosClient.saveKeyTabFileFromUri(keytabPath);
+ String kerberosRealm =
kerberosClient.login(keytabFile.getAbsolutePath());
+ // Should this kerberosRealm need to be equals to the realm in the
principal?
+ userInfoMap.put(
+ ident,
+ UserInfo.of(
+ UserGroupInformation.getLoginUser(),
+ kerberosConfig.isImpersonationEnabled(),
+ keytabPath,
+ kerberosRealm));
+ return kerberosRealm;
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to login with Kerberos", e);
+ }
+ }
+
+ return null;
+ }
+
+ private UserGroupInformation getUGIByIdent(Map<String, String> properties,
NameIdentifier ident) {
+ KerberosConfig kerberosConfig = new KerberosConfig(properties);
+ if (kerberosConfig.isKerberosAuth()) {
+ // We assume that the realm of catalog is the same as the realm of the
schema and table.
+ initKerberos(properties, new Configuration(), ident, false);
+ }
+ // If the kerberos is not enabled (simple mode), we will use the current
user
+ return getUserBaseOnNameIdentifier(ident);
+ }
+
+ private UserGroupInformation getUserBaseOnNameIdentifier(NameIdentifier
nameIdentifier) {
+ UserInfo userInfo = getNearestUserGroupInformation(nameIdentifier);
+ if (userInfo == null) {
+ throw new RuntimeException("Failed to get user information for " +
nameIdentifier);
+ }
+
+ UserGroupInformation ugi = userInfo.loginUser;
+ boolean userImpersonation = userInfo.enableUserImpersonation;
+ if (userImpersonation) {
+ String proxyKerberosPrincipalName = PrincipalUtils.getCurrentUserName();
+ if (!proxyKerberosPrincipalName.contains("@")) {
+ proxyKerberosPrincipalName =
+ String.format("%s@%s", proxyKerberosPrincipalName, userInfo.realm);
+ }
+
+ ugi = UserGroupInformation.createProxyUser(proxyKerberosPrincipalName,
ugi);
+ }
+
+ return ugi;
+ }
+
+ private UserInfo getNearestUserGroupInformation(NameIdentifier
nameIdentifier) {
+ NameIdentifier currentNameIdentifier = nameIdentifier;
+ while (currentNameIdentifier != null) {
+ if (userInfoMap.containsKey(currentNameIdentifier)) {
+ return userInfoMap.get(currentNameIdentifier);
+ }
+
+ String[] levels = currentNameIdentifier.namespace().levels();
+ // The ident is catalog level.
+ if (levels.length <= 1) {
+ return null;
+ }
+ currentNameIdentifier =
NameIdentifier.of(currentNameIdentifier.namespace().levels());
+ }
+ return null;
+ }
+
+ private void cleanUserInfo(NameIdentifier identifier) {
+ UserInfo userInfo = userInfoMap.remove(identifier);
+ if (userInfo != null) {
+ removeFile(userInfo.keytabPath);
+ }
+ }
+
+ private void removeFile(String filePath) {
+ if (filePath == null) {
+ return;
+ }
+
+ File file = new File(filePath);
+ if (file.exists()) {
+ file.delete();
+ }
+ }
+
+ private <T> T doAs(
+ UserGroupInformation userGroupInformation,
+ PrivilegedExceptionAction<T> action,
+ NameIdentifier ident) {
+ try {
+ return userGroupInformation.doAs(action);
+ } catch (IOException | InterruptedException ioe) {
+ throw new RuntimeException("Failed to operation on fileset " + ident,
ioe);
+ } catch (UndeclaredThrowableException e) {
+ Throwable innerException = e.getCause();
+ if (innerException instanceof PrivilegedActionException) {
+ throw new RuntimeException(innerException.getCause());
+ } else if (innerException instanceof InvocationTargetException) {
+ throw new RuntimeException(innerException.getCause());
+ } else {
+ throw new RuntimeException(innerException);
+ }
+ }
+ }
+
+ /**
+ * Add the user to the subject so that we can get the last user in the
subject. Hadoop catalog
+ * uses this method to pass api user from the client side, so that we can
get the user in the
+ * subject. Please do not mix it with UserGroupInformation.getCurrentUser().
+ *
+ * @param apiUser the username to set.
+ */
+ private void setUser(String apiUser) {
+ java.security.AccessControlContext context =
java.security.AccessController.getContext();
+ Subject subject = Subject.getSubject(context);
+ subject.getPrincipals().add(new UserPrincipal(apiUser));
+ }
+}
diff --git
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/authentication/AuthenticationConfig.java
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/authentication/AuthenticationConfig.java
index 88e70ffdc..384716ce1 100644
---
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/authentication/AuthenticationConfig.java
+++
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/authentication/AuthenticationConfig.java
@@ -34,6 +34,11 @@ public class AuthenticationConfig extends Config {
public static final String IMPERSONATION_ENABLE_KEY =
"authentication.impersonation-enable";
+ enum AuthenticationType {
+ SIMPLE,
+ KERBEROS
+ }
+
public static final boolean KERBEROS_DEFAULT_IMPERSONATION_ENABLE = false;
public AuthenticationConfig(Map<String, String> properties) {
@@ -64,6 +69,14 @@ public class AuthenticationConfig extends Config {
return get(ENABLE_IMPERSONATION_ENTRY);
}
+ public boolean isSimpleAuth() {
+ return AuthenticationType.SIMPLE.name().equalsIgnoreCase(getAuthType());
+ }
+
+ public boolean isKerberosAuth() {
+ return AuthenticationType.KERBEROS.name().equalsIgnoreCase(getAuthType());
+ }
+
public static final Map<String, PropertyEntry<?>>
AUTHENTICATION_PROPERTY_ENTRIES =
new ImmutableMap.Builder<String, PropertyEntry<?>>()
.put(
diff --git
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/authentication/kerberos/KerberosClient.java
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/authentication/kerberos/KerberosClient.java
index b0965729b..dd746b4da 100644
---
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/authentication/kerberos/KerberosClient.java
+++
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/authentication/kerberos/KerberosClient.java
@@ -22,6 +22,7 @@ package
org.apache.gravitino.catalog.hadoop.authentication.kerberos;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.List;
@@ -36,19 +37,19 @@ import
org.apache.hadoop.security.authentication.util.KerberosName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class KerberosClient {
+public class KerberosClient implements Closeable {
private static final Logger LOG =
LoggerFactory.getLogger(KerberosClient.class);
- public static final String GRAVITINO_KEYTAB_FORMAT =
"keytabs/gravitino-hadoop-%s-keytab";
-
- private final ScheduledThreadPoolExecutor checkTgtExecutor;
+ private ScheduledThreadPoolExecutor checkTgtExecutor;
private final Map<String, String> conf;
private final Configuration hadoopConf;
+ private final boolean refreshCredentials;
- public KerberosClient(Map<String, String> conf, Configuration hadoopConf) {
+ public KerberosClient(
+ Map<String, String> conf, Configuration hadoopConf, boolean
refreshCredentials) {
this.conf = conf;
this.hadoopConf = hadoopConf;
- this.checkTgtExecutor = new ScheduledThreadPoolExecutor(1,
getThreadFactory("check-tgt"));
+ this.refreshCredentials = refreshCredentials;
}
public String login(String keytabFilePath) throws IOException {
@@ -66,28 +67,31 @@ public class KerberosClient {
// Login
UserGroupInformation.setConfiguration(hadoopConf);
KerberosName.resetDefaultRealm();
- UserGroupInformation.loginUserFromKeytab(catalogPrincipal, keytabFilePath);
- UserGroupInformation kerberosLoginUgi =
UserGroupInformation.getCurrentUser();
+ UserGroupInformation kerberosLoginUgi =
+ UserGroupInformation.loginUserFromKeytabAndReturnUGI(catalogPrincipal,
keytabFilePath);
+ UserGroupInformation.setLoginUser(kerberosLoginUgi);
// Refresh the cache if it's out of date.
- int checkInterval = kerberosConfig.getCheckIntervalSec();
- checkTgtExecutor.scheduleAtFixedRate(
- () -> {
- try {
- kerberosLoginUgi.checkTGTAndReloginFromKeytab();
- } catch (Exception e) {
- LOG.error("Fail to refresh ugi token: ", e);
- }
- },
- checkInterval,
- checkInterval,
- TimeUnit.SECONDS);
+ if (refreshCredentials) {
+ this.checkTgtExecutor = new ScheduledThreadPoolExecutor(1,
getThreadFactory("check-tgt"));
+ int checkInterval = kerberosConfig.getCheckIntervalSec();
+ checkTgtExecutor.scheduleAtFixedRate(
+ () -> {
+ try {
+ kerberosLoginUgi.checkTGTAndReloginFromKeytab();
+ } catch (Exception e) {
+ LOG.error("Fail to refresh ugi token: ", e);
+ }
+ },
+ checkInterval,
+ checkInterval,
+ TimeUnit.SECONDS);
+ }
return principalComponents.get(1);
}
- public File saveKeyTabFileFromUri(Long catalogId) throws IOException {
-
+ public File saveKeyTabFileFromUri(String keytabPath) throws IOException {
KerberosConfig kerberosConfig = new KerberosConfig(conf);
String keyTabUri = kerberosConfig.getKeytab();
@@ -103,7 +107,7 @@ public class KerberosClient {
keytabsDir.mkdir();
}
- File keytabFile = new File(String.format(GRAVITINO_KEYTAB_FORMAT,
catalogId));
+ File keytabFile = new File(keytabPath);
keytabFile.deleteOnExit();
if (keytabFile.exists() && !keytabFile.delete()) {
throw new IllegalStateException(
@@ -119,4 +123,11 @@ public class KerberosClient {
private static ThreadFactory getThreadFactory(String factoryName) {
return new
ThreadFactoryBuilder().setDaemon(true).setNameFormat(factoryName +
"-%d").build();
}
+
+ @Override
+ public void close() throws IOException {
+ if (checkTgtExecutor != null) {
+ checkTgtExecutor.shutdown();
+ }
+ }
}
diff --git
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
index 3c8a4d463..3b9ca0d7f 100644
---
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
+++
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
@@ -59,6 +59,7 @@ import org.apache.gravitino.Namespace;
import org.apache.gravitino.Schema;
import org.apache.gravitino.SchemaChange;
import org.apache.gravitino.StringIdentifier;
+import org.apache.gravitino.connector.CatalogInfo;
import org.apache.gravitino.connector.HasPropertyMetadata;
import org.apache.gravitino.connector.PropertiesMetadata;
import org.apache.gravitino.exceptions.NoSuchFilesetException;
@@ -128,6 +129,30 @@ public class TestHadoopCatalogOperations {
private static IdGenerator idGenerator;
+ private static CatalogInfo randomCatalogInfo() {
+ return new CatalogInfo(
+ idGenerator.nextId(),
+ "catalog1",
+ CatalogInfo.Type.FILESET,
+ "provider1",
+ "comment1",
+ Maps.newHashMap(),
+ null,
+ Namespace.of("m1", "c1"));
+ }
+
+ private static CatalogInfo randomCatalogInfo(String metalakeName, String
catalogName) {
+ return new CatalogInfo(
+ idGenerator.nextId(),
+ catalogName,
+ CatalogInfo.Type.FILESET,
+ "hadoop",
+ "comment1",
+ Maps.newHashMap(),
+ null,
+ Namespace.of(metalakeName));
+ }
+
@BeforeAll
public static void setUp() {
Config config = Mockito.mock(Config.class);
@@ -195,14 +220,18 @@ public class TestHadoopCatalogOperations {
@Test
public void testHadoopCatalogConfiguration() {
Map<String, String> emptyProps = Maps.newHashMap();
- HadoopCatalogOperations ops = new HadoopCatalogOperations(store);
- ops.initialize(emptyProps, null, HADOOP_PROPERTIES_METADATA);
+ SecureHadoopCatalogOperations secOps = new
SecureHadoopCatalogOperations(store);
+
+ HadoopCatalogOperations ops = secOps.getBaseHadoopCatalogOperations();
+
+ CatalogInfo catalogInfo = randomCatalogInfo();
+ ops.initialize(emptyProps, catalogInfo, HADOOP_PROPERTIES_METADATA);
Configuration conf = ops.hadoopConf;
String value = conf.get("fs.defaultFS");
Assertions.assertEquals("file:///", value);
emptyProps.put(CATALOG_BYPASS_PREFIX + "fs.defaultFS",
"hdfs://localhost:9000");
- ops.initialize(emptyProps, null, HADOOP_PROPERTIES_METADATA);
+ ops.initialize(emptyProps, catalogInfo, HADOOP_PROPERTIES_METADATA);
Configuration conf1 = ops.hadoopConf;
String value1 = conf1.get("fs.defaultFS");
Assertions.assertEquals("hdfs://localhost:9000", value1);
@@ -210,7 +239,7 @@ public class TestHadoopCatalogOperations {
Assertions.assertFalse(ops.catalogStorageLocation.isPresent());
emptyProps.put(HadoopCatalogPropertiesMetadata.LOCATION,
"file:///tmp/catalog");
- ops.initialize(emptyProps, null, HADOOP_PROPERTIES_METADATA);
+ ops.initialize(emptyProps, catalogInfo, HADOOP_PROPERTIES_METADATA);
Assertions.assertTrue(ops.catalogStorageLocation.isPresent());
Path expectedPath = new Path("file:///tmp/catalog");
Assertions.assertEquals(expectedPath, ops.catalogStorageLocation.get());
@@ -306,8 +335,8 @@ public class TestHadoopCatalogOperations {
Assertions.assertEquals(name, schema.name());
- try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
- ops.initialize(Maps.newHashMap(), null, HADOOP_PROPERTIES_METADATA);
+ try (SecureHadoopCatalogOperations ops = new
SecureHadoopCatalogOperations(store)) {
+ ops.initialize(Maps.newHashMap(), randomCatalogInfo(),
HADOOP_PROPERTIES_METADATA);
Schema schema1 = ops.loadSchema(NameIdentifierUtil.ofSchema("m1", "c1",
name));
Assertions.assertEquals(name, schema1.name());
Assertions.assertEquals(comment, schema1.comment());
@@ -330,8 +359,8 @@ public class TestHadoopCatalogOperations {
createSchema(name, comment, null, null);
createSchema(name1, comment1, null, null);
- try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
- ops.initialize(Maps.newHashMap(), null, HADOOP_PROPERTIES_METADATA);
+ try (SecureHadoopCatalogOperations ops = new
SecureHadoopCatalogOperations(store)) {
+ ops.initialize(Maps.newHashMap(), randomCatalogInfo(),
HADOOP_PROPERTIES_METADATA);
Set<NameIdentifier> idents =
Arrays.stream(ops.listSchemas(Namespace.of("m1",
"c1"))).collect(Collectors.toSet());
Assertions.assertTrue(idents.size() >= 2);
@@ -348,8 +377,8 @@ public class TestHadoopCatalogOperations {
Schema schema = createSchema(name, comment, catalogPath, null);
Assertions.assertEquals(name, schema.name());
- try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
- ops.initialize(Maps.newHashMap(), null, HADOOP_PROPERTIES_METADATA);
+ try (SecureHadoopCatalogOperations ops = new
SecureHadoopCatalogOperations(store)) {
+ ops.initialize(Maps.newHashMap(), randomCatalogInfo(),
HADOOP_PROPERTIES_METADATA);
Schema schema1 = ops.loadSchema(NameIdentifierUtil.ofSchema("m1", "c1",
name));
Assertions.assertEquals(name, schema1.name());
Assertions.assertEquals(comment, schema1.comment());
@@ -395,10 +424,10 @@ public class TestHadoopCatalogOperations {
Assertions.assertEquals(name, schema.name());
NameIdentifier id = NameIdentifierUtil.ofSchema("m1", "c1", name);
- try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
+ try (SecureHadoopCatalogOperations ops = new
SecureHadoopCatalogOperations(store)) {
ops.initialize(
ImmutableMap.of(HadoopCatalogPropertiesMetadata.LOCATION,
catalogPath),
- null,
+ randomCatalogInfo("m1", "c1"),
HADOOP_PROPERTIES_METADATA);
Schema schema1 = ops.loadSchema(id);
Assertions.assertEquals(name, schema1.name());
@@ -452,8 +481,8 @@ public class TestHadoopCatalogOperations {
}
NameIdentifier schemaIdent = NameIdentifierUtil.ofSchema("m1", "c1",
schemaName);
- try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
- ops.initialize(catalogProps, null, HADOOP_PROPERTIES_METADATA);
+ try (SecureHadoopCatalogOperations ops = new
SecureHadoopCatalogOperations(store)) {
+ ops.initialize(catalogProps, randomCatalogInfo("m1", "c1"),
HADOOP_PROPERTIES_METADATA);
if (!ops.schemaExists(schemaIdent)) {
createSchema(schemaName, comment, catalogPath, schemaPath);
}
@@ -507,8 +536,8 @@ public class TestHadoopCatalogOperations {
+ " when it's catalog and schema "
+ "location are not set",
exception.getMessage());
- try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
- ops.initialize(Maps.newHashMap(), null, HADOOP_PROPERTIES_METADATA);
+ try (SecureHadoopCatalogOperations ops = new
SecureHadoopCatalogOperations(store)) {
+ ops.initialize(Maps.newHashMap(), randomCatalogInfo(),
HADOOP_PROPERTIES_METADATA);
Throwable e =
Assertions.assertThrows(
NoSuchFilesetException.class, () ->
ops.loadFileset(filesetIdent));
@@ -523,8 +552,8 @@ public class TestHadoopCatalogOperations {
Assertions.assertEquals(
"Storage location must be set for external fileset " + filesetIdent,
exception1.getMessage());
- try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
- ops.initialize(Maps.newHashMap(), null, HADOOP_PROPERTIES_METADATA);
+ try (SecureHadoopCatalogOperations ops = new
SecureHadoopCatalogOperations(store)) {
+ ops.initialize(Maps.newHashMap(), randomCatalogInfo(),
HADOOP_PROPERTIES_METADATA);
Throwable e =
Assertions.assertThrows(
NoSuchFilesetException.class, () ->
ops.loadFileset(filesetIdent));
@@ -543,8 +572,8 @@ public class TestHadoopCatalogOperations {
createFileset(fileset, schemaName, comment, Fileset.Type.MANAGED, null,
null);
}
- try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
- ops.initialize(Maps.newHashMap(), null, HADOOP_PROPERTIES_METADATA);
+ try (SecureHadoopCatalogOperations ops = new
SecureHadoopCatalogOperations(store)) {
+ ops.initialize(Maps.newHashMap(), randomCatalogInfo(),
HADOOP_PROPERTIES_METADATA);
Set<NameIdentifier> idents =
Arrays.stream(ops.listFilesets(Namespace.of("m1", "c1", schemaName)))
.collect(Collectors.toSet());
@@ -574,8 +603,8 @@ public class TestHadoopCatalogOperations {
}
NameIdentifier schemaIdent = NameIdentifierUtil.ofSchema("m1", "c1",
schemaName);
- try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
- ops.initialize(catalogProps, null, HADOOP_PROPERTIES_METADATA);
+ try (SecureHadoopCatalogOperations ops = new
SecureHadoopCatalogOperations(store)) {
+ ops.initialize(catalogProps, randomCatalogInfo("m1", "c1"),
HADOOP_PROPERTIES_METADATA);
if (!ops.schemaExists(schemaIdent)) {
createSchema(schemaName, comment, catalogPath, schemaPath);
}
@@ -620,8 +649,8 @@ public class TestHadoopCatalogOperations {
FilesetChange change1 = FilesetChange.setProperty("k1", "v1");
FilesetChange change2 = FilesetChange.removeProperty("k1");
- try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
- ops.initialize(Maps.newHashMap(), null, HADOOP_PROPERTIES_METADATA);
+ try (SecureHadoopCatalogOperations ops = new
SecureHadoopCatalogOperations(store)) {
+ ops.initialize(Maps.newHashMap(), randomCatalogInfo(),
HADOOP_PROPERTIES_METADATA);
NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName,
name);
Fileset fileset1 = ops.alterFileset(filesetIdent, change1);
@@ -685,8 +714,8 @@ public class TestHadoopCatalogOperations {
Fileset fileset = createFileset(name, schemaName, comment,
Fileset.Type.MANAGED, null, null);
FilesetChange change1 = FilesetChange.updateComment("comment26_new");
- try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
- ops.initialize(Maps.newHashMap(), null, HADOOP_PROPERTIES_METADATA);
+ try (SecureHadoopCatalogOperations ops = new
SecureHadoopCatalogOperations(store)) {
+ ops.initialize(Maps.newHashMap(), randomCatalogInfo(),
HADOOP_PROPERTIES_METADATA);
NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName,
name);
Fileset fileset1 = ops.alterFileset(filesetIdent, change1);
@@ -708,8 +737,8 @@ public class TestHadoopCatalogOperations {
Fileset fileset = createFileset(name, schemaName, comment,
Fileset.Type.MANAGED, null, null);
FilesetChange change1 = FilesetChange.removeComment();
- try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
- ops.initialize(Maps.newHashMap(), null, HADOOP_PROPERTIES_METADATA);
+ try (SecureHadoopCatalogOperations ops = new
SecureHadoopCatalogOperations(store)) {
+ ops.initialize(Maps.newHashMap(), randomCatalogInfo(),
HADOOP_PROPERTIES_METADATA);
NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName,
name);
Fileset fileset1 = ops.alterFileset(filesetIdent, change1);
@@ -722,7 +751,7 @@ public class TestHadoopCatalogOperations {
@Test
public void testTestConnection() {
- HadoopCatalogOperations catalogOperations = new
HadoopCatalogOperations(store);
+ SecureHadoopCatalogOperations catalogOperations = new
SecureHadoopCatalogOperations(store);
Assertions.assertDoesNotThrow(
() ->
catalogOperations.testConnection(
@@ -973,8 +1002,8 @@ public class TestHadoopCatalogOperations {
props.put(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath);
}
- try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
- ops.initialize(props, null, HADOOP_PROPERTIES_METADATA);
+ try (SecureHadoopCatalogOperations ops = new
SecureHadoopCatalogOperations(store)) {
+ ops.initialize(props, randomCatalogInfo("m1", "c1"),
HADOOP_PROPERTIES_METADATA);
NameIdentifier schemaIdent = NameIdentifierUtil.ofSchema("m1", "c1",
name);
Map<String, String> schemaProps = Maps.newHashMap();
@@ -1002,8 +1031,8 @@ public class TestHadoopCatalogOperations {
props.put(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath);
}
- try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
- ops.initialize(props, null, HADOOP_PROPERTIES_METADATA);
+ try (SecureHadoopCatalogOperations ops = new
SecureHadoopCatalogOperations(store)) {
+ ops.initialize(props, randomCatalogInfo("m1", "c1"),
HADOOP_PROPERTIES_METADATA);
NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName,
name);
Map<String, String> filesetProps = Maps.newHashMap();
diff --git
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopUserAuthenticationIT.java
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopUserAuthenticationIT.java
index 5b79b5302..6c5fc1ae6 100644
---
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopUserAuthenticationIT.java
+++
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopUserAuthenticationIT.java
@@ -31,6 +31,8 @@ import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.gravitino.Catalog;
@@ -52,6 +54,7 @@ import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testcontainers.utility.MountableFile;
import sun.security.krb5.KrbException;
@Tag("gravitino-docker-test")
@@ -72,6 +75,15 @@ public class HadoopUserAuthenticationIT extends AbstractIT {
private static final String HADOOP_CLIENT_PRINCIPAL = "cli@HADOOPKRB";
private static final String HADOOP_CLIENT_KEYTAB = "/client.keytab";
+ private static final String HADOOP_SCHEMA_PRINCIPAL = "cli_schema";
+ private static final String HADOOP_FILESET_PRINCIPAL = "cli_fileset";
+
+ private static final String HADOOP_SCHEMA_KEYTAB = "/cli_schema.keytab";
+ private static final String HADOOP_FILESET_KEYTAB = "/cli_fileset.keytab";
+
+ private static final String REALM = "HADOOPKRB";
+ private static final String ADMIN_PASSWORD = "Admin12!";
+
private static String TMP_DIR;
private static String HDFS_URL;
@@ -127,6 +139,8 @@ public class HadoopUserAuthenticationIT extends AbstractIT {
.getContainer()
.copyFileFromContainer("/gravitino_client.keytab", TMP_DIR +
GRAVITINO_CLIENT_KEYTAB);
+ createKeyTableForSchemaAndFileset();
+
// Keytab of the Gravitino server
kerberosHiveContainer
.getContainer()
@@ -153,6 +167,59 @@ public class HadoopUserAuthenticationIT extends AbstractIT
{
System.setProperty("sun.security.krb5.debug", "true");
}
+ private static void createKeyTableForSchemaAndFileset() throws IOException {
+ String shellContent =
+ "echo -e \"%s\\n%s\" | kadmin.local -q \"addprinc %s@%s\""
+ + "\n"
+ + "kadmin.local -q \"xst -k /%s.keytab -norandkey %s@%s\"";
+
+ String createSchemaShellFile = String.format("/%s.sh",
HADOOP_SCHEMA_PRINCIPAL);
+ String createFileSetShellFile = String.format("/%s.sh",
HADOOP_FILESET_PRINCIPAL);
+
+ FileUtils.writeStringToFile(
+ Paths.get(TMP_DIR + createSchemaShellFile).toFile(),
+ String.format(
+ shellContent,
+ ADMIN_PASSWORD,
+ ADMIN_PASSWORD,
+ HADOOP_SCHEMA_PRINCIPAL,
+ REALM,
+ HADOOP_SCHEMA_PRINCIPAL,
+ HADOOP_SCHEMA_PRINCIPAL,
+ REALM),
+ StandardCharsets.UTF_8);
+ kerberosHiveContainer
+ .getContainer()
+ .copyFileToContainer(
+ MountableFile.forHostPath(TMP_DIR + createSchemaShellFile),
createSchemaShellFile);
+ kerberosHiveContainer.executeInContainer("bash", createSchemaShellFile);
+
+ FileUtils.writeStringToFile(
+ Paths.get(TMP_DIR + createFileSetShellFile).toFile(),
+ String.format(
+ shellContent,
+ ADMIN_PASSWORD,
+ ADMIN_PASSWORD,
+ HADOOP_FILESET_PRINCIPAL,
+ REALM,
+ HADOOP_FILESET_PRINCIPAL,
+ HADOOP_FILESET_PRINCIPAL,
+ REALM),
+ StandardCharsets.UTF_8);
+ kerberosHiveContainer
+ .getContainer()
+ .copyFileToContainer(
+ MountableFile.forHostPath(TMP_DIR + createFileSetShellFile),
createFileSetShellFile);
+ kerberosHiveContainer.executeInContainer("bash", createFileSetShellFile);
+
+ kerberosHiveContainer
+ .getContainer()
+ .copyFileFromContainer(HADOOP_SCHEMA_KEYTAB, TMP_DIR +
HADOOP_SCHEMA_KEYTAB);
+ kerberosHiveContainer
+ .getContainer()
+ .copyFileFromContainer(HADOOP_FILESET_KEYTAB, TMP_DIR +
HADOOP_FILESET_KEYTAB);
+ }
+
private static void addKerberosConfig() {
AbstractIT.customConfigs.put("gravitino.authenticator", "kerberos");
AbstractIT.customConfigs.put(
@@ -223,4 +290,256 @@ public class HadoopUserAuthenticationIT extends
AbstractIT {
catalog.asSchemas().dropSchema(SCHEMA_NAME, true);
}
+
+ @Test
+ void testCreateSchemaWithKerberos() {
+ KerberosTokenProvider provider =
+ KerberosTokenProvider.builder()
+ .withClientPrincipal(GRAVITINO_CLIENT_PRINCIPAL)
+ .withKeyTabFile(new File(TMP_DIR + GRAVITINO_CLIENT_KEYTAB))
+ .build();
+ adminClient =
GravitinoAdminClient.builder(serverUri).withKerberosAuth(provider).build();
+
+ String metalakeName = GravitinoITUtils.genRandomName("metalake");
+ String catalogName = GravitinoITUtils.genRandomName("catalog");
+ GravitinoMetalake gravitinoMetalake =
+ adminClient.createMetalake(metalakeName, null, ImmutableMap.of());
+
+ // Create a catalog
+ Map<String, String> properties = Maps.newHashMap();
+ String location = HDFS_URL + "/user/hadoop/" + catalogName;
+
+ properties.put(AUTH_TYPE_KEY, "kerberos");
+ properties.put(IMPERSONATION_ENABLE_KEY, "true");
+ properties.put(KEY_TAB_URI_KEY, TMP_DIR + HADOOP_CLIENT_KEYTAB);
+ properties.put(PRINCIPAL_KEY, HADOOP_CLIENT_PRINCIPAL);
+ properties.put("location", location);
+
+ kerberosHiveContainer.executeInContainer(
+ "hadoop", "fs", "-mkdir", "-p", "/user/hadoop/" + catalogName);
+
+ Catalog catalog =
+ gravitinoMetalake.createCatalog(
+ catalogName, Catalog.Type.FILESET, "hadoop", "comment",
properties);
+
+ // Test create schema
+ Exception exception =
+ Assertions.assertThrows(
+ Exception.class,
+ () -> catalog.asSchemas().createSchema(SCHEMA_NAME, "comment",
ImmutableMap.of()));
+ String exceptionMessage = Throwables.getStackTraceAsString(exception);
+ // Make sure real user is 'gravitino_client'
+ Assertions.assertTrue(
+ exceptionMessage.contains("Permission denied: user=gravitino_client,
access=WRITE"));
+
+ Map<String, String> schemaProperty = new HashMap<>();
+ schemaProperty.put(AUTH_TYPE_KEY, "kerberos");
+ // Disable impersonation here, so the user is the same as the principal
+ schemaProperty.put(IMPERSONATION_ENABLE_KEY, "false");
+ schemaProperty.put(KEY_TAB_URI_KEY, TMP_DIR + HADOOP_SCHEMA_KEYTAB);
+ schemaProperty.put(PRINCIPAL_KEY, HADOOP_SCHEMA_PRINCIPAL + "@" + REALM);
+
+ exception =
+ Assertions.assertThrows(
+ Exception.class,
+ () -> catalog.asSchemas().createSchema(SCHEMA_NAME, "comment",
schemaProperty));
+ exceptionMessage = Throwables.getStackTraceAsString(exception);
+ // Make sure real user is 'cli_schema'
+ Assertions.assertTrue(
+ exceptionMessage.contains("Permission denied: user=cli_schema,
access=WRITE"));
+
+ // enable user impersonation, so the real user is gravitino_client
+ schemaProperty.put(IMPERSONATION_ENABLE_KEY, "true");
+ exception =
+ Assertions.assertThrows(
+ Exception.class,
+ () -> catalog.asSchemas().createSchema(SCHEMA_NAME, "comment",
schemaProperty));
+ exceptionMessage = Throwables.getStackTraceAsString(exception);
+ // Make sure real user is 'gravitino_client' if user impersonation enabled.
+ Assertions.assertTrue(
+ exceptionMessage.contains("Permission denied: user=gravitino_client,
access=WRITE"));
+
+ // Now try to give the user the permission to create schema again
+ kerberosHiveContainer.executeInContainer(
+ "hadoop", "fs", "-chown", "gravitino_client", "/user/hadoop/" +
catalogName);
+ Assertions.assertDoesNotThrow(
+ () -> catalog.asSchemas().createSchema(SCHEMA_NAME, "comment",
schemaProperty));
+
+ // Disable impersonation here, so the user is the same as the principal
'cli_schema'
+ schemaProperty.put(IMPERSONATION_ENABLE_KEY, "false");
+ exception =
+ Assertions.assertThrows(
+ Exception.class,
+ () ->
+ catalog.asSchemas().createSchema(SCHEMA_NAME + "_new",
"comment", schemaProperty));
+ exceptionMessage = Throwables.getStackTraceAsString(exception);
+ Assertions.assertTrue(
+ exceptionMessage.contains("Permission denied: user=cli_schema,
access=WRITE"));
+
+ // END of test schema creation
+ Assertions.assertDoesNotThrow(() ->
catalog.asSchemas().dropSchema(SCHEMA_NAME, true));
+
+ kerberosHiveContainer.executeInContainer(
+ "hadoop", "fs", "-chown", "cli_schema", "/user/hadoop/" + catalogName);
+ Assertions.assertDoesNotThrow(
+ () -> catalog.asSchemas().createSchema(SCHEMA_NAME, "comment",
schemaProperty));
+
+ catalog
+ .asFilesetCatalog()
+ .createFileset(
+ NameIdentifier.of(SCHEMA_NAME, TABLE_NAME),
+ "comment",
+ Fileset.Type.MANAGED,
+ null,
+ ImmutableMap.of());
+ }
+
+ @Test
+ void createFilesetWithKerberos() {
+ KerberosTokenProvider provider =
+ KerberosTokenProvider.builder()
+ .withClientPrincipal(GRAVITINO_CLIENT_PRINCIPAL)
+ .withKeyTabFile(new File(TMP_DIR + GRAVITINO_CLIENT_KEYTAB))
+ .build();
+ adminClient =
GravitinoAdminClient.builder(serverUri).withKerberosAuth(provider).build();
+
+ String metalakeName = GravitinoITUtils.genRandomName("metalake");
+ String catalogName = GravitinoITUtils.genRandomName("catalog");
+ GravitinoMetalake gravitinoMetalake =
+ adminClient.createMetalake(metalakeName, null, ImmutableMap.of());
+
+ // Create a catalog
+ Map<String, String> properties = Maps.newHashMap();
+ String localtion = HDFS_URL + "/user/hadoop/" + catalogName;
+
+ properties.put(AUTH_TYPE_KEY, "kerberos");
+ properties.put(IMPERSONATION_ENABLE_KEY, "true");
+ properties.put(KEY_TAB_URI_KEY, TMP_DIR + HADOOP_CLIENT_KEYTAB);
+ properties.put(PRINCIPAL_KEY, HADOOP_CLIENT_PRINCIPAL);
+ properties.put("location", localtion);
+
+ kerberosHiveContainer.executeInContainer(
+ "hadoop", "fs", "-mkdir", "-p", "/user/hadoop/" + catalogName);
+
+ Catalog catalog =
+ gravitinoMetalake.createCatalog(
+ catalogName, Catalog.Type.FILESET, "hadoop", "comment",
properties);
+
+ Map<String, String> schemaProperty = new HashMap<>();
+ schemaProperty.put(AUTH_TYPE_KEY, "kerberos");
+ // Disable impersonation here, so the user is the same as the principal
'cli_schema'
+ schemaProperty.put(IMPERSONATION_ENABLE_KEY, "false");
+ schemaProperty.put(KEY_TAB_URI_KEY, TMP_DIR + HADOOP_SCHEMA_KEYTAB);
+ schemaProperty.put(PRINCIPAL_KEY, HADOOP_SCHEMA_PRINCIPAL + "@" + REALM);
+
+ kerberosHiveContainer.executeInContainer(
+ "hadoop", "fs", "-chown", "-R", "cli_schema", "/user/hadoop/" +
catalogName);
+ Assertions.assertDoesNotThrow(
+ () -> catalog.asSchemas().createSchema(SCHEMA_NAME, "comment",
schemaProperty));
+
+ catalog
+ .asFilesetCatalog()
+ .createFileset(
+ NameIdentifier.of(SCHEMA_NAME, TABLE_NAME),
+ "comment",
+ Fileset.Type.MANAGED,
+ null,
+ ImmutableMap.of());
+
+ Map<String, String> tableProperty = Maps.newHashMap();
+ tableProperty.put(AUTH_TYPE_KEY, "kerberos");
+ // Disable impersonation here, so the user is the same as the principal
'cli_fileset'
+ tableProperty.put(IMPERSONATION_ENABLE_KEY, "false");
+ tableProperty.put(KEY_TAB_URI_KEY, TMP_DIR + HADOOP_FILESET_KEYTAB);
+ tableProperty.put(PRINCIPAL_KEY, HADOOP_FILESET_PRINCIPAL + "@" + REALM);
+
+ String fileset1 = GravitinoITUtils.genRandomName("fileset1");
+ Exception exception =
+ Assertions.assertThrows(
+ Exception.class,
+ () ->
+ catalog
+ .asFilesetCatalog()
+ .createFileset(
+ NameIdentifier.of(SCHEMA_NAME, fileset1),
+ "comment",
+ Fileset.Type.MANAGED,
+ null,
+ tableProperty));
+ String exceptionMessage = Throwables.getStackTraceAsString(exception);
+ Assertions.assertTrue(
+ exceptionMessage.contains("Permission denied: user=cli_fileset,
access=WRITE"));
+
+ // Now change the owner of schema directory to 'cli_fileset'
+ kerberosHiveContainer.executeInContainer(
+ "hadoop", "fs", "-chown", "-R", "cli_fileset", "/user/hadoop/" +
catalogName);
+ Assertions.assertDoesNotThrow(
+ () ->
+ catalog
+ .asFilesetCatalog()
+ .createFileset(
+ NameIdentifier.of(SCHEMA_NAME, fileset1),
+ "comment",
+ Fileset.Type.MANAGED,
+ null,
+ tableProperty));
+
+ // enable user impersonation, so the real user is gravitino_client
+ tableProperty.put(IMPERSONATION_ENABLE_KEY, "true");
+ String fileset2 = GravitinoITUtils.genRandomName("fileset2");
+ exception =
+ Assertions.assertThrows(
+ Exception.class,
+ () ->
+ catalog
+ .asFilesetCatalog()
+ .createFileset(
+ NameIdentifier.of(SCHEMA_NAME, fileset2),
+ "comment",
+ Fileset.Type.MANAGED,
+ null,
+ tableProperty));
+ exceptionMessage = Throwables.getStackTraceAsString(exception);
+ Assertions.assertTrue(
+ exceptionMessage.contains("Permission denied: user=gravitino_client,
access=WRITE"));
+
+ // Now change the owner of schema directory to 'gravitino_client'
+ kerberosHiveContainer.executeInContainer(
+ "hadoop", "fs", "-chown", "-R", "gravitino_client", "/user/hadoop/" +
catalogName);
+ Assertions.assertDoesNotThrow(
+ () ->
+ catalog
+ .asFilesetCatalog()
+ .createFileset(
+ NameIdentifier.of(SCHEMA_NAME, fileset2),
+ "comment",
+ Fileset.Type.MANAGED,
+ null,
+ tableProperty));
+
+ // The schema directory is now owned by 'gravitino_client', while the user
dropping fileset1 is
+ // 'cli_fileset', so the drop should fail.
+ Assertions.assertThrows(
+ Exception.class,
+ () ->
catalog.asFilesetCatalog().dropFileset(NameIdentifier.of(SCHEMA_NAME,
fileset1)));
+ kerberosHiveContainer.executeInContainer(
+ "hadoop", "fs", "-chown", "-R", "cli_fileset", "/user/hadoop/" +
catalogName);
+ // Now try to drop fileset
+ Assertions.assertDoesNotThrow(
+ () ->
catalog.asFilesetCatalog().dropFileset(NameIdentifier.of(SCHEMA_NAME,
fileset1)));
+
+ Assertions.assertThrows(
+ Exception.class,
+ () ->
catalog.asFilesetCatalog().dropFileset(NameIdentifier.of(SCHEMA_NAME,
fileset2)));
+ kerberosHiveContainer.executeInContainer(
+ "hadoop", "fs", "-chown", "-R", "gravitino_client", "/user/hadoop/" +
catalogName);
+ Assertions.assertDoesNotThrow(
+ () ->
catalog.asFilesetCatalog().dropFileset(NameIdentifier.of(SCHEMA_NAME,
fileset2)));
+
+ Assertions.assertThrows(
+ Exception.class, () -> catalog.asSchemas().dropSchema(SCHEMA_NAME,
true));
+ kerberosHiveContainer.executeInContainer(
+ "hadoop", "fs", "-chown", "-R", "cli_schema", "/user/hadoop/" +
catalogName);
+ Assertions.assertDoesNotThrow(() ->
catalog.asSchemas().dropSchema(SCHEMA_NAME, true));
+ }
}
diff --git
a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveIT.java
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveIT.java
index a42043466..a2d2e9995 100644
---
a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveIT.java
+++
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveIT.java
@@ -344,7 +344,7 @@ public class CatalogHiveIT extends AbstractIT {
}
Assertions.assertTrue(fileStatuses.length > 0);
for (FileStatus fileStatus : fileStatuses) {
- Assertions.assertEquals("datastrato", fileStatus.getOwner());
+ Assertions.assertEquals("anonymous", fileStatus.getOwner());
}
}
diff --git a/clients/client-python/tests/integration/base_hadoop_env.py
b/clients/client-python/tests/integration/base_hadoop_env.py
index 9099ed13f..ea4bc2e6a 100644
--- a/clients/client-python/tests/integration/base_hadoop_env.py
+++ b/clients/client-python/tests/integration/base_hadoop_env.py
@@ -72,7 +72,7 @@ class BaseHadoopEnvironment:
@classmethod
def _configure_hadoop_environment(cls):
logger.info("Configure hadoop environment.")
- os.putenv("HADOOP_USER_NAME", "datastrato")
+ os.putenv("HADOOP_USER_NAME", "anonymous")
os.putenv("HADOOP_HOME",
f"{PYTHON_BUILD_PATH}/hadoop/hadoop-{HADOOP_VERSION}")
os.putenv(
"HADOOP_CONF_DIR",
diff --git a/clients/client-python/tests/integration/hdfs_container.py
b/clients/client-python/tests/integration/hdfs_container.py
index 1c3f1c21d..b9e1887fc 100644
--- a/clients/client-python/tests/integration/hdfs_container.py
+++ b/clients/client-python/tests/integration/hdfs_container.py
@@ -99,7 +99,7 @@ class HDFSContainer:
image=image_name,
name=self._container_name,
detach=True,
- environment={"HADOOP_USER_NAME": "datastrato"},
+ environment={"HADOOP_USER_NAME": "anonymous"},
network=self._network_name,
)
asyncio.run(check_hdfs_container_status(self._container))
diff --git a/clients/client-python/tests/integration/integration_test_env.py
b/clients/client-python/tests/integration/integration_test_env.py
index 8f1a5a2c8..7b8c05f53 100644
--- a/clients/client-python/tests/integration/integration_test_env.py
+++ b/clients/client-python/tests/integration/integration_test_env.py
@@ -149,7 +149,7 @@ class IntegrationTestEnv(unittest.TestCase):
# Restart Gravitino Server
env_vars = os.environ.copy()
- env_vars["HADOOP_USER_NAME"] = "datastrato"
+ env_vars["HADOOP_USER_NAME"] = "anonymous"
result = subprocess.run(
[gravitino_startup_script, "restart"],
env=env_vars,
diff --git a/core/src/main/java/org/apache/gravitino/utils/PrincipalUtils.java
b/core/src/main/java/org/apache/gravitino/utils/PrincipalUtils.java
index a4b834698..ee8b0a0ee 100644
--- a/core/src/main/java/org/apache/gravitino/utils/PrincipalUtils.java
+++ b/core/src/main/java/org/apache/gravitino/utils/PrincipalUtils.java
@@ -44,12 +44,14 @@ public class PrincipalUtils {
}
}
+ // This method can't be used in nested `Subject#doAs` block.
public static Principal getCurrentPrincipal() {
java.security.AccessControlContext context =
java.security.AccessController.getContext();
Subject subject = Subject.getSubject(context);
if (subject == null ||
subject.getPrincipals(UserPrincipal.class).isEmpty()) {
return new UserPrincipal(AuthConstants.ANONYMOUS_USER);
}
+
return subject.getPrincipals(UserPrincipal.class).iterator().next();
}
diff --git
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ContainerSuite.java
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ContainerSuite.java
index 4b3b5fdc4..4afaaeffd 100644
---
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ContainerSuite.java
+++
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ContainerSuite.java
@@ -99,7 +99,7 @@ public class ContainerSuite implements Closeable {
public void startHiveContainer() {
startHiveContainer(
- ImmutableMap.<String, String>builder().put("HADOOP_USER_NAME",
"datastrato").build());
+ ImmutableMap.<String, String>builder().put("HADOOP_USER_NAME",
"anonymous").build());
}
/**
@@ -158,7 +158,7 @@ public class ContainerSuite implements Closeable {
TrinoContainer.builder()
.withEnvVars(
ImmutableMap.<String, String>builder()
- .put("HADOOP_USER_NAME", "datastrato")
+ .put("HADOOP_USER_NAME", "anonymous")
.put("GRAVITINO_HOST_IP", "host.docker.internal")
.put("GRAVITINO_HOST_PORT",
String.valueOf(gravitinoServerPort))
.put("GRAVITINO_METALAKE_NAME", metalakeName)
diff --git a/integration-test/trino-it/docker-compose.yaml
b/integration-test/trino-it/docker-compose.yaml
index a4568b077..04f9264ed 100644
--- a/integration-test/trino-it/docker-compose.yaml
+++ b/integration-test/trino-it/docker-compose.yaml
@@ -26,7 +26,7 @@ services:
- trino-net
container_name: trino-ci-hive
environment:
- - HADOOP_USER_NAME=root
+ - HADOOP_USER_NAME=anonymous
entrypoint: /bin/bash /tmp/hive/init.sh
volumes:
- ./init/hive:/tmp/hive
@@ -86,7 +86,7 @@ services:
- trino-net
container_name: trino-ci-trino
environment:
- - HADOOP_USER_NAME=root
+ - HADOOP_USER_NAME=anonymous
- GRAVITINO_HOST_IP=host.docker.internal
- GRAVITINO_HOST_PORT=${GRAVITINO_SERVER_PORT:-8090}
- GRAVITINO_METALAKE_NAME=test
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
index fa1166511..a67fcd218 100644
---
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
@@ -192,7 +192,7 @@ public abstract class SparkCommonIT extends SparkEnvIT {
Map<String, String> databaseMeta = getDatabaseMetadata(testDatabaseName);
Assertions.assertFalse(databaseMeta.containsKey("Comment"));
Assertions.assertTrue(databaseMeta.containsKey("Location"));
- Assertions.assertEquals("datastrato", databaseMeta.get("Owner"));
+ Assertions.assertEquals("anonymous", databaseMeta.get("Owner"));
String properties = databaseMeta.get("Properties");
Assertions.assertTrue(properties.contains("(ID,001)"));
@@ -206,7 +206,7 @@ public abstract class SparkCommonIT extends SparkEnvIT {
databaseMeta = getDatabaseMetadata(testDatabaseName);
String comment = databaseMeta.get("Comment");
Assertions.assertEquals("comment", comment);
- Assertions.assertEquals("datastrato", databaseMeta.get("Owner"));
+ Assertions.assertEquals("anonymous", databaseMeta.get("Owner"));
// underlying catalog may change /tmp/t_create2 to file:/tmp/t_create2
Assertions.assertTrue(databaseMeta.get("Location").contains(testDatabaseLocation));
properties = databaseMeta.get("Properties");