This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bc96e8e3767 [SPARK-45867][CORE] Support `spark.worker.idPattern`
7bc96e8e3767 is described below

commit 7bc96e8e37672483a07088dbbdcf3610a497af1d
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Thu Nov 9 13:59:44 2023 -0800

    [SPARK-45867][CORE] Support `spark.worker.idPattern`
    
    ### What changes were proposed in this pull request?
    
    This PR aims to support `spark.worker.idPattern`.
    
    ### Why are the changes needed?
    
    To allow users to customize the worker IDs if they want.
    - From: `worker-20231109183042-[fe80::1%lo0]-39729`
    - To: `my-worker-20231109183042-[fe80::1%lo0]`
    
    For example,
    ```
    $ cat conf/spark-defaults.conf
    spark.worker.idPattern worker-%2$s
    ```
    
    <img width="669" alt="Screenshot 2023-11-09 at 1 25 19 PM" 
src="https://github.com/apache/spark/assets/9700541/bc367e77-c19d-44f1-bbc5-3f4c5edec33d">
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the CIs with the newly added test case.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #43740 from dongjoon-hyun/SPARK-45867.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../scala/org/apache/spark/deploy/worker/Worker.scala  |  3 ++-
 .../org/apache/spark/internal/config/Worker.scala      | 11 +++++++++++
 .../org/apache/spark/deploy/worker/WorkerSuite.scala   | 18 ++++++++++++++++--
 3 files changed, 29 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 44082ae78794..ddbba55e00b4 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -62,6 +62,7 @@ private[deploy] class Worker(
 
   private val host = rpcEnv.address.host
   private val port = rpcEnv.address.port
+  private val workerIdPattern = conf.get(config.Worker.WORKER_ID_PATTERN)
 
   Utils.checkHost(host)
   assert (port > 0)
@@ -813,7 +814,7 @@ private[deploy] class Worker(
   }
 
   private def generateWorkerId(): String = {
-    "worker-%s-%s-%d".format(createDateFormat.format(new Date), host, port)
+    workerIdPattern.format(createDateFormat.format(new Date), host, port)
   }
 
   override def onStop(): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala 
b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
index fda3a57546b6..f160470edd8f 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
@@ -89,4 +89,15 @@ private[spark] object Worker {
       .version("3.2.0")
       .stringConf
       .createWithDefaultString("PWR")
+
+  val WORKER_ID_PATTERN = ConfigBuilder("spark.worker.idPattern")
+    .internal()
+    .doc("The pattern for worker ID generation based on Java `String.format` 
method. The " +
+      "default value is `worker-%s-%s-%d` which represents the existing worker 
id string, e.g.," +
+      " `worker-20231109183042-[fe80::1%lo0]-39729`. Please be careful to 
generate unique IDs")
+    .version("4.0.0")
+    .stringConf
+    .checkValue(!_.format("20231109000000", "host", 0).exists(_.isWhitespace),
+      "Whitespace is not allowed.")
+    .createWithDefaultString("worker-%s-%s-%d")
 }
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
index a07d4f76905a..1b2d92af4b02 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
@@ -29,7 +29,7 @@ import org.mockito.Answers.RETURNS_SMART_NULLS
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito._
 import org.mockito.invocation.InvocationOnMock
-import org.scalatest.BeforeAndAfter
+import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
 import org.scalatest.matchers.must.Matchers
 import org.scalatest.matchers.should.Matchers._
@@ -49,7 +49,7 @@ import 
org.apache.spark.resource.TestResourceIDs.{WORKER_FPGA_ID, WORKER_GPU_ID}
 import org.apache.spark.rpc.{RpcAddress, RpcEnv}
 import org.apache.spark.util.Utils
 
-class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
+class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter with 
PrivateMethodTester {
 
   import org.apache.spark.deploy.DeployTestUtils._
 
@@ -62,6 +62,8 @@ class WorkerSuite extends SparkFunSuite with Matchers with 
BeforeAndAfter {
 
   implicit val formats = DefaultFormats
 
+  private val _generateWorkerId = 
PrivateMethod[String](Symbol("generateWorkerId"))
+
   private var _worker: Worker = _
 
   private def makeWorker(
@@ -391,4 +393,16 @@ class WorkerSuite extends SparkFunSuite with Matchers with 
BeforeAndAfter {
       assert(cleanupCalled.get() == dbCleanupEnabled)
     }
   }
+
+  test("SPARK-45867: Support worker id pattern") {
+    val worker = makeWorker(new SparkConf().set(WORKER_ID_PATTERN, 
"my-worker-%2$s"))
+    assert(worker.invokePrivate(_generateWorkerId()) === "my-worker-localhost")
+  }
+
+  test("SPARK-45867: Prevent invalid worker id patterns") {
+    val m = intercept[IllegalArgumentException] {
+      makeWorker(new SparkConf().set(WORKER_ID_PATTERN, "my worker"))
+    }.getMessage
+    assert(m.contains("Whitespace is not allowed"))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to