This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/master by this push:
new 1603bdfda4 Modify the SparkMeasure feature to be disabled by default.
(#5274)
1603bdfda4 is described below
commit 1603bdfda43768e2399d67a7cdf356f90ef77ff5
Author: LiuGuoHua <[email protected]>
AuthorDate: Tue Oct 28 10:31:41 2025 +0800
Modify the SparkMeasure feature to be disabled by default. (#5274)
* Modify the SparkMeasure feature to be disabled by default.
* format code
---
.../spark/executor/SparkSqlExecutor.scala | 36 ++++++++++++----------
1 file changed, 19 insertions(+), 17 deletions(-)
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSqlExecutor.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSqlExecutor.scala
index 193fd5516c..17f13c7803 100644
---
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSqlExecutor.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSqlExecutor.scala
@@ -160,23 +160,25 @@ class SparkSqlExecutor(
sparkEngineSession: SparkEngineSession,
code: String
): Option[SparkSqlMeasure] = {
- val sparkMeasureType = engineExecutionContext.getProperties
- .getOrDefault(SparkConfiguration.SPARKMEASURE_AGGREGATE_TYPE, "")
- .toString
-
- if (sparkMeasureType.nonEmpty) {
- val outputPrefix =
SparkConfiguration.SPARKMEASURE_OUTPUT_PREFIX.getValue(options)
- val outputPath = FsPath.getFsPath(
- outputPrefix,
-
LabelUtil.getUserCreator(engineExecutionContext.getLabels.toList.asJava)._1,
- sparkMeasureType,
- JobUtils.getJobIdFromMap(engineExecutionContext.getProperties),
- new Date().getTime.toString
- )
- Some(new SparkSqlMeasure(sparkEngineSession.sparkSession, code,
sparkMeasureType, outputPath))
- } else {
- None
- }
+
Option(engineExecutionContext.getProperties.get(SparkConfiguration.SPARKMEASURE_AGGREGATE_TYPE))
+ .map(_.toString)
+ .flatMap { sparkMeasureType =>
+ val userName =
LabelUtil.getUserCreator(engineExecutionContext.getLabels.toList.asJava)._1
+ val outputPrefix =
SparkConfiguration.SPARKMEASURE_OUTPUT_PREFIX.getValue(options)
+ val timestamp = System.currentTimeMillis().toString
+
+ val outputPath = FsPath.getFsPath(
+ outputPrefix,
+ userName,
+ sparkMeasureType,
+ JobUtils.getJobIdFromMap(engineExecutionContext.getProperties),
+ timestamp
+ )
+
+ Some(
+ new SparkSqlMeasure(sparkEngineSession.sparkSession, code,
sparkMeasureType, outputPath)
+ )
+ }
}
override protected def getExecutorIdPreFix: String = "SparkSqlExecutor_"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]