This is an automated email from the ASF dual-hosted git repository.

lwz9103 pushed a commit to branch liquid
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git

commit 47022c38d7148de4b591e03110ae3780b19b3030
Author: Wenzheng Liu <[email protected]>
AuthorDate: Tue Jul 16 16:02:00 2024 +0800

    [CH] Fix kerberos connect hdfs error (#22)
    
    - 240906 Fix conflict due to https://github.com/apache/incubator-gluten/pull/7134
    
    (cherry picked from commit 4d8930574bb7bcc5d7e264bfd099fadd3fe1f76e)
---
 .../org/apache/gluten/kerberos/KerberosInit.java   | 101 +++++++++++++++++
 .../kerberos/KerberosTicketRefreshAssist.java      | 120 +++++++++++++++++++++
 .../backendsapi/clickhouse/CHListenerApi.scala     |   3 +
 3 files changed, 224 insertions(+)

diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/kerberos/KerberosInit.java b/backends-clickhouse/src/main/java/org/apache/gluten/kerberos/KerberosInit.java
new file mode 100644
index 0000000000..7a82ef7282
--- /dev/null
+++ b/backends-clickhouse/src/main/java/org/apache/gluten/kerberos/KerberosInit.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.kerberos;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.security.krb5.KrbAsReqBuilder;
+import sun.security.krb5.KrbException;
+import sun.security.krb5.PrincipalName;
+import sun.security.krb5.RealmException;
+import sun.security.krb5.internal.HostAddresses;
+import sun.security.krb5.internal.KDCOptions;
+import sun.security.krb5.internal.ccache.Credentials;
+import sun.security.krb5.internal.ccache.CredentialsCache;
+
+import javax.security.auth.kerberos.KeyTab;
+
+import java.io.File;
+import java.io.IOException;
+
+public class KerberosInit {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(KerberosInit.class);
+  private final PrincipalName principal;
+  private final KeyTab keytab;
+  private final String cachePath;
+  private final String krb5ConfPath;
+
+  public KerberosInit(String prc, String keytabPath, String cachePath, String 
krb5ConfPath)
+      throws RealmException {
+    this.principal = new PrincipalName(prc);
+    if (keytabPath == null || keytabPath.length() == 0) {
+      LOG.warn("keytab path is null or empty, try use default keytab path");
+      this.keytab = KeyTab.getInstance();
+    } else {
+      File keytabFile = new File(keytabPath);
+      if (!keytabFile.exists()) {
+        throw new IllegalArgumentException("keytab file not exists: " + 
keytabPath);
+      }
+      this.keytab = KeyTab.getInstance(keytabFile);
+    }
+    this.cachePath = cachePath;
+    this.krb5ConfPath = krb5ConfPath;
+    if (krb5ConfPath != null && !new File(krb5ConfPath).exists()) {
+      throw new IllegalArgumentException("krb5 conf file not exists: " + 
krb5ConfPath);
+    }
+  }
+
+  public KerberosInit(String prc, String keytabPath, String cachePath) throws 
RealmException {
+    this(prc, keytabPath, cachePath, null);
+  }
+
+  private void execute0() throws KrbException, IOException {
+    KrbAsReqBuilder builder = new KrbAsReqBuilder(principal, keytab);
+    KDCOptions opt = new KDCOptions();
+    builder.setOptions(opt);
+    String realm = principal.getRealmString();
+    PrincipalName sname = PrincipalName.tgsService(realm, realm);
+    builder.setTarget(sname);
+    builder.setAddresses(HostAddresses.getLocalAddresses());
+
+    builder.action();
+    Credentials credentials = builder.getCCreds();
+    builder.destroy();
+    CredentialsCache cache = CredentialsCache.create(principal, cachePath);
+    if (cache == null) {
+      throw new IOException("Unable to create the cache file " + cachePath);
+    }
+    cache.update(credentials);
+    cache.save();
+    LOG.info("Successfully stored kerberos ticket cache in " + cachePath);
+  }
+
+  public void execute() throws KrbException, IOException {
+    String originalKrb5Conf = System.getProperty("java.security.krb5.conf");
+    try {
+      if (krb5ConfPath != null) {
+        System.setProperty("java.security.krb5.conf", krb5ConfPath);
+      }
+      execute0();
+    } finally {
+      if (krb5ConfPath != null && originalKrb5Conf != null) {
+        System.setProperty("java.security.krb5.conf", originalKrb5Conf);
+      }
+    }
+  }
+}
diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/kerberos/KerberosTicketRefreshAssist.java b/backends-clickhouse/src/main/java/org/apache/gluten/kerberos/KerberosTicketRefreshAssist.java
new file mode 100644
index 0000000000..cc49605022
--- /dev/null
+++ b/backends-clickhouse/src/main/java/org/apache/gluten/kerberos/KerberosTicketRefreshAssist.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.kerberos;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.network.util.JavaUtils;
+import org.apache.spark.util.ThreadUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class KerberosTicketRefreshAssist {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(KerberosTicketRefreshAssist.class);
+
+  private static final String BACKEND_CONFIG_KEY =
+      "spark.gluten.sql.columnar.backend.ch.runtime_config.";
+
+  private static final String USE_KERBEROS_KEY =
+      BACKEND_CONFIG_KEY + "hdfs.hadoop_security_authentication";
+  private static final String KERBEROS_CACHE_KEY =
+      BACKEND_CONFIG_KEY + "hdfs.hadoop_security_kerberos_ticket_cache_path";
+  private static final String USE_KERBEROS_VALUE = "kerberos";
+  private static final String PRINCIPAL_KEY = "spark.kerberos.principal";
+  private static final String KEYTAB_KEY = "spark.kerberos.keytab";
+  private static final String RELOGIN_INTERVAL_KEY = 
"spark.kerberos.relogin.period";
+  private static final String RELOGIN_INTERVAL_DEFAULT = "120m";
+  private static final String CACHE_NAME = "krb5cc_gluten";
+  private static final String KRB5_CONF_NAME = "krb5.conf";
+  private static KerberosTicketRefreshAssist INSTANCE;
+
+  private final ScheduledExecutorService initExecutor;
+
+  private KerberosTicketRefreshAssist() {
+    initExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor("CH 
Kerberos Renewal");
+  }
+
+  public static void initKerberosIfNeeded(SparkConf sparkConf) {
+    if (!sparkConf.contains(USE_KERBEROS_KEY)
+        || !sparkConf.get(USE_KERBEROS_KEY).equals(USE_KERBEROS_VALUE)) {
+      LOG.debug("Kerberos is not enabled, skip initialization");
+      return;
+    }
+    if (!sparkConf.contains(PRINCIPAL_KEY) || !sparkConf.contains(KEYTAB_KEY)) 
{
+      LOG.error("Kerberos enabled, but principal or keytab is not set");
+      return;
+    }
+    String workingDir = System.getProperty("user.dir");
+    String principal = sparkConf.get(PRINCIPAL_KEY);
+    String keytab = sparkConf.get(KEYTAB_KEY);
+    String keytabName = new File(keytab).getName();
+    String keytabPath = workingDir + "/" + keytabName;
+    if (!new File(keytabPath).exists()) {
+      LOG.warn("keytab file not exists in working dir, try to use 
spark.kerberos.keytab");
+      keytabPath = keytab;
+    }
+
+    String cachePath = workingDir + "/" + CACHE_NAME;
+    String krb5ConfPath = workingDir + "/" + KRB5_CONF_NAME;
+    if (!new File(krb5ConfPath).exists()) {
+      LOG.warn("krb5 conf file not exists in working dir, try to use system 
krb5 conf");
+      krb5ConfPath = null;
+    }
+
+    KerberosInit kerberosInit;
+    try {
+      kerberosInit = new KerberosInit(principal, keytabPath, cachePath, 
krb5ConfPath);
+    } catch (Exception e) {
+      LOG.error("Failed to init kerberos", e);
+      return;
+    }
+    String period = sparkConf.get(RELOGIN_INTERVAL_KEY, 
RELOGIN_INTERVAL_DEFAULT);
+    long refreshSeconds = JavaUtils.timeStringAs(period, TimeUnit.SECONDS);
+
+    KerberosTicketRefreshAssist assist = 
KerberosTicketRefreshAssist.getInstance();
+    assist.initExecutor.scheduleAtFixedRate(
+        () -> {
+          try {
+            kerberosInit.execute();
+          } catch (Exception e) {
+            LOG.error("Failed to renew kerberos ticket", e);
+          }
+        },
+        0,
+        refreshSeconds,
+        TimeUnit.SECONDS);
+
+    sparkConf.set(KERBEROS_CACHE_KEY, cachePath);
+  }
+
+  public static synchronized KerberosTicketRefreshAssist getInstance() {
+    if (INSTANCE == null) {
+      INSTANCE = new KerberosTicketRefreshAssist();
+    }
+    return INSTANCE;
+  }
+
+  public static void shutdownIfNeeded() {
+    if (INSTANCE != null) {
+      INSTANCE.initExecutor.shutdown();
+    }
+  }
+}
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
index 5e62126ff8..092ac26187 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
@@ -25,6 +25,7 @@ import org.apache.gluten.expression.UDFMappings
 import org.apache.gluten.extension.ExpressionExtensionTrait
 import org.apache.gluten.extension.columnar.transition.Convention
 import org.apache.gluten.jni.JniLibLoader
+import org.apache.gluten.kerberos.KerberosTicketRefreshAssist
 import org.apache.gluten.vectorized.CHNativeExpressionEvaluator
 
 import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext}
@@ -60,6 +61,7 @@ class CHListenerApi extends ListenerApi with Logging {
   override def onDriverShutdown(): Unit = shutdown()
 
   override def onExecutorStart(pc: PluginContext): Unit = {
+    KerberosTicketRefreshAssist.initKerberosIfNeeded(pc.conf)
     GlutenExecutorEndpoint.executorEndpoint = new 
GlutenExecutorEndpoint(pc.executorID, pc.conf)
     if (pc.conf().get("spark.master").startsWith("local")) {
       logDebug("Skipping duplicate initializing clickhouse backend on spark 
local mode")
@@ -130,6 +132,7 @@ class CHListenerApi extends ListenerApi with Logging {
   }
 
   private def shutdown(): Unit = {
+    KerberosTicketRefreshAssist.shutdownIfNeeded()
     CHBroadcastBuildSideCache.cleanAll()
     CHNativeExpressionEvaluator.finalizeNative()
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to