This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 455cd4013 [CELEBORN-1111] Supporting connection to HDFS with Kerberos 
authentication enabled
455cd4013 is described below

commit 455cd401376373e406ee0dc8c046ca966421af73
Author: joey.ljy <[email protected]>
AuthorDate: Sat Nov 4 17:21:41 2023 +0800

    [CELEBORN-1111] Supporting connection to HDFS with Kerberos authentication 
enabled
    
    ### What changes were proposed in this pull request?
    Adding Kerberos support for HDFS storage type.
    
    The following five parameters need to be configured:
    | key | value |
    | :--: | :--: |
    | celeborn.storage.hdfs.kerberos.enabled | true |
    | celeborn.storage.hdfs.kerberos.principal | user@REALM |
    | celeborn.storage.hdfs.kerberos.keytab | /path/test.keytab |
    | celeborn.hadoop.hadoop.security.authentication | kerberos |
    | celeborn.hadoop.dfs.namenode.kerberos.principal | hdfs/_HOST@REALM |
    
    ### Why are the changes needed?
    Connecting to HDFS with Kerberos enabled requires support for keytab login.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, this adds three configurations:
    celeborn.storage.hdfs.kerberos.enabled
    celeborn.storage.hdfs.kerberos.principal
    celeborn.storage.hdfs.kerberos.keytab
    
    ### How was this patch tested?
    Tested in a Kerberos-enabled HDFS cluster.
    
    Closes #2072 from liujiayi771/hdfs-kerberos.
    
    Authored-by: joey.ljy <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../org/apache/celeborn/common/CelebornConf.scala  | 31 ++++++++++++++++++++++
 .../celeborn/common/util/CelebornHadoopUtils.scala | 28 +++++++++++++++++--
 docs/configuration/master.md                       |  3 +++
 docs/configuration/worker.md                       |  3 +++
 4 files changed, 63 insertions(+), 2 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index e024f6f1e..6923fea3f 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1065,6 +1065,13 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
     get(CLIENT_RESULT_PARTITION_SUPPORT_FLOATING_BUFFER)
   def clientFlinkDataCompressionEnabled: Boolean = 
get(CLIENT_DATA_COMPRESSION_ENABLED)
   def clientShuffleMapPartitionSplitEnabled = 
