This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b0580ef56ca [HUDI-5956] Fix spark DAG ui when write (#11376)
b0580ef56ca is described below
commit b0580ef56ca1699d785af98d564917a7642bf1f6
Author: KnightChess <[email protected]>
AuthorDate: Sat Jun 22 18:44:43 2024 +0800
[HUDI-5956] Fix spark DAG ui when write (#11376)
---
.../org/apache/spark/sql/hudi/SparkAdapter.scala | 8 +++
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 64 ++++++++++++++--------
.../apache/spark/sql/adapter/Spark2Adapter.scala | 7 +++
.../spark/sql/adapter/BaseSpark3Adapter.scala | 7 +++
4 files changed, 63 insertions(+), 23 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index a098298fdbb..40f64e41029 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -33,6 +33,7 @@ import
org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
import org.apache.spark.sql.catalyst.util.DateFormatter
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat,
SparkParquetReader}
import org.apache.spark.sql.internal.SQLConf
@@ -230,4 +231,11 @@ trait SparkAdapter extends Serializable {
sqlConf: SQLConf,
options: Map[String, String],
hadoopConf: Configuration): SparkParquetReader
+
+ /**
+ * Run the given body inside a new SQL execution tied to the supplied
+ * QueryExecution, so the Spark UI can associate and display the write's DAG.
+ */
+ def sqlExecutionWithNewExecutionId[T](sparkSession: SparkSession,
+ queryExecution: QueryExecution,
+ name: Option[String] = None)(body: =>
T): T
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 9a23dac7fd5..b07ea629eab 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -59,24 +59,24 @@ import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.sync.common.util.SyncUtilHelpers
import
org.apache.hudi.sync.common.util.SyncUtilHelpers.getHoodieMetaSyncException
import org.apache.hudi.util.SparkKeyGenUtils
-
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.shims.ShimLoader
+import org.apache.hudi.HoodieSparkUtils.sparkAdapter
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.{SPARK_VERSION, SparkContext}
import org.slf4j.LoggerFactory
import java.util.function.BiConsumer
-
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.{Failure, Success, Try}
@@ -174,32 +174,43 @@ class HoodieSparkSqlWriterInternal {
sourceDf: DataFrame,
streamingWritesParamsOpt: Option[StreamingWriteParams] =
Option.empty,
hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty):
-
(Boolean, HOption[String], HOption[String], HOption[String],
SparkRDDWriteClient[_], HoodieTableConfig) = {
- var succeeded = false
- var counter = 0
- val maxRetry: Integer =
Integer.parseInt(optParams.getOrElse(HoodieWriteConfig.NUM_RETRIES_ON_CONFLICT_FAILURES.key(),
HoodieWriteConfig.NUM_RETRIES_ON_CONFLICT_FAILURES.defaultValue().toString))
- var toReturn: (Boolean, HOption[String], HOption[String], HOption[String],
SparkRDDWriteClient[_], HoodieTableConfig) = null
- while (counter <= maxRetry && !succeeded) {
- try {
- toReturn = writeInternal(sqlContext, mode, optParams, sourceDf,
streamingWritesParamsOpt, hoodieWriteClient)
- if (counter > 0) {
- log.warn(s"Succeeded with attempt no $counter")
- }
- succeeded = true
- } catch {
- case e: HoodieWriteConflictException =>
- val writeConcurrencyMode =
optParams.getOrElse(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue())
- if (WriteConcurrencyMode.supportsMultiWriter(writeConcurrencyMode)
&& counter < maxRetry) {
- counter += 1
- log.warn(s"Conflict found. Retrying again for attempt no $counter")
- } else {
- throw e
+ val retryWrite: () => (Boolean, HOption[String], HOption[String],
HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = () => {
+ var succeeded = false
+ var counter = 0
+ val maxRetry: Integer =
Integer.parseInt(optParams.getOrElse(HoodieWriteConfig.NUM_RETRIES_ON_CONFLICT_FAILURES.key(),
HoodieWriteConfig.NUM_RETRIES_ON_CONFLICT_FAILURES.defaultValue().toString))
+ var toReturn: (Boolean, HOption[String], HOption[String],
HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = null
+
+ while (counter <= maxRetry && !succeeded) {
+ try {
+ toReturn = writeInternal(sqlContext, mode, optParams, sourceDf,
streamingWritesParamsOpt, hoodieWriteClient)
+ if (counter > 0) {
+ log.warn(s"Succeeded with attempt no $counter")
}
+ succeeded = true
+ } catch {
+ case e: HoodieWriteConflictException =>
+ val writeConcurrencyMode =
optParams.getOrElse(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue())
+ if (WriteConcurrencyMode.supportsMultiWriter(writeConcurrencyMode)
&& counter < maxRetry) {
+ counter += 1
+ log.warn(s"Conflict found. Retrying again for attempt no $counter")
+ } else {
+ throw e
+ }
+ }
}
+ toReturn
+ }
+
+ val executionId = getExecutionId(sqlContext.sparkContext,
sourceDf.queryExecution)
+ if (executionId.isEmpty) {
+ sparkAdapter.sqlExecutionWithNewExecutionId(sourceDf.sparkSession,
sourceDf.queryExecution, Option("Hudi Command"))(
+ retryWrite.apply()
+ )
+ } else {
+ retryWrite.apply()
}
- toReturn
}
private def writeInternal(sqlContext: SQLContext,
@@ -1116,4 +1127,11 @@ class HoodieSparkSqlWriterInternal {
Map.empty
}
}
+
+ private def getExecutionId(context: SparkContext, newQueryExecution:
QueryExecution): Option[Long] = {
+ // If the `QueryExecution` does not match the current execution ID, the
+ // execution ID belongs to another job's plan, so we cannot update the UI
+ // for this job.
+ Option(context.getLocalProperty(SQLExecution.EXECUTION_ID_KEY))
+ .map(_.toLong).filter(SQLExecution.getQueryExecution(_) eq
newQueryExecution)
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
index 767fbf4c47b..3312bf06efb 100644
---
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
@@ -32,6 +32,7 @@ import
org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.{Command, DeleteFromTable}
import org.apache.spark.sql.catalyst.util.DateFormatter
+import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat,
Spark24LegacyHoodieParquetFileFormat, Spark24ParquetReader, SparkParquetReader}
import org.apache.spark.sql.execution.vectorized.MutableColumnarRow
@@ -224,4 +225,10 @@ class Spark2Adapter extends SparkAdapter {
hadoopConf: Configuration):
SparkParquetReader = {
Spark24ParquetReader.build(vectorized, sqlConf, options, hadoopConf)
}
+
+ override def sqlExecutionWithNewExecutionId[T](sparkSession: SparkSession,
+ queryExecution:
QueryExecution,
+ name: Option[String])(body:
=> T): T = {
+ SQLExecution.withNewExecutionId(sparkSession, queryExecution)(body)
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
index 44ae9a5b49c..5b7106123f7 100644
---
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
@@ -29,6 +29,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters,
HoodieSparkAvroSchemaConverters}
import org.apache.spark.sql.catalyst.expressions.{Expression,
InterpretedPredicate, Predicate}
import org.apache.spark.sql.catalyst.util.DateFormatter
+import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hudi.SparkAdapter
import org.apache.spark.sql.sources.{BaseRelation, Filter}
@@ -100,4 +101,10 @@ abstract class BaseSpark3Adapter extends SparkAdapter with
Logging {
override def makeColumnarBatch(vectors: Array[ColumnVector], numRows: Int):
ColumnarBatch = {
new ColumnarBatch(vectors, numRows)
}
+
+ override def sqlExecutionWithNewExecutionId[T](sparkSession: SparkSession,
+ queryExecution:
QueryExecution,
+ name: Option[String])(body:
=> T): T = {
+ SQLExecution.withNewExecutionId(queryExecution, name)(body)
+ }
}