This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git

commit 033b2a80db5d882177745244f5bb6a237bb4a9b7
Author: peacewong <[email protected]>
AuthorDate: Tue Oct 10 21:12:09 2023 +0800

    Spark supports printing parameters to task logs
---
 .../spark/executor/SparkEngineConnExecutor.scala   | 42 ++++++++++++++++++++--
 1 file changed, 39 insertions(+), 3 deletions(-)

diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
index 8d97e8152..4a14cd39c 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
@@ -100,9 +100,14 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long)
     }
     val kind: Kind = getKind
     var preCode = code
-    engineExecutorContext.appendStdout(
-      LogUtils.generateInfo(s"yarn application id: ${sc.applicationId}")
-    )
+
+    val isFirstParagraph = engineExecutorContext.getCurrentParagraph == 1
+    if (isFirstParagraph) {
+      engineExecutorContext.appendStdout(
+        LogUtils.generateInfo(s"yarn application id: ${sc.applicationId}")
+      )
+    }
+
     // Pre-execution hook
     var executionHook: SparkPreExecutionHook = null
     Utils.tryCatch {
@@ -138,6 +143,37 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long)
     logger.info("Set jobGroup to " + jobGroup)
     sc.setJobGroup(jobGroup, _code, true)
 
+    // print the job configuration, only for the first paragraph
+    if (isFirstParagraph) {
+      Utils.tryCatch({
+        val executorNum: Int = sc.getConf.get("spark.executor.instances").toInt
+        val executorMem: Long =
+          ByteTimeUtils.byteStringAsGb(sc.getConf.get("spark.executor.memory"))
+        val driverMem: Long = ByteTimeUtils.byteStringAsGb(sc.getConf.get("spark.driver.memory"))
+        val sparkExecutorCores = sc.getConf.get("spark.executor.cores", "2").toInt
+        val sparkDriverCores = sc.getConf.get("spark.driver.cores", "1").toInt
+        val queue = sc.getConf.get("spark.yarn.queue")
+        // the value keeps its unit if the configuration was set with one;
+        // otherwise sc.getConf returns the raw spark.yarn.executor.memoryOverhead value, e.g. 512 (no unit)
+        val memoryOverhead = sc.getConf.get("spark.executor.memoryOverhead", "1G")
+
+        val sb = new StringBuilder
+        sb.append(s"spark.executor.instances=$executorNum\n")
+        sb.append(s"spark.executor.memory=${executorMem}G\n")
+        sb.append(s"spark.driver.memory=${driverMem}G\n")
+        sb.append(s"spark.executor.cores=$sparkExecutorCores\n")
+        sb.append(s"spark.driver.cores=$sparkDriverCores\n")
+        sb.append(s"spark.yarn.queue=$queue\n")
+        sb.append(s"spark.executor.memoryOverhead=${memoryOverhead}\n")
+        sb.append("\n")
+        engineExecutorContext.appendStdout(
+          LogUtils.generateInfo(s"Your Spark job runs with the following configs:\n${sb.toString()}")
+        )
+      })(t => {
+        logger.warn("Get actual used resource exception", t)
+      })
+    }
+
     val response = Utils.tryFinally(runCode(this, _code, engineExecutorContext, jobGroup)) {
       // Utils.tryAndWarn(this.engineExecutionContext.pushProgress(1, getProgressInfo("")))
       jobGroup = null

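The hunk above gates the extra logging on the first paragraph of a task and then dumps a fixed set of Spark settings to the task log. As a quick way to experiment with that formatting outside Linkis, here is a minimal standalone sketch in plain Spark; the helper name jobConfigSummary and any fallback defaults not present in the patch (instances, memory, queue) are illustrative assumptions, not part of this commit:

import org.apache.spark.SparkConf

// Minimal sketch: render the same one-line-per-key summary the commit
// appends to the task log. Defaults marked "from patch" mirror the diff;
// the others are assumed placeholders for demonstration only.
def jobConfigSummary(conf: SparkConf): String = {
  val keys = Seq(
    "spark.executor.instances" -> "2", // assumed placeholder
    "spark.executor.memory" -> "2g", // assumed placeholder
    "spark.driver.memory" -> "2g", // assumed placeholder
    "spark.executor.cores" -> "2", // default from patch
    "spark.driver.cores" -> "1", // default from patch
    "spark.yarn.queue" -> "default", // assumed placeholder
    "spark.executor.memoryOverhead" -> "1G" // default from patch
  )
  keys.map { case (key, default) => s"$key=${conf.get(key, default)}" }.mkString("\n")
}

// Usage: build a conf locally and print the summary.
val conf = new SparkConf()
  .set("spark.executor.instances", "4")
  .set("spark.executor.memory", "8g")
println(jobConfigSummary(conf))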

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]