get(CLIENT_SHUFFLE_MAPPARTITION_SPLIT_ENABLED)
+
+  // //////////////////////////////////////////////////////
+  //                    kerberos                         //
+  // //////////////////////////////////////////////////////
+  def hdfsStorageKerberosEnabled = get(HDFS_STORAGE_TYPE_KERBEROS_ENABLED)
+  def hdfsStorageKerberosPrincipal = get(HDFS_STORAGE_KERBEROS_PRINCIPAL)
+  def hdfsStorageKerberosKeytab = get(HDFS_STORAGE_KERBEROS_KEYTAB)
 }
 
 object CelebornConf extends Logging {
@@ -4009,4 +4016,28 @@ object CelebornConf extends Logging {
       .version("0.3.2")
       .intConf
       .createWithDefault(64)
+
+  val HDFS_STORAGE_TYPE_KERBEROS_ENABLED: ConfigEntry[Boolean] =
+    buildConf("celeborn.storage.hdfs.kerberos.enabled")
+      .categories("master", "worker")
+      .version("0.3.2")
+      .doc("Whether to enable kerberos authentication for HDFS storage 
connection.")
+      .booleanConf
+      .createWithDefault(false)
+
+  val HDFS_STORAGE_KERBEROS_PRINCIPAL: OptionalConfigEntry[String] =
+    buildConf("celeborn.storage.hdfs.kerberos.principal")
+      .categories("master", "worker")
+      .version("0.3.2")
+      .doc("Kerberos principal for HDFS storage connection.")
+      .stringConf
+      .createOptional
+
+  val HDFS_STORAGE_KERBEROS_KEYTAB: OptionalConfigEntry[String] =
+    buildConf("celeborn.storage.hdfs.kerberos.keytab")
+      .categories("master", "worker")
+      .version("0.3.2")
+      .doc("Kerberos keytab file path for HDFS storage connection.")
+      .stringConf
+      .createOptional
 }
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
 
b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
index 166c2a234..1135d5bcb 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
@@ -17,13 +17,15 @@
 
 package org.apache.celeborn.common.util
 
-import java.io.IOException
+import java.io.{File, IOException}
 import java.util.concurrent.atomic.AtomicBoolean
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.exception.CelebornException
 import org.apache.celeborn.common.internal.Logging
 
 object CelebornHadoopUtils extends Logging {
@@ -56,7 +58,9 @@ object CelebornHadoopUtils extends Logging {
   }
 
   def getHadoopFS(conf: CelebornConf): FileSystem = {
-    new Path(conf.hdfsDir).getFileSystem(newConfiguration(conf))
+    val hadoopConf = newConfiguration(conf)
+    initKerberos(conf, hadoopConf)
+    new Path(conf.hdfsDir).getFileSystem(hadoopConf)
   }
 
   def deleteHDFSPathOrLogError(hadoopFs: FileSystem, path: Path, recursive: 
Boolean): Unit = {
@@ -71,4 +75,24 @@ object CelebornHadoopUtils extends Logging {
         logError(s"Failed to delete HDFS ${path}(recursive=$recursive) due to: 
", e)
     }
   }
+
+  def initKerberos(conf: CelebornConf, hadoopConf: Configuration): Unit = {
+    // If we are accessing HDFS and it has Kerberos enabled, we have to login
+    // from a keytab file so that we can access HDFS beyond the kerberos 
ticket expiration.
+    UserGroupInformation.setConfiguration(hadoopConf)
+    if (conf.hdfsStorageKerberosEnabled) {
+      val principal = conf.hdfsStorageKerberosPrincipal
+        .getOrElse(throw new NoSuchElementException(
+          CelebornConf.HDFS_STORAGE_KERBEROS_PRINCIPAL.key))
+      val keytab = conf.hdfsStorageKerberosKeytab
+        .getOrElse(throw new 
NoSuchElementException(CelebornConf.HDFS_STORAGE_KERBEROS_KEYTAB.key))
+      if (!new File(keytab).exists()) {
+        throw new CelebornException(s"Keytab file: ${keytab} does not exist")
+      } else {
+        logInfo("Attempting to login to Kerberos " +
+          s"using principal: ${principal} and keytab: ${keytab}")
+        UserGroupInformation.loginUserFromKeytab(principal, keytab)
+      }
+    }
+  }
 }
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index b83b4b20c..291e56435 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -40,4 +40,7 @@ license: |
 | celeborn.master.workerUnavailableInfo.expireTimeout | 1800s | Worker 
unavailable info would be cleared when the retention period is expired | 0.3.1 
| 
 | celeborn.storage.availableTypes | HDD | Enabled storages. Available options: 
MEMORY,HDD,SSD,HDFS. Note: HDD and SSD would be treated as identical. | 0.3.0 | 
 | celeborn.storage.hdfs.dir | &lt;undefined&gt; | HDFS base directory for 
Celeborn to store shuffle data. | 0.2.0 | 
+| celeborn.storage.hdfs.kerberos.enabled | false | Whether to enable kerberos 
authentication for HDFS storage connection. | 0.3.2 | 
+| celeborn.storage.hdfs.kerberos.keytab | &lt;undefined&gt; | Kerberos keytab 
file path for HDFS storage connection. | 0.3.2 | 
+| celeborn.storage.hdfs.kerberos.principal | &lt;undefined&gt; | Kerberos 
principal for HDFS storage connection. | 0.3.2 | 
 <!--end-include-->
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index a356844cb..965bf320c 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -24,6 +24,9 @@ license: |
 | celeborn.shuffle.chunk.size | 8m | Max chunk size of reducer's merged 
shuffle data. For example, if a reducer's shuffle data is 128M and the data 
will need 16 fetch chunk requests to fetch. | 0.2.0 | 
 | celeborn.storage.availableTypes | HDD | Enabled storages. Available options: 
MEMORY,HDD,SSD,HDFS. Note: HDD and SSD would be treated as identical. | 0.3.0 | 
 | celeborn.storage.hdfs.dir | &lt;undefined&gt; | HDFS base directory for 
Celeborn to store shuffle data. | 0.2.0 | 
+| celeborn.storage.hdfs.kerberos.enabled | false | Whether to enable kerberos 
authentication for HDFS storage connection. | 0.3.2 | 
+| celeborn.storage.hdfs.kerberos.keytab | &lt;undefined&gt; | Kerberos keytab 
file path for HDFS storage connection. | 0.3.2 | 
+| celeborn.storage.hdfs.kerberos.principal | &lt;undefined&gt; | Kerberos 
principal for HDFS storage connection. | 0.3.2 | 
 | celeborn.worker.activeConnection.max | &lt;undefined&gt; | If the number of 
active connections on a worker exceeds this configuration value, the worker 
will be marked as high-load in the heartbeat report, and the master will not 
include that node in the response of RequestSlots. | 0.3.1 | 
 | celeborn.worker.bufferStream.threadsPerMountpoint | 8 | Threads count for 
read buffer per mount point. | 0.3.0 | 
 | celeborn.worker.clean.threads | 64 | Thread number of worker to clean up 
expired shuffle keys. | 0.3.2 | 

Reply via email to