This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b0580ef56ca [HUDI-5956] Fix spark DAG ui when write (#11376)
b0580ef56ca is described below

commit b0580ef56ca1699d785af98d564917a7642bf1f6
Author: KnightChess <[email protected]>
AuthorDate: Sat Jun 22 18:44:43 2024 +0800

    [HUDI-5956] Fix spark DAG ui when write (#11376)
---
 .../org/apache/spark/sql/hudi/SparkAdapter.scala   |  8 +++
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 64 ++++++++++++++--------
 .../apache/spark/sql/adapter/Spark2Adapter.scala   |  7 +++
 .../spark/sql/adapter/BaseSpark3Adapter.scala      |  7 +++
 4 files changed, 63 insertions(+), 23 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index a098298fdbb..40f64e41029 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.DateFormatter
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, SparkParquetReader}
 import org.apache.spark.sql.internal.SQLConf
@@ -230,4 +231,11 @@ trait SparkAdapter extends Serializable {
                               sqlConf: SQLConf,
                               options: Map[String, String],
                               hadoopConf: Configuration): SparkParquetReader
+
+  /**
+   * Runs `body` under a new SQL execution id so that the Spark jobs it triggers are tracked in the SQL UI.
+   */
+  def sqlExecutionWithNewExecutionId[T](sparkSession: SparkSession,
+                                        queryExecution: QueryExecution,
+                                        name: Option[String] = None)(body: => T): T
 }
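
Why the indirection: `SQLExecution.withNewExecutionId` has different signatures across the Spark versions Hudi supports. Spark 2 takes `(sparkSession, queryExecution)` while Spark 3 takes `(queryExecution, name)`, so the shared trait exposes a single hook and each version module adapts it. Below is a minimal sketch of the pattern, assuming a Spark 3.x classpath; the `ExecutionIdShim`/`Spark3Shim`/`withExecId` names are illustrative, not Hudi's.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}

    trait ExecutionIdShim {
      // Runs `body` under a fresh SQL execution id so that the Spark jobs it
      // spawns are grouped under a single query in the SQL UI.
      def withExecId[T](spark: SparkSession,
                        qe: QueryExecution,
                        name: Option[String] = None)(body: => T): T
    }

    class Spark3Shim extends ExecutionIdShim {
      override def withExecId[T](spark: SparkSession,
                                 qe: QueryExecution,
                                 name: Option[String])(body: => T): T =
        // Spark 3 signature; on Spark 2 the equivalent call is
        // SQLExecution.withNewExecutionId(spark, qe)(body), which is why the
        // trait threads the session through even though Spark 3 ignores it.
        SQLExecution.withNewExecutionId(qe, name)(body)
    }
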
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 9a23dac7fd5..b07ea629eab 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -59,24 +59,24 @@ import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.sync.common.util.SyncUtilHelpers
 import org.apache.hudi.sync.common.util.SyncUtilHelpers.getHoodieMetaSyncException
 import org.apache.hudi.util.SparkKeyGenUtils
-
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericData
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.shims.ShimLoader
+import org.apache.hudi.HoodieSparkUtils.sparkAdapter
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
 import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.{SPARK_VERSION, SparkContext}
 import org.slf4j.LoggerFactory
 
 import java.util.function.BiConsumer
-
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.{Failure, Success, Try}
@@ -174,32 +174,43 @@ class HoodieSparkSqlWriterInternal {
             sourceDf: DataFrame,
             streamingWritesParamsOpt: Option[StreamingWriteParams] = Option.empty,
             hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty):
-
   (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = {
-    var succeeded = false
-    var counter = 0
-    val maxRetry: Integer = Integer.parseInt(optParams.getOrElse(HoodieWriteConfig.NUM_RETRIES_ON_CONFLICT_FAILURES.key(), HoodieWriteConfig.NUM_RETRIES_ON_CONFLICT_FAILURES.defaultValue().toString))
-    var toReturn: (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = null
 
-    while (counter <= maxRetry && !succeeded) {
-      try {
-        toReturn = writeInternal(sqlContext, mode, optParams, sourceDf, streamingWritesParamsOpt, hoodieWriteClient)
-        if (counter > 0) {
-          log.warn(s"Succeeded with attempt no $counter")
-        }
-        succeeded = true
-      } catch {
-        case e: HoodieWriteConflictException =>
-          val writeConcurrencyMode = optParams.getOrElse(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue())
-          if (WriteConcurrencyMode.supportsMultiWriter(writeConcurrencyMode) && counter < maxRetry) {
-            counter += 1
-            log.warn(s"Conflict found. Retrying again for attempt no $counter")
-          } else {
-            throw e
+    val retryWrite: () => (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = () => {
+      var succeeded = false
+      var counter = 0
+      val maxRetry: Integer = Integer.parseInt(optParams.getOrElse(HoodieWriteConfig.NUM_RETRIES_ON_CONFLICT_FAILURES.key(), HoodieWriteConfig.NUM_RETRIES_ON_CONFLICT_FAILURES.defaultValue().toString))
+      var toReturn: (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = null
+
+      while (counter <= maxRetry && !succeeded) {
+        try {
+          toReturn = writeInternal(sqlContext, mode, optParams, sourceDf, streamingWritesParamsOpt, hoodieWriteClient)
+          if (counter > 0) {
+            log.warn(s"Succeeded with attempt no $counter")
           }
+          succeeded = true
+        } catch {
+          case e: HoodieWriteConflictException =>
+            val writeConcurrencyMode = optParams.getOrElse(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue())
+            if (WriteConcurrencyMode.supportsMultiWriter(writeConcurrencyMode) && counter < maxRetry) {
+              counter += 1
+              log.warn(s"Conflict found. Retrying again for attempt no $counter")
+            } else {
+              throw e
+            }
+        }
       }
+      toReturn
+    }
+
+    val executionId = getExecutionId(sqlContext.sparkContext, sourceDf.queryExecution)
+    if (executionId.isEmpty) {
+      sparkAdapter.sqlExecutionWithNewExecutionId(sourceDf.sparkSession, sourceDf.queryExecution, Option("Hudi Command"))(
+        retryWrite.apply()
+      )
+    } else {
+      retryWrite.apply()
     }
-    toReturn
   }
 
   private def writeInternal(sqlContext: SQLContext,
@@ -1116,4 +1127,11 @@ class HoodieSparkSqlWriterInternal {
       Map.empty
     }
   }
+
+  private def getExecutionId(context: SparkContext, newQueryExecution: QueryExecution): Option[Long] = {
+    // If the current execution ID does not map to this `QueryExecution`, the ID belongs to
+    // another query's plan, so this write cannot be attributed to it in the UI.
+    Option(context.getLocalProperty(SQLExecution.EXECUTION_ID_KEY))
+      .map(_.toLong).filter(SQLExecution.getQueryExecution(_) eq newQueryExecution)
+  }
 }
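
Taken together, `getExecutionId` and the branch above mean: if the incoming DataFrame's plan is already tracked on the current thread, the retry loop runs as-is; otherwise the whole write is wrapped in a fresh execution id so every job it fires is attributed to a single "Hudi Command" query in the SQL tab. A self-contained sketch of that mechanism, assuming Spark 3.x and a local session (not Hudi code):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.SQLExecution

    val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
    val qe = spark.range(100).toDF("id").queryExecution

    // Same guard as getExecutionId above: is this exact plan already tracked
    // on the current thread? (EXECUTION_ID_KEY is "spark.sql.execution.id".)
    val alreadyTracked =
      Option(spark.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY))
        .map(_.toLong)
        .exists(SQLExecution.getQueryExecution(_) eq qe)

    val n =
      if (alreadyTracked) qe.toRdd.count()
      else SQLExecution.withNewExecutionId(qe, Some("Hudi Command")) {
        qe.toRdd.count() // jobs fired here roll up under one SQL execution
      }
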
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
index 767fbf4c47b..3312bf06efb 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.{Command, DeleteFromTable}
 import org.apache.spark.sql.catalyst.util.DateFormatter
+import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark24LegacyHoodieParquetFileFormat, Spark24ParquetReader, SparkParquetReader}
 import org.apache.spark.sql.execution.vectorized.MutableColumnarRow
@@ -224,4 +225,10 @@ class Spark2Adapter extends SparkAdapter {
                                        hadoopConf: Configuration): SparkParquetReader = {
     Spark24ParquetReader.build(vectorized, sqlConf, options, hadoopConf)
   }
+
+  override def sqlExecutionWithNewExecutionId[T](sparkSession: SparkSession,
+                                                 queryExecution: QueryExecution,
+                                                 name: Option[String])(body: => T): T = {
+    SQLExecution.withNewExecutionId(sparkSession, queryExecution)(body)
+  }
 }
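
Note the asymmetry with the Spark 3 adapter below: Spark 2's `SQLExecution.withNewExecutionId(sparkSession, queryExecution)` has no name parameter, so the `name` argument is accepted only to satisfy the shared interface and is dropped here. The write is still tracked as its own SQL execution on Spark 2, just without the "Hudi Command" label.
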
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
index 44ae9a5b49c..5b7106123f7 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
@@ -29,6 +29,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, HoodieSparkAvroSchemaConverters}
 import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate, Predicate}
 import org.apache.spark.sql.catalyst.util.DateFormatter
+import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hudi.SparkAdapter
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
@@ -100,4 +101,10 @@ abstract class BaseSpark3Adapter extends SparkAdapter with Logging {
   override def makeColumnarBatch(vectors: Array[ColumnVector], numRows: Int): ColumnarBatch = {
     new ColumnarBatch(vectors, numRows)
   }
+
+  override def sqlExecutionWithNewExecutionId[T](sparkSession: SparkSession,
+                                                 queryExecution: QueryExecution,
+                                                 name: Option[String])(body: => T): T = {
+    SQLExecution.withNewExecutionId(queryExecution, name)(body)
+  }
 }
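
With both adapters in place, the `Option("Hudi Command")` passed by the writer reaches Spark 3's `withNewExecutionId`, so a Hudi write runs as one tracked SQL execution and the DAG/SQL UI can group all of its jobs under that query, which is the behavior the subject line's fix restores. Where a given Spark version surfaces the optional name, it also labels the execution.
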
