This is an automated email from the ASF dual-hosted git repository. lwz9103 pushed a commit to branch liquid in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
commit 47022c38d7148de4b591e03110ae3780b19b3030
Author: Wenzheng Liu <[email protected]>
AuthorDate: Tue Jul 16 16:02:00 2024 +0800

    [CH] Fix kerberos connect hdfs error (#22)

    - 240906 Fix conflict due to https://github.com/apache/incubator-gluten/pull/7134

    (cherry picked from commit 4d8930574bb7bcc5d7e264bfd099fadd3fe1f76e)
---
 .../org/apache/gluten/kerberos/KerberosInit.java | 101 +++++++++++++++++
 .../kerberos/KerberosTicketRefreshAssist.java | 120 +++++++++++++++++++++
 .../backendsapi/clickhouse/CHListenerApi.scala | 3 +
 3 files changed, 224 insertions(+)

diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/kerberos/KerberosInit.java b/backends-clickhouse/src/main/java/org/apache/gluten/kerberos/KerberosInit.java
new file mode 100644
index 0000000000..7a82ef7282
--- /dev/null
+++ b/backends-clickhouse/src/main/java/org/apache/gluten/kerberos/KerberosInit.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.kerberos;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.security.krb5.KrbAsReqBuilder;
+import sun.security.krb5.KrbException;
+import sun.security.krb5.PrincipalName;
+import sun.security.krb5.RealmException;
+import sun.security.krb5.internal.HostAddresses;
+import sun.security.krb5.internal.KDCOptions;
+import sun.security.krb5.internal.ccache.Credentials;
+import sun.security.krb5.internal.ccache.CredentialsCache;
+
+import javax.security.auth.kerberos.KeyTab;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Requests a Kerberos ticket for a principal using a keytab and stores the resulting credentials
+ * in a credentials cache file, using the JDK-internal {@code sun.security.krb5} classes.
+ */
+public class KerberosInit {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KerberosInit.class);
+  private static final String KRB5_CONF_PROPERTY = "java.security.krb5.conf";
+
+  private final PrincipalName principal;
+  private final KeyTab keytab;
+  private final String cachePath;
+  private final String krb5ConfPath;
+
+  /**
+   * @param prc principal name to request the ticket for
+   * @param keytabPath path of the keytab file; when null or empty the default keytab is used
+   * @param cachePath path of the credentials cache file written by {@link #execute()}
+   * @param krb5ConfPath krb5 configuration file to use while requesting the ticket, or null to
+   *     leave the {@code java.security.krb5.conf} system property untouched
+   * @throws RealmException if the principal name cannot be parsed
+   * @throws IllegalArgumentException if a non-empty {@code keytabPath} or a non-null {@code
+   *     krb5ConfPath} points to a file that does not exist
+   */
+  public KerberosInit(String prc, String keytabPath, String cachePath, String krb5ConfPath)
+      throws RealmException {
+    this.principal = new PrincipalName(prc);
+    if (keytabPath == null || keytabPath.isEmpty()) {
+      LOG.warn("keytab path is null or empty, try use default keytab path");
+      this.keytab = KeyTab.getInstance();
+    } else {
+      File keytabFile = new File(keytabPath);
+      if (!keytabFile.exists()) {
+        throw new IllegalArgumentException("keytab file not exists: " + keytabPath);
+      }
+      this.keytab = KeyTab.getInstance(keytabFile);
+    }
+    this.cachePath = cachePath;
+    this.krb5ConfPath = krb5ConfPath;
+    if (krb5ConfPath != null && !new File(krb5ConfPath).exists()) {
+      throw new IllegalArgumentException("krb5 conf file not exists: " + krb5ConfPath);
+    }
+  }
+
+  /** Same as the four-argument constructor with no krb5 configuration override. */
+  public KerberosInit(String prc, String keytabPath, String cachePath) throws RealmException {
+    this(prc, keytabPath, cachePath, null);
+  }
+
+  /**
+   * Performs the AS exchange for the ticket-granting service of the principal's realm and writes
+   * the obtained credentials to {@code cachePath}.
+   */
+  private void execute0() throws KrbException, IOException {
+    KrbAsReqBuilder builder = new KrbAsReqBuilder(principal, keytab);
+    Credentials credentials;
+    try {
+      builder.setOptions(new KDCOptions());
+      String realm = principal.getRealmString();
+      builder.setTarget(PrincipalName.tgsService(realm, realm));
+      builder.setAddresses(HostAddresses.getLocalAddresses());
+
+      builder.action();
+      credentials = builder.getCCreds();
+    } finally {
+      // Always release the builder's state, including when the exchange fails.
+      builder.destroy();
+    }
+    CredentialsCache cache = CredentialsCache.create(principal, cachePath);
+    if (cache == null) {
+      throw new IOException("Unable to create the cache file " + cachePath);
+    }
+    cache.update(credentials);
+    cache.save();
+    LOG.info("Successfully stored kerberos ticket cache in {}", cachePath);
+  }
+
+  /**
+   * Requests a ticket and stores it in the credentials cache file.
+   *
+   * <p>When a krb5 configuration path was supplied, the {@code java.security.krb5.conf} system
+   * property is pointed at it for the duration of the call and put back afterwards: restored to
+   * its previous value, or cleared if it had none.
+   *
+   * @throws KrbException if the Kerberos exchange fails
+   * @throws IOException if the exchange or the cache file update fails
+   */
+  public void execute() throws KrbException, IOException {
+    if (krb5ConfPath == null) {
+      execute0();
+      return;
+    }
+    // NOTE(review): the JDK may cache its parsed krb5 configuration, in which case this override
+    // only takes effect if Kerberos has not been used earlier in this JVM - confirm.
+    String originalKrb5Conf = System.getProperty(KRB5_CONF_PROPERTY);
+    System.setProperty(KRB5_CONF_PROPERTY, krb5ConfPath);
+    try {
+      execute0();
+    } finally {
+      if (originalKrb5Conf != null) {
+        System.setProperty(KRB5_CONF_PROPERTY, originalKrb5Conf);
+      } else {
+        // The property was unset before the call; do not leak the override into the JVM.
+        System.clearProperty(KRB5_CONF_PROPERTY);
+      }
+    }
+  }
+}
diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/kerberos/KerberosTicketRefreshAssist.java b/backends-clickhouse/src/main/java/org/apache/gluten/kerberos/KerberosTicketRefreshAssist.java
new file mode 100644
index 0000000000..cc49605022
--- /dev/null
+++ b/backends-clickhouse/src/main/java/org/apache/gluten/kerberos/KerberosTicketRefreshAssist.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.kerberos;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.network.util.JavaUtils;
+import org.apache.spark.util.ThreadUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Schedules periodic Kerberos ticket acquisition via {@link KerberosInit} and publishes the
+ * resulting ticket cache path in the Spark configuration for the ClickHouse backend.
+ */
+public class KerberosTicketRefreshAssist {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KerberosTicketRefreshAssist.class);
+
+  private static final String BACKEND_CONFIG_KEY =
+      "spark.gluten.sql.columnar.backend.ch.runtime_config.";
+
+  private static final String USE_KERBEROS_KEY =
+      BACKEND_CONFIG_KEY + "hdfs.hadoop_security_authentication";
+  private static final String KERBEROS_CACHE_KEY =
+      BACKEND_CONFIG_KEY + "hdfs.hadoop_security_kerberos_ticket_cache_path";
+  private static final String USE_KERBEROS_VALUE = "kerberos";
+  private static final String PRINCIPAL_KEY = "spark.kerberos.principal";
+  private static final String KEYTAB_KEY = "spark.kerberos.keytab";
+  private static final String RELOGIN_INTERVAL_KEY = "spark.kerberos.relogin.period";
+  private static final String RELOGIN_INTERVAL_DEFAULT = "120m";
+  private static final String CACHE_NAME = "krb5cc_gluten";
+  private static final String KRB5_CONF_NAME = "krb5.conf";
+  private static KerberosTicketRefreshAssist INSTANCE;
+
+  private final ScheduledExecutorService initExecutor;
+
+  private KerberosTicketRefreshAssist() {
+    initExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor("CH Kerberos Renewal");
+  }
+
+  /**
+   * Starts periodic ticket acquisition when the backend HDFS authentication is set to {@code
+   * kerberos} and both the principal and the keytab are configured; otherwise logs and returns.
+   *
+   * <p>The keytab and {@code krb5.conf} are looked up in the working directory ({@code user.dir})
+   * first. A missing keytab falls back to the configured keytab path; a missing {@code krb5.conf}
+   * means no krb5 configuration override is passed to {@link KerberosInit}. On success the ticket
+   * cache path is written to {@code sparkConf}.
+   *
+   * @param sparkConf configuration to read the Kerberos settings from and to update with the
+   *     ticket cache path
+   */
+  public static void initKerberosIfNeeded(SparkConf sparkConf) {
+    if (!USE_KERBEROS_VALUE.equals(sparkConf.get(USE_KERBEROS_KEY, ""))) {
+      LOG.debug("Kerberos is not enabled, skip initialization");
+      return;
+    }
+    if (!sparkConf.contains(PRINCIPAL_KEY) || !sparkConf.contains(KEYTAB_KEY)) {
+      LOG.error("Kerberos enabled, but principal or keytab is not set");
+      return;
+    }
+    String workingDir = System.getProperty("user.dir");
+    String principal = sparkConf.get(PRINCIPAL_KEY);
+    String keytab = sparkConf.get(KEYTAB_KEY);
+    String keytabPath = workingDir + "/" + new File(keytab).getName();
+    if (!new File(keytabPath).exists()) {
+      LOG.warn("keytab file not exists in working dir, try to use spark.kerberos.keytab");
+      keytabPath = keytab;
+    }
+
+    String cachePath = workingDir + "/" + CACHE_NAME;
+    String krb5ConfPath = workingDir + "/" + KRB5_CONF_NAME;
+    if (!new File(krb5ConfPath).exists()) {
+      LOG.warn("krb5 conf file not exists in working dir, try to use system krb5 conf");
+      krb5ConfPath = null;
+    }
+
+    final KerberosInit kerberosInit;
+    try {
+      kerberosInit = new KerberosInit(principal, keytabPath, cachePath, krb5ConfPath);
+    } catch (Exception e) {
+      LOG.error("Failed to init kerberos", e);
+      return;
+    }
+    String period = sparkConf.get(RELOGIN_INTERVAL_KEY, RELOGIN_INTERVAL_DEFAULT);
+    long refreshSeconds = JavaUtils.timeStringAs(period, TimeUnit.SECONDS);
+    if (refreshSeconds <= 0) {
+      // scheduleAtFixedRate rejects a non-positive period, which would abort executor start-up.
+      LOG.warn(
+          "Invalid {} value {}, fall back to default {}",
+          RELOGIN_INTERVAL_KEY,
+          period,
+          RELOGIN_INTERVAL_DEFAULT);
+      refreshSeconds = JavaUtils.timeStringAs(RELOGIN_INTERVAL_DEFAULT, TimeUnit.SECONDS);
+    }
+
+    // NOTE(review): the first acquisition runs asynchronously (initial delay 0), so the cache
+    // file may not exist yet when this method returns - confirm the backend tolerates that.
+    KerberosTicketRefreshAssist assist = KerberosTicketRefreshAssist.getInstance();
+    assist.initExecutor.scheduleAtFixedRate(
+        () -> {
+          try {
+            kerberosInit.execute();
+          } catch (Exception e) {
+            // Swallow so that one failed attempt does not cancel the remaining scheduled runs.
+            LOG.error("Failed to renew kerberos ticket", e);
+          }
+        },
+        0,
+        refreshSeconds,
+        TimeUnit.SECONDS);
+
+    sparkConf.set(KERBEROS_CACHE_KEY, cachePath);
+  }
+
+  /** Returns the shared instance, creating it (and its scheduler) on first use. */
+  public static synchronized KerberosTicketRefreshAssist getInstance() {
+    if (INSTANCE == null) {
+      INSTANCE = new KerberosTicketRefreshAssist();
+    }
+    return INSTANCE;
+  }
+
+  /**
+   * Shuts down the scheduler if the shared instance was ever created. Synchronized on the same
+   * lock as {@link #getInstance()} so that an instance created by another thread is visible here.
+   */
+  public static synchronized void shutdownIfNeeded() {
+    if (INSTANCE != null) {
+      INSTANCE.initExecutor.shutdown();
+    }
+  }
+}
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
index 5e62126ff8..092ac26187 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
@@ -25,6 +25,7 @@ import org.apache.gluten.expression.UDFMappings
 import org.apache.gluten.extension.ExpressionExtensionTrait
 import org.apache.gluten.extension.columnar.transition.Convention
 import org.apache.gluten.jni.JniLibLoader
+import org.apache.gluten.kerberos.KerberosTicketRefreshAssist
 import org.apache.gluten.vectorized.CHNativeExpressionEvaluator
 
 import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext}
@@ -60,6 +61,7 @@ class CHListenerApi extends ListenerApi with Logging {
   override def onDriverShutdown(): Unit = shutdown()
 
   override def onExecutorStart(pc: PluginContext): Unit = {
+    KerberosTicketRefreshAssist.initKerberosIfNeeded(pc.conf)
     GlutenExecutorEndpoint.executorEndpoint = new GlutenExecutorEndpoint(pc.executorID, pc.conf)
     if (pc.conf().get("spark.master").startsWith("local")) {
       logDebug("Skipping duplicate initializing clickhouse backend on spark local mode")
@@ -130,6 +132,7 @@ class CHListenerApi extends ListenerApi with Logging {
   }
 
   private def shutdown(): Unit = {
+    KerberosTicketRefreshAssist.shutdownIfNeeded()
     CHBroadcastBuildSideCache.cleanAll()
     CHNativeExpressionEvaluator.finalizeNative()
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
