This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7bc96e8e3767 [SPARK-45867][CORE] Support `spark.worker.idPattern`
7bc96e8e3767 is described below
commit 7bc96e8e37672483a07088dbbdcf3610a497af1d
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Thu Nov 9 13:59:44 2023 -0800
[SPARK-45867][CORE] Support `spark.worker.idPattern`
### What changes were proposed in this pull request?
This PR aims to support `spark.worker.idPattern`.
### Why are the changes needed?
To allow users to customize the worker IDs if they want.
- From: `worker-20231109183042-[fe80::1%lo0]-39729`
- To: `my-worker-20231109183042-[fe80::1%lo0]`
For example,
```
$ cat conf/spark-defaults.conf
spark.worker.idPattern worker-%2$s
```
<img width="669" alt="Screenshot 2023-11-09 at 1 25 19 PM"
src="https://github.com/apache/spark/assets/9700541/bc367e77-c19d-44f1-bbc5-3f4c5edec33d">
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the CIs with the newly added test case.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #43740 from dongjoon-hyun/SPARK-45867.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../scala/org/apache/spark/deploy/worker/Worker.scala | 3 ++-
.../org/apache/spark/internal/config/Worker.scala | 11 +++++++++++
.../org/apache/spark/deploy/worker/WorkerSuite.scala | 18 ++++++++++++++++--
3 files changed, 29 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 44082ae78794..ddbba55e00b4 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -62,6 +62,7 @@ private[deploy] class Worker(
private val host = rpcEnv.address.host
private val port = rpcEnv.address.port
+ private val workerIdPattern = conf.get(config.Worker.WORKER_ID_PATTERN)
Utils.checkHost(host)
assert (port > 0)
@@ -813,7 +814,7 @@ private[deploy] class Worker(
}
private def generateWorkerId(): String = {
- "worker-%s-%s-%d".format(createDateFormat.format(new Date), host, port)
+ workerIdPattern.format(createDateFormat.format(new Date), host, port)
}
override def onStop(): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
index fda3a57546b6..f160470edd8f 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
@@ -89,4 +89,15 @@ private[spark] object Worker {
.version("3.2.0")
.stringConf
.createWithDefaultString("PWR")
+
+ val WORKER_ID_PATTERN = ConfigBuilder("spark.worker.idPattern")
+ .internal()
+ .doc("The pattern for worker ID generation based on Java `String.format`
method. The " +
+ "default value is `worker-%s-%s-%d` which represents the existing worker
id string, e.g.," +
+ " `worker-20231109183042-[fe80::1%lo0]-39729`. Please be careful to
generate unique IDs")
+ .version("4.0.0")
+ .stringConf
+ .checkValue(!_.format("20231109000000", "host", 0).exists(_.isWhitespace),
+ "Whitespace is not allowed.")
+ .createWithDefaultString("worker-%s-%s-%d")
}
diff --git
a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
index a07d4f76905a..1b2d92af4b02 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
@@ -29,7 +29,7 @@ import org.mockito.Answers.RETURNS_SMART_NULLS
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
-import org.scalatest.BeforeAndAfter
+import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
import org.scalatest.matchers.must.Matchers
import org.scalatest.matchers.should.Matchers._
@@ -49,7 +49,7 @@ import
org.apache.spark.resource.TestResourceIDs.{WORKER_FPGA_ID, WORKER_GPU_ID}
import org.apache.spark.rpc.{RpcAddress, RpcEnv}
import org.apache.spark.util.Utils
-class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
+class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter with
PrivateMethodTester {
import org.apache.spark.deploy.DeployTestUtils._
@@ -62,6 +62,8 @@ class WorkerSuite extends SparkFunSuite with Matchers with
BeforeAndAfter {
implicit val formats = DefaultFormats
+ private val _generateWorkerId =
PrivateMethod[String](Symbol("generateWorkerId"))
+
private var _worker: Worker = _
private def makeWorker(
@@ -391,4 +393,16 @@ class WorkerSuite extends SparkFunSuite with Matchers with
BeforeAndAfter {
assert(cleanupCalled.get() == dbCleanupEnabled)
}
}
+
+ test("SPARK-45867: Support worker id pattern") {
+ val worker = makeWorker(new SparkConf().set(WORKER_ID_PATTERN,
"my-worker-%2$s"))
+ assert(worker.invokePrivate(_generateWorkerId()) === "my-worker-localhost")
+ }
+
+ test("SPARK-45867: Prevent invalid worker id patterns") {
+ val m = intercept[IllegalArgumentException] {
+ makeWorker(new SparkConf().set(WORKER_ID_PATTERN, "my worker"))
+ }.getMessage
+ assert(m.contains("Whitespace is not allowed"))
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]