ChengJie1053 commented on code in PR #3952:
URL:
https://github.com/apache/incubator-streampark/pull/3952#discussion_r1712932880
##########
streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala:
##########
@@ -119,24 +119,40 @@ object YarnClient extends SparkClientTrait {
.setDeployMode(submitRequest.executionMode match {
case SparkExecutionMode.YARN_CLIENT => "client"
case SparkExecutionMode.YARN_CLUSTER => "cluster"
- case _ => throw new IllegalArgumentException("[StreamPark][Spark]
Invalid spark on yarn deployMode, only support \"client\" and \"cluster\".")
+ case _ =>
+ throw new IllegalArgumentException("[StreamPark][Spark][YarnClient]
Invalid spark on yarn deployMode, only support \"client\" and \"cluster\".")
})
}
private def setSparkConfig(submitRequest: SubmitRequest, sparkLauncher:
SparkLauncher): Unit = {
logger.info("[StreamPark][Spark][YarnClient] set spark configuration.")
- // 1) set spark conf
- submitRequest.properties.foreach(prop => {
+ // 1) put yarn queue
+ if (SparkExecutionMode.isYarnMode(submitRequest.executionMode)) {
+ setYarnQueue(submitRequest)
+ }
+
+ // 2) set spark conf
+ submitRequest.appProperties.foreach(prop => {
val k = prop._1
val v = prop._2
logInfo(s"| $k : $v")
sparkLauncher.setConf(k, v)
})
- // 2) appArgs...
+ // 3) set spark args
+ submitRequest.appArgs.foreach(sparkLauncher.addAppArgs(_))
if (submitRequest.hasExtra("sql")) {
sparkLauncher.addAppArgs("--sql", submitRequest.getExtra("sql").toString)
}
}
+ protected def setYarnQueue(submitRequest: SubmitRequest): Unit = {
+ if (submitRequest.hasExtra("yarnQueueName")) {
+ submitRequest.appProperties.put("spark.yarn.queue",
submitRequest.getExtra("yarnQueueName").asInstanceOf[String])
+ }
+ if (submitRequest.hasExtra("yarnQueueLabel")) {
+ submitRequest.appProperties.put("spark.yarn.am.nodeLabelExpression",
submitRequest.getExtra("yarnQueueLabel").asInstanceOf[String])
+
submitRequest.appProperties.put("spark.yarn.executor.nodeLabelExpression",
submitRequest.getExtra("yarnQueueLabel").asInstanceOf[String])
Review Comment:
Should the "yarnQueueName" and "yarnQueueLabel" keys be extracted as constants instead of being repeated as string literals?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]