This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 5649e37  [SPARK-35493][K8S] make `spark.blockManager.port` fallback 
for `spark.driver.blockManager.port` as same as other cluster managers
5649e37 is described below

commit 5649e373b7301dfd6580b8efdc68d170722bee78
Author: Kent Yao <y...@apache.org>
AuthorDate: Sun May 23 08:07:57 2021 -0700

    [SPARK-35493][K8S] make `spark.blockManager.port` fallback for 
`spark.driver.blockManager.port` as same as other cluster managers
    
    ### What changes were proposed in this pull request?
    
    `spark.blockManager.port` does not work for k8s driver pods now, we should 
make it work as other cluster managers.
    
    ### Why are the changes needed?
    
    `spark.blockManager.port` should be able to work for spark driver pod
    
    ### Does this PR introduce _any_ user-facing change?
    
    yes, `spark.blockManager.port` will be respect iff it is present  && 
`spark.driver.blockManager.port` is absent
    
    ### How was this patch tested?
    
    new tests
    
    Closes #32639 from yaooqinn/SPARK-35493.
    
    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit 96b0548ab6d5fe36833812f7b6424c984f75c6dd)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../deploy/k8s/features/BasicDriverFeatureStep.scala  |  2 +-
 .../k8s/features/BasicDriverFeatureStepSuite.scala    | 19 +++++++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index cec8272..7f34f30 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -96,7 +96,7 @@ private[spark] class BasicDriverFeatureStep(conf: 
KubernetesDriverConf)
     val driverPort = conf.sparkConf.getInt(DRIVER_PORT.key, 
DEFAULT_DRIVER_PORT)
     val driverBlockManagerPort = conf.sparkConf.getInt(
       DRIVER_BLOCK_MANAGER_PORT.key,
-      DEFAULT_BLOCKMANAGER_PORT
+      conf.sparkConf.getInt(BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT)
     )
     val driverUIPort = SparkUI.getUIPort(conf.sparkConf)
     val driverContainer = new ContainerBuilder(pod.container)
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index 858b4f1..f084322 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -213,6 +213,25 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     }
   }
 
+  test("SPARK-35493: make spark.blockManager.port be able to be fallen back to 
in driver pod") {
+    val initPod = SparkPod.initialPod()
+    val sparkConf = new SparkConf()
+      .set(CONTAINER_IMAGE, "spark-driver:latest")
+      .set(BLOCK_MANAGER_PORT, 1234)
+    val driverConf1 = KubernetesTestConf.createDriverConf(sparkConf)
+    val pod1 = new BasicDriverFeatureStep(driverConf1).configurePod(initPod)
+    val portMap1 =
+      pod1.container.getPorts.asScala.map { cp => (cp.getName -> 
cp.getContainerPort) }.toMap
+    assert(portMap1(BLOCK_MANAGER_PORT_NAME) === 1234, s"fallback to 
$BLOCK_MANAGER_PORT.key")
+
+    val driverConf2 =
+      
KubernetesTestConf.createDriverConf(sparkConf.set(DRIVER_BLOCK_MANAGER_PORT, 
1235))
+    val pod2 = new BasicDriverFeatureStep(driverConf2).configurePod(initPod)
+    val portMap2 =
+      pod2.container.getPorts.asScala.map { cp => (cp.getName -> 
cp.getContainerPort) }.toMap
+    assert(portMap2(BLOCK_MANAGER_PORT_NAME) === 1235)
+  }
+
   def containerPort(name: String, portNumber: Int): ContainerPort =
     new ContainerPortBuilder()
       .withName(name)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to