This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new b8b6d17ad8e4 [SPARK-48004][SQL] Add WriteFilesExecBase trait for v1 write
b8b6d17ad8e4 is described below

commit b8b6d17ad8e472307fb4c03ca388efcc4ac7059e
Author: ulysses-you <ulyssesyo...@gmail.com>
AuthorDate: Fri Apr 26 18:32:18 2024 +0800

    [SPARK-48004][SQL] Add WriteFilesExecBase trait for v1 write

    ### What changes were proposed in this pull request?

    This PR adds a new trait `WriteFilesExecBase` for v1 write, so that downstream
    projects can inherit `WriteFilesExecBase` rather than `WriteFilesExec`. The
    reason is that inheriting a `case class` is bad practice in the Scala world.

    ### Why are the changes needed?

    Makes downstream projects easier to develop.

    ### Does this PR introduce _any_ user-facing change?

    no

    ### How was this patch tested?

    Pass CI

    ### Was this patch authored or co-authored using generative AI tooling?

    no

    Closes #46240 from ulysses-you/WriteFilesExecBase.

    Authored-by: ulysses-you <ulyssesyo...@gmail.com>
    Signed-off-by: Kent Yao <y...@apache.org>
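For downstream projects, the intended usage looks roughly like the sketch below.
MyWriteFilesExec and its write logic are hypothetical (modeled on the
ColumnarWriteExec test node further down), not part of this commit: a custom
write node mixes in WriteFilesExecBase, inherits the output/doExecute
boilerplate from the trait, and can therefore be a case class itself.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.catalog.BucketSpec
    import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
    import org.apache.spark.sql.catalyst.expressions.Attribute
    import org.apache.spark.sql.connector.write.WriterCommitMessage
    import org.apache.spark.sql.execution.SparkPlan
    import org.apache.spark.sql.execution.datasources.{FileFormat, WriteFilesExecBase, WriteFilesSpec}

    // Hypothetical downstream write node. Because output and the doExecute
    // stub now live in WriteFilesExecBase, only the write path itself and the
    // child-replacement hook need to be implemented here.
    case class MyWriteFilesExec(
        child: SparkPlan,
        fileFormat: FileFormat,
        partitionColumns: Seq[Attribute],
        bucketSpec: Option[BucketSpec],
        options: Map[String, String],
        staticPartitions: TablePartitionSpec) extends WriteFilesExecBase {

      override protected def doExecuteWrite(
          writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = {
        // A real implementation would write child.execute() out via fileFormat;
        // left unimplemented in this sketch.
        throw new UnsupportedOperationException("sketch only")
      }

      override protected def withNewChildInternal(newChild: SparkPlan): MyWriteFilesExec =
        copy(child = newChild)
    }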
---
 .../spark/sql/execution/datasources/V1Writes.scala        |  4 ++--
 .../spark/sql/execution/datasources/WriteFiles.scala      | 16 +++++++++-------
 .../apache/spark/sql/SparkSessionExtensionSuite.scala     | 13 ++++++-------
 .../sql/execution/datasources/V1WriteCommandSuite.scala   |  8 ++++----
 4 files changed, 21 insertions(+), 20 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
index d7a8d7aec0b7..1d6c2a6f8112 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
@@ -213,9 +213,9 @@ object V1WritesUtils {
     }
   }
 
-  def getWriteFilesOpt(child: SparkPlan): Option[WriteFilesExec] = {
+  def getWriteFilesOpt(child: SparkPlan): Option[WriteFilesExecBase] = {
     child.collectFirst {
-      case w: WriteFilesExec => w
+      case w: WriteFilesExecBase => w
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala
index a4fd57e7dffa..c6c34b7fcea3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala
@@ -58,6 +58,14 @@ case class WriteFiles(
     copy(child = newChild)
 }
 
+trait WriteFilesExecBase extends UnaryExecNode {
+  override def output: Seq[Attribute] = Seq.empty
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    throw SparkException.internalError(s"$nodeName does not support doExecute")
+  }
+}
+
 /**
  * Responsible for writing files.
  */
@@ -67,9 +75,7 @@ case class WriteFilesExec(
     partitionColumns: Seq[Attribute],
     bucketSpec: Option[BucketSpec],
     options: Map[String, String],
-    staticPartitions: TablePartitionSpec) extends UnaryExecNode {
-  override def output: Seq[Attribute] = Seq.empty
-
+    staticPartitions: TablePartitionSpec) extends WriteFilesExecBase {
   override protected def doExecuteWrite(
       writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = {
     val rdd = child.execute()
@@ -105,10 +111,6 @@ case class WriteFilesExec(
     }
   }
 
-  override protected def doExecute(): RDD[InternalRow] = {
-    throw SparkException.internalError(s"$nodeName does not support doExecute")
-  }
-
   override protected def stringArgs: Iterator[Any] = Iterator(child)
 
   override protected def withNewChildInternal(newChild: SparkPlan): WriteFilesExec =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 1c44d0c3b4ea..4d38e360f438 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.connector.write.WriterCommitMessage
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, AQEShuffleReadExec, QueryStageExec, ShuffleQueryStageExec}
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
-import org.apache.spark.sql.execution.datasources.{FileFormat, WriteFilesExec, WriteFilesSpec}
+import org.apache.spark.sql.execution.datasources.{FileFormat, WriteFilesExec, WriteFilesExecBase, WriteFilesSpec}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, BroadcastExchangeLike, ShuffleExchangeExec, ShuffleExchangeLike, ShuffleOrigin}
 import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
 import org.apache.spark.sql.internal.SQLConf
@@ -842,14 +842,13 @@ class ColumnarProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
     new ColumnarProjectExec(projectList, newChild)
 }
 
-class ColumnarWriteExec(
+case class ColumnarWriteExec(
     child: SparkPlan,
     fileFormat: FileFormat,
     partitionColumns: Seq[Attribute],
     bucketSpec: Option[BucketSpec],
     options: Map[String, String],
-    staticPartitions: TablePartitionSpec) extends WriteFilesExec(
-  child, fileFormat, partitionColumns, bucketSpec, options, staticPartitions) {
+    staticPartitions: TablePartitionSpec) extends WriteFilesExecBase {
 
   override def supportsColumnar: Boolean = true
 
@@ -858,8 +857,8 @@ class ColumnarWriteExec(
     throw new Exception("columnar write")
   }
 
-  override protected def withNewChildInternal(newChild: SparkPlan): WriteFilesExec =
-    new ColumnarWriteExec(
+  override protected def withNewChildInternal(newChild: SparkPlan): ColumnarWriteExec =
+    ColumnarWriteExec(
       newChild, fileFormat, partitionColumns, bucketSpec, options, staticPartitions)
 }
 
@@ -971,7 +970,7 @@ case class PreRuleReplaceAddWithBrokenVersion() extends Rule[SparkPlan] {
           replaceWithColumnarExpression(exp).asInstanceOf[NamedExpression]),
         replaceWithColumnarPlan(plan.child))
     case write: WriteFilesExec =>
-      new ColumnarWriteExec(
+      ColumnarWriteExec(
         replaceWithColumnarPlan(write.child),
         write.fileFormat,
         write.partitionColumns,
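The ColumnarWriteExec change above is the motivating case: before this commit
it had to subclass the WriteFilesExec case class. A self-contained sketch of
why Scala discourages that (Plan and ColumnarPlan below are illustrative only,
unrelated to Spark): the parent's compiler-generated equals, copy, and toString
leak into the subclass, ignoring its type and extra fields.

    case class Plan(name: String)

    // Extending a case class compiles, but inherits Plan's generated members.
    class ColumnarPlan(name: String, val batchSize: Int) extends Plan(name)

    object CaseClassInheritancePitfall extends App {
      val columnar = new ColumnarPlan("scan", batchSize = 4096)

      // equals is Plan's structural equality: the subclass's extra state and
      // even its type are ignored, so a ColumnarPlan equals a plain Plan.
      println(columnar == Plan("scan")) // true

      // copy is Plan's too: it returns a plain Plan and silently drops batchSize.
      val copied = columnar.copy(name = "scan2")
      println(copied.isInstanceOf[ColumnarPlan]) // false
    }

Making ColumnarWriteExec a proper case class over a plain trait sidesteps all
of this, which is exactly what the new WriteFilesExecBase enables.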
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
index ce43edb79c12..04a7b4834f4b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
@@ -214,8 +214,8 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write
 
       val executedPlan = FileFormatWriter.executedPlan.get
       val plan = if (enabled) {
-        assert(executedPlan.isInstanceOf[WriteFilesExec])
-        executedPlan.asInstanceOf[WriteFilesExec].child
+        assert(executedPlan.isInstanceOf[WriteFilesExecBase])
+        executedPlan.asInstanceOf[WriteFilesExecBase].child
       } else {
         executedPlan.transformDown {
           case a: AdaptiveSparkPlanExec => a.executedPlan
@@ -261,8 +261,8 @@
 
       val executedPlan = FileFormatWriter.executedPlan.get
       val plan = if (enabled) {
-        assert(executedPlan.isInstanceOf[WriteFilesExec])
-        executedPlan.asInstanceOf[WriteFilesExec].child
+        assert(executedPlan.isInstanceOf[WriteFilesExecBase])
+        executedPlan.asInstanceOf[WriteFilesExecBase].child
       } else {
         executedPlan.transformDown {
           case a: AdaptiveSparkPlanExec => a.executedPlan

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org