FANNG1 commented on code in PR #4075: URL: https://github.com/apache/gravitino/pull/4075#discussion_r1666184132
########## catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/authentication/kerberos/KerberosClient.java: ########## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datastrato.gravitino.catalog.lakehouse.paimon.authentication.kerberos; + +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.util.KerberosName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KerberosClient { + private static final Logger LOG = LoggerFactory.getLogger(KerberosClient.class); + + public static final String GRAVITINO_KEYTAB_FORMAT = + "keytabs/gravitino-lakehouse-paimon-%s-keytab"; + + private final 
ScheduledThreadPoolExecutor checkTgtExecutor; + private final Map<String, String> conf; + private final Configuration hadoopConf; + + public KerberosClient(Map<String, String> conf, Configuration hadoopConf) { + this.conf = conf; + this.hadoopConf = hadoopConf; + this.checkTgtExecutor = new ScheduledThreadPoolExecutor(1, getThreadFactory("check-tgt")); + } + + public String login(String keytabFilePath) throws IOException { + KerberosConfig kerberosConfig = new KerberosConfig(conf); + + // Check the principal and keytab file + String catalogPrincipal = kerberosConfig.getPrincipalName(); + Preconditions.checkArgument( Review Comment: no need to check ########## catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/authentication/kerberos/KerberosClient.java: ########## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datastrato.gravitino.catalog.lakehouse.paimon.authentication.kerberos; + +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.util.KerberosName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KerberosClient { + private static final Logger LOG = LoggerFactory.getLogger(KerberosClient.class); + + public static final String GRAVITINO_KEYTAB_FORMAT = + "keytabs/gravitino-lakehouse-paimon-%s-keytab"; + + private final ScheduledThreadPoolExecutor checkTgtExecutor; + private final Map<String, String> conf; + private final Configuration hadoopConf; + + public KerberosClient(Map<String, String> conf, Configuration hadoopConf) { + this.conf = conf; + this.hadoopConf = hadoopConf; + this.checkTgtExecutor = new ScheduledThreadPoolExecutor(1, getThreadFactory("check-tgt")); + } + + public String login(String keytabFilePath) throws IOException { + KerberosConfig kerberosConfig = new KerberosConfig(conf); + + // Check the principal and keytab file + String catalogPrincipal = kerberosConfig.getPrincipalName(); + Preconditions.checkArgument( + StringUtils.isNotBlank(catalogPrincipal), "The principal can't be blank"); + @SuppressWarnings("null") + List<String> principalComponents = Splitter.on('@').splitToList(catalogPrincipal); + Preconditions.checkArgument( + principalComponents.size() == 2, "The principal has the wrong format"); + + // Login + UserGroupInformation.setConfiguration(hadoopConf); + 
KerberosName.resetDefaultRealm(); + UserGroupInformation.loginUserFromKeytab(catalogPrincipal, keytabFilePath); + UserGroupInformation kerberosLoginUgi = UserGroupInformation.getCurrentUser(); + + // Refresh the cache if it's out of date. + int checkInterval = kerberosConfig.getCheckIntervalSec(); + checkTgtExecutor.scheduleAtFixedRate( + () -> { + try { + kerberosLoginUgi.checkTGTAndReloginFromKeytab(); + } catch (Exception e) { + LOG.error("Fail to refresh ugi token: ", e); + } + }, + checkInterval, + checkInterval, + TimeUnit.SECONDS); + + return principalComponents.get(1); + } + + public File saveKeyTabFileFromUri(String catalogId) throws IOException { + + KerberosConfig kerberosConfig = new KerberosConfig(conf); + + String keyTabUri = kerberosConfig.getKeytab(); + Preconditions.checkArgument(StringUtils.isNotBlank(keyTabUri), "Keytab uri can't be blank"); Review Comment: no need to check? since `KerberosConfig` has already checked it ########## catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/authentication/kerberos/KerberosClient.java: ########## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datastrato.gravitino.catalog.lakehouse.paimon.authentication.kerberos; + +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.util.KerberosName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KerberosClient { + private static final Logger LOG = LoggerFactory.getLogger(KerberosClient.class); + + public static final String GRAVITINO_KEYTAB_FORMAT = + "keytabs/gravitino-lakehouse-paimon-%s-keytab"; + + private final ScheduledThreadPoolExecutor checkTgtExecutor; + private final Map<String, String> conf; + private final Configuration hadoopConf; + + public KerberosClient(Map<String, String> conf, Configuration hadoopConf) { + this.conf = conf; + this.hadoopConf = hadoopConf; + this.checkTgtExecutor = new ScheduledThreadPoolExecutor(1, getThreadFactory("check-tgt")); + } + + public String login(String keytabFilePath) throws IOException { + KerberosConfig kerberosConfig = new KerberosConfig(conf); + + // Check the principal and keytab file + String catalogPrincipal = kerberosConfig.getPrincipalName(); + Preconditions.checkArgument( + StringUtils.isNotBlank(catalogPrincipal), "The principal can't be blank"); + @SuppressWarnings("null") + List<String> principalComponents = Splitter.on('@').splitToList(catalogPrincipal); + Preconditions.checkArgument( + principalComponents.size() == 2, "The principal has the wrong format"); + + // Login + UserGroupInformation.setConfiguration(hadoopConf); + 
KerberosName.resetDefaultRealm(); + UserGroupInformation.loginUserFromKeytab(catalogPrincipal, keytabFilePath); + UserGroupInformation kerberosLoginUgi = UserGroupInformation.getCurrentUser(); + + // Refresh the cache if it's out of date. + int checkInterval = kerberosConfig.getCheckIntervalSec(); + checkTgtExecutor.scheduleAtFixedRate( + () -> { + try { + kerberosLoginUgi.checkTGTAndReloginFromKeytab(); + } catch (Exception e) { + LOG.error("Fail to refresh ugi token: ", e); + } + }, + checkInterval, + checkInterval, + TimeUnit.SECONDS); + + return principalComponents.get(1); + } + + public File saveKeyTabFileFromUri(String catalogId) throws IOException { + + KerberosConfig kerberosConfig = new KerberosConfig(conf); + + String keyTabUri = kerberosConfig.getKeytab(); + Preconditions.checkArgument(StringUtils.isNotBlank(keyTabUri), "Keytab uri can't be blank"); + // TODO: Support to download the file from Kerberos HDFS + Preconditions.checkArgument( Review Comment: why not support HDFS? I see HDFS is supported in `fetchFileFromUri` ########## catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/authentication/kerberos/KerberosClient.java: ########## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datastrato.gravitino.catalog.lakehouse.paimon.authentication.kerberos; + +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.util.KerberosName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KerberosClient { + private static final Logger LOG = LoggerFactory.getLogger(KerberosClient.class); + + public static final String GRAVITINO_KEYTAB_FORMAT = + "keytabs/gravitino-lakehouse-paimon-%s-keytab"; + + private final ScheduledThreadPoolExecutor checkTgtExecutor; + private final Map<String, String> conf; + private final Configuration hadoopConf; + + public KerberosClient(Map<String, String> conf, Configuration hadoopConf) { + this.conf = conf; + this.hadoopConf = hadoopConf; + this.checkTgtExecutor = new ScheduledThreadPoolExecutor(1, getThreadFactory("check-tgt")); + } + + public String login(String keytabFilePath) throws IOException { Review Comment: could you add java doc for this method, especially for the return value ########## catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonKerberosFilesystemIT.java: ########## @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datastrato.gravitino.catalog.lakehouse.paimon.integration.test; + +import static com.datastrato.gravitino.catalog.lakehouse.paimon.authentication.AuthenticationConfig.AUTH_TYPE_KEY; +import static com.datastrato.gravitino.catalog.lakehouse.paimon.authentication.kerberos.KerberosConfig.KEY_TAB_URI_KEY; +import static com.datastrato.gravitino.catalog.lakehouse.paimon.authentication.kerberos.KerberosConfig.PRINCIPAL_KEY; + +import com.datastrato.gravitino.Catalog; +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.catalog.lakehouse.paimon.PaimonCatalogPropertiesMetadata; +import com.datastrato.gravitino.client.GravitinoAdminClient; +import com.datastrato.gravitino.client.GravitinoMetalake; +import com.datastrato.gravitino.client.KerberosTokenProvider; +import com.datastrato.gravitino.integration.test.container.ContainerSuite; +import com.datastrato.gravitino.integration.test.container.HiveContainer; +import com.datastrato.gravitino.integration.test.util.AbstractIT; +import com.datastrato.gravitino.integration.test.util.GravitinoITUtils; +import com.datastrato.gravitino.rel.Column; +import com.datastrato.gravitino.rel.Table; +import com.datastrato.gravitino.rel.expressions.distributions.Distributions; +import 
com.datastrato.gravitino.rel.expressions.sorts.SortOrders; +import com.datastrato.gravitino.rel.expressions.transforms.Transforms; +import com.datastrato.gravitino.rel.types.Types; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import java.io.File; +import java.lang.reflect.Method; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.Map; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.util.KerberosName; +import org.apache.hadoop.security.authentication.util.KerberosUtil; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Tag("gravitino-docker-it") +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class CatalogPaimonKerberosFilesystemIT extends AbstractIT { + + private static final Logger LOG = + LoggerFactory.getLogger(CatalogPaimonKerberosFilesystemIT.class); + + private static final ContainerSuite containerSuite = ContainerSuite.getInstance(); + + private static final String SDK_KERBEROS_PRINCIPAL_KEY = "client.kerberos.principal"; + private static final String SDK_KERBEROS_KEYTAB_KEY = "client.kerberos.keytab"; + + private static final String GRAVITINO_CLIENT_PRINCIPAL = "gravitino_client@HADOOPKRB"; + private static final String GRAVITINO_CLIENT_KEYTAB = "/gravitino_client.keytab"; + + private static final String GRAVITINO_SERVER_PRINCIPAL = "HTTP/localhost@HADOOPKRB"; + private static final String GRAVITINO_SERVER_KEYTAB = "/gravitino_server.keytab"; + + private static final String HDFS_CLIENT_PRINCIPAL = "cli@HADOOPKRB"; + private static final String HDFS_CLIENT_KEYTAB = "/client.keytab"; 
+ + private static String TMP_DIR; + + private static HiveContainer kerberosHiveContainer; + + private static GravitinoAdminClient adminClient; + + private static final String CATALOG_NAME = GravitinoITUtils.genRandomName("test_catalog"); + private static final String SCHEMA_NAME = GravitinoITUtils.genRandomName("test_schema"); + private static final String TABLE_NAME = GravitinoITUtils.genRandomName("test_table"); + + private static String TYPE; + private static String WAREHOUSE; + + private static final String FILESYSTEM_COL_NAME1 = "col1"; + private static final String FILESYSTEM_COL_NAME2 = "col2"; + private static final String FILESYSTEM_COL_NAME3 = "col3"; + + @BeforeAll + public static void startIntegrationTest() { + containerSuite.startKerberosHiveContainer(); + kerberosHiveContainer = containerSuite.getKerberosHiveContainer(); + + TYPE = "filesystem"; + WAREHOUSE = + String.format( + "hdfs://%s:%d/user/hive/paimon_catalog_warehouse/", + kerberosHiveContainer.getContainerIpAddress(), HiveContainer.HDFS_DEFAULTFS_PORT); + + try { + File baseDir = new File(System.getProperty("java.io.tmpdir")); + File file = Files.createTempDirectory(baseDir.toPath(), "test").toFile(); + file.deleteOnExit(); + TMP_DIR = file.getAbsolutePath(); + + // Prepare kerberos related-config; + prepareKerberosConfig(); + + // Config kerberos configuration for Gravitino server + addKerberosConfig(); + + // Start Gravitino server + AbstractIT.startIntegrationTest(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @AfterAll + public static void stop() { + // Reset the UGI + UserGroupInformation.reset(); + + LOG.info("krb5 path: {}", System.getProperty("java.security.krb5.conf")); + // Clean up the kerberos configuration + System.clearProperty("java.security.krb5.conf"); + System.clearProperty("sun.security.krb5.debug"); + + AbstractIT.client = null; + } + + private static void prepareKerberosConfig() throws Exception { + // Keytab of the Gravitino SDK client + 
kerberosHiveContainer + .getContainer() + .copyFileFromContainer("/gravitino_client.keytab", TMP_DIR + GRAVITINO_CLIENT_KEYTAB); + + // Keytab of the Gravitino server + kerberosHiveContainer + .getContainer() + .copyFileFromContainer("/gravitino_server.keytab", TMP_DIR + GRAVITINO_SERVER_KEYTAB); + + // Keytab of Gravitino server to connector to Hive + kerberosHiveContainer + .getContainer() + .copyFileFromContainer("/etc/admin.keytab", TMP_DIR + HDFS_CLIENT_KEYTAB); + + String tmpKrb5Path = TMP_DIR + "/krb5.conf_tmp"; + String krb5Path = TMP_DIR + "/krb5.conf"; + kerberosHiveContainer.getContainer().copyFileFromContainer("/etc/krb5.conf", tmpKrb5Path); + + // Modify the krb5.conf and change the kdc and admin_server to the container IP + String ip = containerSuite.getKerberosHiveContainer().getContainerIpAddress(); + String content = FileUtils.readFileToString(new File(tmpKrb5Path), StandardCharsets.UTF_8); + content = content.replace("kdc = localhost:88", "kdc = " + ip + ":88"); + content = content.replace("admin_server = localhost", "admin_server = " + ip + ":749"); + FileUtils.write(new File(krb5Path), content, StandardCharsets.UTF_8); + + LOG.info("Kerberos kdc config:\n{}, path: {}", content, krb5Path); Review Comment: please remove `\n` ########## catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/utils/CatalogUtils.java: ########## @@ -46,17 +56,55 @@ public static Catalog loadCatalogBackend(PaimonConfig paimonConfig) { String metastore = paimonConfig.get(CATALOG_BACKEND); Preconditions.checkArgument( StringUtils.isNotBlank(metastore), "Paimon Catalog metastore can not be null or empty."); + String warehouse = paimonConfig.get(CATALOG_WAREHOUSE); Preconditions.checkArgument( StringUtils.isNotBlank(warehouse), "Paimon Catalog warehouse can not be null or empty."); + if (!PaimonCatalogBackend.FILESYSTEM.name().equalsIgnoreCase(metastore)) { String uri = paimonConfig.get(CATALOG_URI); Preconditions.checkArgument( - 
StringUtils.isNotBlank(uri), - String.format("Paimon Catalog uri can not be null or empty for %s.", metastore)); + StringUtils.isNotBlank(uri), "Paimon Catalog uri can not be null or empty."); } + + Map<String, String> allConfig = paimonConfig.getAllConfig(); + Configuration configuration = new Configuration(); + allConfig.forEach(configuration::set); + CatalogContext catalogContext = - CatalogContext.create(Options.fromMap(paimonConfig.getAllConfig())); - return CatalogFactory.createCatalog(catalogContext); + CatalogContext.create(Options.fromMap(paimonConfig.getAllConfig()), configuration); + + AuthenticationConfig authenticationConfig = new AuthenticationConfig(allConfig); + if (authenticationConfig.isSimpleAuth()) { + return CatalogFactory.createCatalog(catalogContext); + } else if (authenticationConfig.isKerberosAuth()) { + configuration.set(HADOOP_SECURITY_AUTHORIZATION, "true"); + configuration.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + + switch (PaimonCatalogBackend.valueOf(metastore.toUpperCase(Locale.ROOT))) { + case FILESYSTEM: + initKerberosAndReturnRealm(allConfig, configuration); Review Comment: could you explain how this affects the Paimon catalog, since `CatalogFactory.createCatalog(catalogContext)` is isolated? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
