Copilot commented on code in PR #8777: URL: https://github.com/apache/gravitino/pull/8777#discussion_r2568334039
########## clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/HDFSFileSystemProxy.java: ########## @@ -0,0 +1,178 @@ +package org.apache.gravitino.filesystem.hadoop; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import static org.apache.gravitino.catalog.hadoop.fs.Constants.AUTH_KERBEROS; +import static org.apache.gravitino.catalog.hadoop.fs.Constants.AUTH_SIMPlE; Review Comment: Typo in constant name: 'AUTH_SIMPlE' should be 'AUTH_SIMPLE' (lowercase 'l'). ```suggestion import static org.apache.gravitino.catalog.hadoop.fs.Constants.AUTH_SIMPLE; ``` ########## clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/BaseGVFSOperations.java: ########## @@ -762,6 +795,56 @@ protected FileSystem getActualFileSystemByLocationName( } } + /** + * Get user defined configurations for a specific location. Configuration format: + * + * <pre> + * Review Comment: The Javadoc for this method contains an empty `<pre>` block. 
This should either be removed or populated with a meaningful example showing the configuration format, such as: ``` <pre> fs.path.config.<location_name> = <base_uri> fs.path.config.<location_name>.<property_name> = <property_value> </pre> ``` ```suggestion * <pre> * fs.path.config.<location_name> = <base_uri> * fs.path.config.<location_name>.<property_name> = <property_value> * </pre> * * Example: * <pre> * fs.path.config.cluster1 = s3://bucket/path/ * fs.path.config.cluster1.aws-access-key = XXX * fs.path.config.cluster1.aws-secret-key = XXX ``` ########## clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/HDFSFileSystemProxy.java: ########## @@ -0,0 +1,178 @@ +package org.apache.gravitino.filesystem.hadoop; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import static org.apache.gravitino.catalog.hadoop.fs.Constants.AUTH_KERBEROS; +import static org.apache.gravitino.catalog.hadoop.fs.Constants.AUTH_SIMPlE; +import static org.apache.gravitino.catalog.hadoop.fs.Constants.FS_DISABLE_CACHE; +import static org.apache.gravitino.catalog.hadoop.fs.Constants.HADOOP_KRB5_CONF; +import static org.apache.gravitino.catalog.hadoop.fs.Constants.HADOOP_SECURITY_KEYTAB; +import static org.apache.gravitino.catalog.hadoop.fs.Constants.HADOOP_SECURITY_PRINCIPAL; +import static org.apache.gravitino.catalog.hadoop.fs.Constants.SECURITY_KRB5_ENV; +import static org.apache.gravitino.catalog.hadoop.fs.HDFSFileSystemProvider.IPC_FALLBACK_TO_SIMPLE_AUTH_ALLOWED; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.security.PrivilegedExceptionAction; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import net.sf.cglib.proxy.Enhancer; +import net.sf.cglib.proxy.MethodInterceptor; +import net.sf.cglib.proxy.MethodProxy; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.exceptions.GravitinoRuntimeException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A FileSystem wrapper that runs all operations under a specific UGI (UserGroupInformation). + * Supports both simple and Kerberos authentication, with automatic ticket renewal. 
+ */ +public class HDFSFileSystemProxy implements MethodInterceptor { + + private static final Logger LOG = LoggerFactory.getLogger(HDFSFileSystemProxy.class); + + private static final long DEFAULT_RENEW_INTERVAL_MS = 10 * 60 * 1000L; + private static final String SYSTEM_USER_NAME = System.getProperty("user.name"); + private static final String SYSTEM_ENV_HADOOP_USER_NAME = "HADOOP_USER_NAME"; + + private final UserGroupInformation ugi; + private final FileSystem fs; + private final Configuration configuration; + private ScheduledExecutorService kerberosRenewExecutor; + + /** + * Create a HDFSAuthenticationFileSystem with the given path and configuration. Supports both + * simple and Kerberos authentication, with automatic ticket renewal for Kerberos. + * + * @param path the HDFS path + * @param conf the Hadoop configuration + */ + public HDFSFileSystemProxy(Path path, Configuration conf) { + try { + conf.setBoolean(FS_DISABLE_CACHE, true); + conf.setBoolean(IPC_FALLBACK_TO_SIMPLE_AUTH_ALLOWED, true); + this.configuration = conf; + + String authType = conf.get(HADOOP_SECURITY_AUTHENTICATION, AUTH_SIMPlE); + if (AUTH_KERBEROS.equalsIgnoreCase(authType)) { + String krb5Config = conf.get(HADOOP_KRB5_CONF); + + if (krb5Config != null) { + System.setProperty(SECURITY_KRB5_ENV, krb5Config); + } + UserGroupInformation.setConfiguration(conf); + String principal = conf.get(HADOOP_SECURITY_PRINCIPAL, null); + String keytab = conf.get(HADOOP_SECURITY_KEYTAB, null); + + if (principal == null || keytab == null) { + throw new GravitinoRuntimeException( + "Kerberos principal and keytab must be provided for kerberos authentication"); + } + + this.ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab); + startKerberosRenewalTask(principal); + } else { + String userName = System.getenv(SYSTEM_ENV_HADOOP_USER_NAME); + if (StringUtils.isEmpty(userName)) { + userName = SYSTEM_USER_NAME; + } + this.ugi = UserGroupInformation.createRemoteUser(userName); + } + + 
this.fs = + ugi.doAs( + (PrivilegedExceptionAction<FileSystem>) + () -> FileSystem.newInstance(path.toUri(), conf)); + + } catch (Exception e) { + throw new GravitinoRuntimeException(e, "Failed to create HDFS FileSystem with UGI: %s", path); + } + } + + /** + * Get the proxied FileSystem instance. + * + * @return the proxied FileSystem + * @throws IOException if an I/O error occurs + */ + public FileSystem getProxy() throws IOException { + Enhancer e = new Enhancer(); + e.setClassLoader(fs.getClass().getClassLoader()); + e.setSuperclass(fs.getClass()); + e.setCallback(this); + FileSystem proxyFs = (FileSystem) e.create(); + fs.setConf(configuration); + return proxyFs; + } + + /** Schedule periodic Kerberos re-login to refresh TGT before expiry. */ + private void startKerberosRenewalTask(String principal) { + kerberosRenewExecutor = + Executors.newSingleThreadScheduledExecutor( + r -> { + Thread t = new Thread(r, "HDFSFileSystemProxy Kerberos-Renewal-Thread"); + t.setDaemon(true); + return t; + }); + + kerberosRenewExecutor.scheduleAtFixedRate( + () -> { + try { + if (ugi.hasKerberosCredentials()) { + ugi.checkTGTAndReloginFromKeytab(); + } + } catch (Exception e) { + LOG.error( + "[Kerberos] Failed to renew TGT for principal {}: {}", + principal, + e.getMessage(), + e); + } + }, + DEFAULT_RENEW_INTERVAL_MS, + DEFAULT_RENEW_INTERVAL_MS, + TimeUnit.MILLISECONDS); + } Review Comment: The KerberosRenewExecutor thread pool is started but never shut down. This could lead to resource leaks. Consider adding a close() or shutdown() method to properly clean up the executor service when the proxy is no longer needed, and ensure it's called when the FileSystem is closed. ########## docs/how-to-use-gvfs.md: ########## @@ -71,11 +71,44 @@ the path mapping and convert automatically. | `fs.gravitino.client.request.header.` | The configuration key prefix for the Gravitino client request header. You can set the request header for the Gravitino client. 
| (none) | No | 0.9.0-incubating | | `fs.gravitino.enableCredentialVending` | Whether to enable credential vending for the Gravitino Virtual File System. | `false` | No | 0.9.0-incubating | | `fs.gravitino.client.` | The configuration key prefix for the Gravitino client config. | (none) | No | 1.0.0 | -| `fs.gravitino.filesetMetadataCache.enable` | Whether to cache the fileset or fileset catalog metadata in the Gravitino Virtual File System. Note that this cache causes a side effect: if you modify the fileset or fileset catalog metadata, the client can not see the latest changes. | `false` | No | 1.0.0 | -| `fs.gravitino.autoCreateLocation` | The configuration key for whether to enable auto-creation of fileset location when the server-side filesystem ops are disabled and the location does not exist. | `true` | No | 1.1.0 | +| `fs.gravitino.filesetMetadataCache.enable` | Whether to cache the fileset ,fileset schema or fileset catalog metadata in the Gravitino Virtual File System. Note that this cache causes a side effect: if you modify the fileset or fileset catalog metadata, the client can not see the latest changes. | `false` | No | 1.0.0 | Review Comment: Misplaced comma spacing in "fileset ,fileset schema": remove the stray space before the comma and add one after it. Should be "fileset, fileset schema". ```suggestion | `fs.gravitino.filesetMetadataCache.enable` | Whether to cache the fileset, fileset schema or fileset catalog metadata in the Gravitino Virtual File System. Note that this cache causes a side effect: if you modify the fileset or fileset catalog metadata, the client can not see the latest changes. | `false` | No | 1.0.0 | ``` ########## clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/HDFSFileSystemProxy.java: ########## @@ -0,0 +1,178 @@ +package org.apache.gravitino.filesystem.hadoop; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ Review Comment: The license header is placed incorrectly. According to the Apache Gravitino style, the license header should be at the very top of the file, starting at line 1, not after the package declaration. The package statement should come after the license header. ########## docs/how-to-use-gvfs.md: ########## @@ -71,11 +71,44 @@ the path mapping and convert automatically. | `fs.gravitino.client.request.header.` | The configuration key prefix for the Gravitino client request header. You can set the request header for the Gravitino client. | (none) | No | 0.9.0-incubating | | `fs.gravitino.enableCredentialVending` | Whether to enable credential vending for the Gravitino Virtual File System. | `false` | No | 0.9.0-incubating | | `fs.gravitino.client.` | The configuration key prefix for the Gravitino client config. | (none) | No | 1.0.0 | -| `fs.gravitino.filesetMetadataCache.enable` | Whether to cache the fileset or fileset catalog metadata in the Gravitino Virtual File System. Note that this cache causes a side effect: if you modify the fileset or fileset catalog metadata, the client can not see the latest changes. 
| `false` | No | 1.0.0 | -| `fs.gravitino.autoCreateLocation` | The configuration key for whether to enable auto-creation of fileset location when the server-side filesystem ops are disabled and the location does not exist. | `true` | No | 1.1.0 | +| `fs.gravitino.filesetMetadataCache.enable` | Whether to cache the fileset ,fileset schema or fileset catalog metadata in the Gravitino Virtual File System. Note that this cache causes a side effect: if you modify the fileset or fileset catalog metadata, the client can not see the latest changes. | `false` | No | 1.0.0 | +| `fs.gravitino.autoCreateLocation` | The configuration key for whether to enable auto-creation of fileset location when the server-side filesystem ops are disabled and the location does not exist. | `true` | No | 1.1.0 | +| `fs.path.config.<name>` | Defines a logical location entry. Set `fs.path.config.<name>` to the real base URI (for example, `hdfs://cluster1/`). Any key that starts with the same prefix (such as `fs.path.config.<name>.config.resource`) is treated as a location-scoped property and will be forwarded to the underlying filesystem client. | (none) | No | 1.1.0 | To configure the Gravitino client, use properties prefixed with `fs.gravitino.client.`. These properties will be passed to the Gravitino client after removing the `fs.` prefix. +:::note +When users work with a multi-cluster fileset catalog, they can configure separate sets of properties for the base paths +of the different clusters. 
[Manage fileset with multiple clusters](./manage-fileset-metadata-using-gravitino.md#manage-fileset-with-multiple-clusters) + +For example, a complex catalog structure might look like this: + +```text +catalog1 -> hdfs://cluster1/catalog1 + schema1 -> hdfs://cluster1/catalog1/schema1 + fileset1 -> hdfs://cluster1/catalog1/schema1/fileset1 + fileset2 -> hdfs://cluster1/catalog1/schema1/fileset2 + schema2 -> hdfs://cluster2/tmp/schema2 + fileset3 -> hdfs://cluster2/tmp/schema2/fsd + fileset4 -> hdfs://cluster3/customers +``` + +In this case, users can configure different client properties for each base path: + +```text +fs.path.config.cluster1 = hdfs://cluster1/ +fs.path.config.cluster1.config.resource= /etc/core-site.xml,/ect/hdfs-site.xml + +fs.path.config.cluster2 = hdfs://cluster2/ +fs.path.config.cluster2.config.resource= /etc/fs2/core-site.xml,/ect/fs2/hdfs-site.xml Review Comment: Typo in path: '/ect/fs2/hdfs-site.xml' should be '/etc/fs2/hdfs-site.xml'. The same '/ect/' typo also appears in the cluster1 line above ('/ect/hdfs-site.xml' should be '/etc/hdfs-site.xml'); please fix that occurrence as well. ```suggestion fs.path.config.cluster2.config.resource= /etc/fs2/core-site.xml,/etc/fs2/hdfs-site.xml ``` ########## clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/TestFileset.java: ########## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.filesystem.hadoop.integration.test; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; + +public class TestFileset { + public static void main(String[] args) throws IOException { + f1(); + } + + public static void f1() throws IOException { + Configuration conf = new Configuration(); + conf.set("fs.AbstractFileSystem.gvfs.impl", "org.apache.gravitino.filesystem.hadoop.Gvfs"); + conf.set("fs.gvfs.impl", "org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem"); + conf.set("fs.gravitino.server.uri", "http://localhost:8090"); + conf.set("fs.gravitino.client.metalake", "test"); + // set the location name if you want to access a specific location + // conf.set("fs.gravitino.current.location.name","test_location_name"); + Path filesetPath1 = new Path("gvfs://fileset/fileset_catalog/test_schema/fs1"); + Path filesetPath2 = new Path("gvfs://fileset/fileset_catalog/test_schema/fs2"); + FileSystem fs = filesetPath1.getFileSystem(conf); + fs.create(new Path(filesetPath1 + "/file1.txt")).close(); + fs.create(new Path(filesetPath2 + "/file1.txt")).close(); + fs.create(new Path(filesetPath1 + "/file2.txt")).close(); + fs.create(new Path(filesetPath2 + "/file2.txt")).close(); + + System.out.println(filesetPath1 + ":"); + RemoteIterator<LocatedFileStatus> files = fs.listFiles(filesetPath1, false); + while (files.hasNext()) { + LocatedFileStatus f = files.next(); + System.out.println(f.getPath()); + } + + System.out.println(); + System.out.println(filesetPath2 + ":"); + FSDataOutputStream fsDataOutputStream = fs.create(new Path(filesetPath2 + "/file1.txt")); + fsDataOutputStream.close(); + + 
files = fs.listFiles(filesetPath2, false); + while (files.hasNext()) { + LocatedFileStatus f = files.next(); + System.out.println(f.getPath()); + } + + fs.delete(new Path(filesetPath2 + "/file1.txt"), true); + } + + public static void f2() throws IOException { + Configuration conf = new Configuration(); + conf.set("fs.AbstractFileSystem.gvfs.impl", "org.apache.gravitino.filesystem.hadoop.Gvfs"); + conf.set("fs.gvfs.impl", "org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem"); + conf.set("fs.gravitino.server.uri", "http://localhost:8090"); + conf.set("fs.gravitino.client.metalake", "test"); + // set the location name if you want to access a specific location + // conf.set("fs.gravitino.current.location.name","test_location_name"); + Path filesetPath1 = new Path("gvfs://fileset/fileset_catalog/test_schema/fs6"); + Path filesetPath2 = new Path("gvfs://fileset/fileset_catalog/test_schema/fs7"); + FileSystem fs = filesetPath1.getFileSystem(conf); + fs.create(new Path(filesetPath1 + "/file1.txt")).close(); + fs.create(new Path(filesetPath1 + "/file1.txt")).close(); + // fs.delete(new Path(filesetPath1 + "/file1.txt"), true); + fs.create(new Path(filesetPath2 + "/file2.txt")).close(); + fs.create(new Path(filesetPath2 + "/file2.txt")).close(); + } +} Review Comment: This appears to be a manual test file rather than an automated test. According to Apache Gravitino testing standards, all code changes should include proper automated tests. This file should either be: 1. Converted to a proper JUnit test class with @Test annotations, or 2. Removed if it's only intended as temporary/exploratory code. Manual test code should not be committed to the repository. ########## clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GvfsMultipleClusterIT.java: ########## @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.filesystem.hadoop.integration.test; + +import static org.apache.gravitino.file.Fileset.LOCATION_NAME_UNKNOWN; +import static org.apache.gravitino.file.Fileset.PROPERTY_DEFAULT_LOCATION_NAME; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Method; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.Collections; +import java.util.Map; +import org.apache.commons.io.FileUtils; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.client.GravitinoMetalake; +import org.apache.gravitino.file.Fileset; +import org.apache.gravitino.integration.test.container.ContainerSuite; +import org.apache.gravitino.integration.test.container.HiveContainer; +import org.apache.gravitino.integration.test.util.BaseIT; +import org.apache.gravitino.integration.test.util.GravitinoITUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.authentication.util.KerberosName; +import org.apache.hadoop.security.authentication.util.KerberosUtil; +import 
org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledIf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Tag("gravitino-docker-test") +@DisabledIf("org.apache.gravitino.integration.test.util.ITUtils#isEmbedded") +public class GvfsMultipleClusterIT extends BaseIT { + private static final Logger LOG = LoggerFactory.getLogger(GvfsMultipleClusterIT.class); + protected static final ContainerSuite containerSuite = ContainerSuite.getInstance(); + protected HiveContainer hiveContainer; + protected HiveContainer kerberosHiveContainer; + + protected String metalakeName = GravitinoITUtils.genRandomName("gvfs_it_metalake"); + protected String catalogName = GravitinoITUtils.genRandomName("catalog"); + protected String schemaName = GravitinoITUtils.genRandomName("schema"); + protected GravitinoMetalake metalake; + protected Configuration conf = new Configuration(); + protected Map<String, String> properties = Maps.newHashMap(); + protected String configResourcesPath; + + @BeforeAll + public void startUp() throws Exception { + containerSuite.startHiveContainer(); + hiveContainer = containerSuite.getHiveContainer(); + + containerSuite.startKerberosHiveContainer(); + kerberosHiveContainer = containerSuite.getKerberosHiveContainer(); + + setupKerberosEnv(); + + Assertions.assertFalse(client.metalakeExists(metalakeName)); + metalake = client.createMetalake(metalakeName, "metalake comment", Collections.emptyMap()); + Assertions.assertTrue(client.metalakeExists(metalakeName)); + + Catalog catalog = + metalake.createCatalog( + catalogName, Catalog.Type.FILESET, "hadoop", "catalog comment", properties); + Assertions.assertTrue(metalake.catalogExists(catalogName)); + + catalog.asSchemas().createSchema(schemaName, "schema comment", properties); + 
Assertions.assertTrue(catalog.asSchemas().schemaExists(schemaName)); + + conf.set("fs.gvfs.impl", "org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem"); + conf.set("fs.AbstractFileSystem.gvfs.impl", "org.apache.gravitino.filesystem.hadoop.Gvfs"); + conf.set("fs.gvfs.impl.disable.cache", "true"); + conf.set("fs.gravitino.server.uri", serverUri); + conf.set("fs.gravitino.client.metalake", metalakeName); + } + + private void setupKerberosEnv() throws Exception { + File baseDir = new File(System.getProperty("java.io.tmpdir")); + File file = Files.createTempDirectory(baseDir.toPath(), "test").toFile(); + file.deleteOnExit(); + String tmpDir = file.getAbsolutePath(); + this.configResourcesPath = tmpDir; + + // Keytab of the Gravitino SDK client + String keytabPath = tmpDir + "/admin.keytab"; + kerberosHiveContainer.getContainer().copyFileFromContainer("/etc/admin.keytab", keytabPath); + + String tmpKrb5Path = tmpDir + "/krb5.conf_tmp"; + String krb5Path = tmpDir + "/krb5.conf"; + kerberosHiveContainer.getContainer().copyFileFromContainer("/etc/krb5.conf", tmpKrb5Path); + + // Modify the krb5.conf and change the kdc and admin_server to the container IP + String ip = containerSuite.getKerberosHiveContainer().getContainerIpAddress(); + String content = FileUtils.readFileToString(new File(tmpKrb5Path), StandardCharsets.UTF_8); + content = content.replace("kdc = localhost:88", "kdc = " + ip + ":88"); + content = content.replace("admin_server = localhost", "admin_server = " + ip + ":749"); + FileUtils.write(new File(krb5Path), content, StandardCharsets.UTF_8); + + LOG.info("Kerberos kdc config:\n{}, path: {}", content, krb5Path); + System.setProperty("java.security.krb5.conf", krb5Path); + System.setProperty("sun.security.krb5.debug", "true"); + + // create hdfs-site.xml and core-site.xml + // read hdfs-site.xml from resources + String hdfsSiteXml = readResourceFile("hd_kbs_conf/hdfs-site.xml"); + FileUtils.write(new File(tmpDir + "/hdfs-site.xml"), hdfsSiteXml, 
StandardCharsets.UTF_8); + String coreSiteXml = readResourceFile("hd_kbs_conf/core-site.xml"); + coreSiteXml = coreSiteXml.replace("XXX_KEYTAB_XXX", keytabPath); + coreSiteXml = coreSiteXml.replace("XXX_KRB_CONF_XXX", krb5Path); + FileUtils.write(new File(tmpDir + "/core-site.xml"), coreSiteXml, StandardCharsets.UTF_8); + + LOG.info("Kerberos config resources created in {}", tmpDir); + refreshKerberosConfig(); + KerberosName.resetDefaultRealm(); + + LOG.info("Kerberos default realm: {}", KerberosUtil.getDefaultRealm()); + } + + private static String readResourceFile(String resourcePath) throws IOException { + return new String( + GvfsMultipleClusterIT.class + .getClassLoader() + .getResourceAsStream(resourcePath) + .readAllBytes(), + StandardCharsets.UTF_8); + } + + private static void refreshKerberosConfig() { + Class<?> classRef; + try { + if (System.getProperty("java.vendor").contains("IBM")) { + classRef = Class.forName("com.ibm.security.krb5.internal.Config"); + } else { + classRef = Class.forName("sun.security.krb5.Config"); + } + + Method refershMethod = classRef.getMethod("refresh"); + refershMethod.invoke(null); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @AfterAll + public void tearDown() throws IOException { + if (metalake == null) { + return; + } + + Catalog catalog = metalake.loadCatalog(catalogName); + catalog.asSchemas().dropSchema(schemaName, true); + metalake.dropCatalog(catalogName, true); + client.dropMetalake(metalakeName, true); + + if (client != null) { + client.close(); + client = null; + } + + try { + closer.close(); + } catch (Exception e) { + LOG.error("Exception in closing CloseableGroup", e); + } + } + + protected Path genGvfsPath(String fileset) { + return new Path(String.format("gvfs://fileset/%s/%s/%s", catalogName, schemaName, fileset)); + } + + private String baseHdfsPath(String ip, String filesetName) { + return String.format( + "hdfs://%s:%d/tmp/%s/%s/%s", + ip, HiveContainer.HDFS_DEFAULTFS_PORT, 
catalogName, schemaName, filesetName); + } + + @Test + public void testFsOperation() throws IOException { + // create a fileset with normal cluster + String normalFilesetName = GravitinoITUtils.genRandomName("fileset_normal"); + NameIdentifier normalFilesetIdent = NameIdentifier.of(schemaName, normalFilesetName); + Catalog catalog = metalake.loadCatalog(catalogName); + String location = baseHdfsPath(hiveContainer.getContainerIpAddress(), normalFilesetName); + catalog + .asFilesetCatalog() + .createMultipleLocationFileset( + normalFilesetIdent, + "fileset comment", + Fileset.Type.MANAGED, + ImmutableMap.of(LOCATION_NAME_UNKNOWN, location), + ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, LOCATION_NAME_UNKNOWN)); + Assertions.assertTrue(catalog.asFilesetCatalog().filesetExists(normalFilesetIdent)); + + // create a fileset with kerberos cluster + String kerberosFilesetName = GravitinoITUtils.genRandomName("fileset_kerberos"); + NameIdentifier kerberosFilesetIdent = NameIdentifier.of(schemaName, kerberosFilesetName); + location = baseHdfsPath(kerberosHiveContainer.getContainerIpAddress(), kerberosFilesetName); + String configResources = + configResourcesPath + "/core-site.xml," + configResourcesPath + "/hdfs-site.xml"; + catalog + .asFilesetCatalog() + .createMultipleLocationFileset( + kerberosFilesetIdent, + "fileset comment", + Fileset.Type.MANAGED, + ImmutableMap.of(LOCATION_NAME_UNKNOWN, location), + ImmutableMap.of( + PROPERTY_DEFAULT_LOCATION_NAME, + LOCATION_NAME_UNKNOWN, + "gravitino.bypass.dfs.namenode.kerberos.principal.pattern", + "*", + "config.resources", + configResources)); + Assertions.assertTrue(catalog.asFilesetCatalog().filesetExists(kerberosFilesetIdent)); + + Path normalGvfsPath = genGvfsPath(normalFilesetName); + Path kerberosGvfsPath = genGvfsPath(kerberosFilesetName); + try (FileSystem gvfs = normalGvfsPath.getFileSystem(conf)) { + if (!gvfs.exists(normalGvfsPath)) { + gvfs.mkdirs(normalGvfsPath); + } + if (!gvfs.exists(kerberosGvfsPath)) { + 
gvfs.mkdirs(kerberosGvfsPath); + } + + gvfs.create(new Path(normalGvfsPath + "/file1.txt")).close(); + gvfs.create(new Path(kerberosGvfsPath + "/file1.txt")).close(); + gvfs.create(new Path(normalGvfsPath + "/file2.txt")).close(); + gvfs.create(new Path(kerberosGvfsPath + "/file2.txt")).close(); + } + + // catalog.asFilesetCatalog().dropFileset(normalFilesetIdent); + // catalog.asFilesetCatalog().dropFileset(kerberosFilesetIdent); Review Comment: Commented-out code should be removed. The test should properly clean up by dropping the created filesets in the tearDown method, or these lines should be uncommented if they're needed for cleanup. ```suggestion catalog.asFilesetCatalog().dropFileset(normalFilesetIdent); catalog.asFilesetCatalog().dropFileset(kerberosFilesetIdent); ``` ########## catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/Constants.java: ########## @@ -27,4 +27,23 @@ public class Constants { // Name of the built-in HDFS file system provider public static final String BUILTIN_HDFS_FS_PROVIDER = "builtin-hdfs"; + + // Name of the configuration property for HDFS config resources + public static final String CONFIG_RESOURCES = "config.resources"; + // Name of the configuration property to disable HDFS FileSystem cache + public static final String FS_DISABLE_CACHE = "fs.hdfs.impl.disable.cache"; + // Name of the configuration property for Kerberos principal + public static final String HADOOP_SECURITY_PRINCIPAL = + "hadoop.security.authentication.kerberos.principal"; + // Name of the configuration property for Kerberos keytab + public static final String HADOOP_SECURITY_KEYTAB = + "hadoop.security.authentication.kerberos.keytab"; + // Name of the configuration property for Kerberos krb5.conf location + public static final String HADOOP_KRB5_CONF = "hadoop.security.authentication.kerberos.krb5.conf"; + // Environment variable for Java Kerberos configuration + public static final String SECURITY_KRB5_ENV = 
"java.security.krb5.conf"; + // Supported authentication types + public static final String AUTH_KERBEROS = "kerberos"; + // Simple authentication type + public static final String AUTH_SIMPlE = "simple"; Review Comment: Typo in constant name: 'AUTH_SIMPlE' should be 'AUTH_SIMPLE' (lowercase 'l'). ```suggestion public static final String AUTH_SIMPLE = "simple"; ``` ########## clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GvfsMultipleClusterIT.java: ########## @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.gravitino.filesystem.hadoop.integration.test; + +import static org.apache.gravitino.file.Fileset.LOCATION_NAME_UNKNOWN; +import static org.apache.gravitino.file.Fileset.PROPERTY_DEFAULT_LOCATION_NAME; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Method; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.Collections; +import java.util.Map; +import org.apache.commons.io.FileUtils; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.client.GravitinoMetalake; +import org.apache.gravitino.file.Fileset; +import org.apache.gravitino.integration.test.container.ContainerSuite; +import org.apache.gravitino.integration.test.container.HiveContainer; +import org.apache.gravitino.integration.test.util.BaseIT; +import org.apache.gravitino.integration.test.util.GravitinoITUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.authentication.util.KerberosName; +import org.apache.hadoop.security.authentication.util.KerberosUtil; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledIf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Tag("gravitino-docker-test") +@DisabledIf("org.apache.gravitino.integration.test.util.ITUtils#isEmbedded") +public class GvfsMultipleClusterIT extends BaseIT { + private static final Logger LOG = LoggerFactory.getLogger(GvfsMultipleClusterIT.class); + protected static final ContainerSuite containerSuite = ContainerSuite.getInstance(); + protected HiveContainer hiveContainer; + protected HiveContainer 
kerberosHiveContainer; + + protected String metalakeName = GravitinoITUtils.genRandomName("gvfs_it_metalake"); + protected String catalogName = GravitinoITUtils.genRandomName("catalog"); + protected String schemaName = GravitinoITUtils.genRandomName("schema"); + protected GravitinoMetalake metalake; + protected Configuration conf = new Configuration(); + protected Map<String, String> properties = Maps.newHashMap(); + protected String configResourcesPath; + + @BeforeAll + public void startUp() throws Exception { + containerSuite.startHiveContainer(); + hiveContainer = containerSuite.getHiveContainer(); + + containerSuite.startKerberosHiveContainer(); + kerberosHiveContainer = containerSuite.getKerberosHiveContainer(); + + setupKerberosEnv(); + + Assertions.assertFalse(client.metalakeExists(metalakeName)); + metalake = client.createMetalake(metalakeName, "metalake comment", Collections.emptyMap()); + Assertions.assertTrue(client.metalakeExists(metalakeName)); + + Catalog catalog = + metalake.createCatalog( + catalogName, Catalog.Type.FILESET, "hadoop", "catalog comment", properties); + Assertions.assertTrue(metalake.catalogExists(catalogName)); + + catalog.asSchemas().createSchema(schemaName, "schema comment", properties); + Assertions.assertTrue(catalog.asSchemas().schemaExists(schemaName)); + + conf.set("fs.gvfs.impl", "org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem"); + conf.set("fs.AbstractFileSystem.gvfs.impl", "org.apache.gravitino.filesystem.hadoop.Gvfs"); + conf.set("fs.gvfs.impl.disable.cache", "true"); + conf.set("fs.gravitino.server.uri", serverUri); + conf.set("fs.gravitino.client.metalake", metalakeName); + } + + private void setupKerberosEnv() throws Exception { + File baseDir = new File(System.getProperty("java.io.tmpdir")); + File file = Files.createTempDirectory(baseDir.toPath(), "test").toFile(); + file.deleteOnExit(); + String tmpDir = file.getAbsolutePath(); + this.configResourcesPath = tmpDir; + + // Keytab of the Gravitino SDK 
client + String keytabPath = tmpDir + "/admin.keytab"; + kerberosHiveContainer.getContainer().copyFileFromContainer("/etc/admin.keytab", keytabPath); + + String tmpKrb5Path = tmpDir + "/krb5.conf_tmp"; + String krb5Path = tmpDir + "/krb5.conf"; + kerberosHiveContainer.getContainer().copyFileFromContainer("/etc/krb5.conf", tmpKrb5Path); + + // Modify the krb5.conf and change the kdc and admin_server to the container IP + String ip = containerSuite.getKerberosHiveContainer().getContainerIpAddress(); + String content = FileUtils.readFileToString(new File(tmpKrb5Path), StandardCharsets.UTF_8); + content = content.replace("kdc = localhost:88", "kdc = " + ip + ":88"); + content = content.replace("admin_server = localhost", "admin_server = " + ip + ":749"); + FileUtils.write(new File(krb5Path), content, StandardCharsets.UTF_8); + + LOG.info("Kerberos kdc config:\n{}, path: {}", content, krb5Path); + System.setProperty("java.security.krb5.conf", krb5Path); + System.setProperty("sun.security.krb5.debug", "true"); + + // create hdfs-site.xml and core-site.xml + // read hdfs-site.xml from resources + String hdfsSiteXml = readResourceFile("hd_kbs_conf/hdfs-site.xml"); + FileUtils.write(new File(tmpDir + "/hdfs-site.xml"), hdfsSiteXml, StandardCharsets.UTF_8); + String coreSiteXml = readResourceFile("hd_kbs_conf/core-site.xml"); + coreSiteXml = coreSiteXml.replace("XXX_KEYTAB_XXX", keytabPath); + coreSiteXml = coreSiteXml.replace("XXX_KRB_CONF_XXX", krb5Path); + FileUtils.write(new File(tmpDir + "/core-site.xml"), coreSiteXml, StandardCharsets.UTF_8); + + LOG.info("Kerberos config resources created in {}", tmpDir); + refreshKerberosConfig(); + KerberosName.resetDefaultRealm(); + + LOG.info("Kerberos default realm: {}", KerberosUtil.getDefaultRealm()); + } + + private static String readResourceFile(String resourcePath) throws IOException { + return new String( + GvfsMultipleClusterIT.class + .getClassLoader() + .getResourceAsStream(resourcePath) + .readAllBytes(), + 
StandardCharsets.UTF_8); + } + + private static void refreshKerberosConfig() { + Class<?> classRef; + try { + if (System.getProperty("java.vendor").contains("IBM")) { + classRef = Class.forName("com.ibm.security.krb5.internal.Config"); + } else { + classRef = Class.forName("sun.security.krb5.Config"); + } + + Method refershMethod = classRef.getMethod("refresh"); + refershMethod.invoke(null); Review Comment: Typo in variable name: 'refershMethod' should be 'refreshMethod'. ```suggestion Method refreshMethod = classRef.getMethod("refresh"); refreshMethod.invoke(null); ``` ########## docs/how-to-use-gvfs.md: ########## @@ -71,11 +71,44 @@ the path mapping and convert automatically. | `fs.gravitino.client.request.header.` | The configuration key prefix for the Gravitino client request header. You can set the request header for the Gravitino client. | (none) | No | 0.9.0-incubating | | `fs.gravitino.enableCredentialVending` | Whether to enable credential vending for the Gravitino Virtual File System. | `false` | No | 0.9.0-incubating | | `fs.gravitino.client.` | The configuration key prefix for the Gravitino client config. | (none) | No | 1.0.0 | -| `fs.gravitino.filesetMetadataCache.enable` | Whether to cache the fileset or fileset catalog metadata in the Gravitino Virtual File System. Note that this cache causes a side effect: if you modify the fileset or fileset catalog metadata, the client can not see the latest changes. | `false` | No | 1.0.0 | -| `fs.gravitino.autoCreateLocation` | The configuration key for whether to enable auto-creation of fileset location when the server-side filesystem ops are disabled and the location does not exist. | `true` | No | 1.1.0 | +| `fs.gravitino.filesetMetadataCache.enable` | Whether to cache the fileset, fileset schema, or fileset catalog metadata in the Gravitino Virtual File System. Note that this cache causes a side effect: if you modify the fileset or fileset catalog metadata, the client can not see the latest changes.
| `false` | No | 1.0.0 | +| `fs.gravitino.autoCreateLocation` | The configuration key for whether to enable auto-creation of fileset location when the server-side filesystem ops are disabled and the location does not exist. | `true` | No | 1.1.0 | +| `fs.path.config.<name>` | Defines a logical location entry. Set `fs.path.config.<name>` to the real base URI (for example, `hdfs://cluster1/`). Any key that starts with the same prefix (such as `fs.path.config.<name>.config.resource`) is treated as a location-scoped property and will be forwarded to the underlying filesystem client. | (none) | No | 1.1.0 | To configure the Gravitino client, use properties prefixed with `fs.gravitino.client.`. These properties will be passed to the Gravitino client after removing the `fs.` prefix. +:::note +When users work with a multi-cluster fileset catalog, they can configure separate sets of properties for the base paths +of the different clusters. [Manage fileset with multiple clusters](./manage-fileset-metadata-using-gravitino.md#manage-fileset-with-multiple-clusters) + +For example, a complex catalog structure might look like this: + +```text +catalog1 -> hdfs://cluster1/catalog1 + schema1 -> hdfs://cluster1/catalog1/schema1 + fileset1 -> hdfs://cluster1/catalog1/schema1/fileset1 + fileset2 -> hdfs://cluster1/catalog1/schema1/fileset2 + schema2 -> hdfs://cluster2/tmp/schema2 + fileset3 -> hdfs://cluster2/tmp/schema2/fsd + fileset4 -> hdfs://cluster3/customers +``` + +In this case, users can configure different client properties for each base path: + +```text +fs.path.config.cluster1 = hdfs://cluster1/ +fs.path.config.cluster1.config.resource= /etc/core-site.xml,/ect/hdfs-site.xml Review Comment: Typo in path: '/ect/hdfs-site.xml' should be '/etc/hdfs-site.xml'. 
```suggestion fs.path.config.cluster1.config.resource= /etc/core-site.xml,/etc/hdfs-site.xml ``` ########## catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/SecureFilesetCatalogOperations.java: ########## @@ -157,21 +153,42 @@ public boolean dropFileset(NameIdentifier ident) { throw new RuntimeException("Failed to delete fileset " + ident, ioe); } + Map<String, String> filesetProperties = + mergeUpLevelConfigurations(ident, filesetEntity.properties()); UserContext userContext = UserContext.getUserContext( - ident, filesetEntity.properties(), null, filesetCatalogOperations.getCatalogInfo()); + ident, filesetProperties, filesetCatalogOperations.getCatalogInfo()); boolean r = userContext.doAs(() -> filesetCatalogOperations.dropFileset(ident), ident); UserContext.clearUserContext(ident); return r; } + public Map<String, String> mergeUpLevelConfigurations( + NameIdentifier ident, Map<String, String> entityProperties) { + Map<String, String> mergedProperties = new HashMap<>(filesetCatalogOperations.getConf()); + if (ident.namespace().levels().length == 2) { + // schema level + mergedProperties.putAll(entityProperties); + return mergedProperties; + } + + // fileset level + NameIdentifierUtil.checkFileset(ident); + NameIdentifier schemaIdent = NameIdentifierUtil.getSchemaIdentifier(ident); + Schema schema = filesetCatalogOperations.loadSchema(schemaIdent); + mergedProperties.putAll(schema.properties()); + mergedProperties.putAll(entityProperties); + return mergedProperties; + } Review Comment: The `mergeUpLevelConfigurations` method loads the schema on every fileset operation (line 178). For operations like `dropFileset` that are called frequently, this could lead to performance issues as it makes an additional database/storage call. Consider caching the schema properties or passing them as a parameter if they're already available in the calling context. 
########## integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/HiveContainer.java: ########## @@ -154,8 +154,8 @@ protected boolean checkContainerStatus(int retryLimit) { final String showDatabaseSQL = "show databases"; await() - .atMost(30, TimeUnit.SECONDS) - .pollInterval(30 / retryLimit, TimeUnit.SECONDS) + .atMost(120, TimeUnit.SECONDS) + .pollInterval(120 / retryLimit, TimeUnit.SECONDS) Review Comment: The timeout increase from 30 to 120 seconds (4x increase) is quite significant. Consider adding a comment explaining why this change was necessary, especially since this affects all tests using HiveContainer. This could indicate an underlying performance issue that should be investigated or documented. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
