This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 600a5eeb9 [VL]: Fix VeloxColumnarWriteFilesExec#withNewChildren doesn't
replace the dummy child (#5726)
600a5eeb9 is described below
commit 600a5eeb93cf2cbd12aa2c018d28addf12510bd2
Author: Hongze Zhang <[email protected]>
AuthorDate: Mon May 13 18:19:46 2024 +0800
[VL]: Fix VeloxColumnarWriteFilesExec#withNewChildren doesn't replace the
dummy child (#5726)
---
.../execution/VeloxColumnarWriteFilesExec.scala | 61 +++++++++++++++-------
1 file changed, 42 insertions(+), 19 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
index 23dff990c..1d3d55afb 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
@@ -259,8 +259,9 @@ class VeloxColumnarWriteFilesRDD(
// we need to expose a dummy child (as right child) with type "WriteFilesExec"
to let Spark
// choose the new write code path (version >= 3.4). The actual plan to write
is the left child
// of this operator.
-case class VeloxColumnarWriteFilesExec(
- child: SparkPlan,
+case class VeloxColumnarWriteFilesExec private (
+ override val left: SparkPlan,
+ override val right: SparkPlan,
fileFormat: FileFormat,
partitionColumns: Seq[Attribute],
bucketSpec: Option[BucketSpec],
@@ -269,7 +270,8 @@ case class VeloxColumnarWriteFilesExec(
extends BinaryExecNode
with GlutenPlan
with VeloxColumnarWriteFilesExec.ExecuteWriteCompatible {
- import VeloxColumnarWriteFilesExec._
+
+ val child: SparkPlan = left
override lazy val references: AttributeSet = AttributeSet.empty
@@ -320,28 +322,49 @@ case class VeloxColumnarWriteFilesExec(
new VeloxColumnarWriteFilesRDD(rdd, writeFilesSpec, jobTrackerID)
}
}
-
- override def left: SparkPlan = child
-
- // This is a workaround for FileFormatWriter#write. Vanilla Spark (version
>= 3.4) requires for
- // a plan that has at least one node exactly of type `WriteFilesExec` that
is a Scala case-class,
- // to decide to choose new `#executeWrite` code path over the legacy
`#execute` for write
- // operation.
- //
- // So we add a no-op `WriteFilesExec` child to let Spark pick the new code
path.
- //
- // See: FileFormatWriter#write
- // See: V1Writes#getWriteFilesOpt
- override val right: SparkPlan =
- WriteFilesExec(NoopLeaf(), fileFormat, partitionColumns, bucketSpec,
options, staticPartitions)
-
override protected def withNewChildrenInternal(
newLeft: SparkPlan,
newRight: SparkPlan): SparkPlan =
- copy(newLeft, fileFormat, partitionColumns, bucketSpec, options,
staticPartitions)
+ copy(newLeft, newRight, fileFormat, partitionColumns, bucketSpec, options,
staticPartitions)
}
object VeloxColumnarWriteFilesExec {
+
+ def apply(
+ child: SparkPlan,
+ fileFormat: FileFormat,
+ partitionColumns: Seq[Attribute],
+ bucketSpec: Option[BucketSpec],
+ options: Map[String, String],
+ staticPartitions: TablePartitionSpec): VeloxColumnarWriteFilesExec = {
+ // This is a workaround for FileFormatWriter#write. Vanilla Spark (version
>= 3.4) requires for
+ // a plan that has at least one node exactly of type `WriteFilesExec` that
is a Scala
+ // case-class, to decide to choose new `#executeWrite` code path over the
legacy `#execute`
+ // for write operation.
+ //
+ // So we add a no-op `WriteFilesExec` child to let Spark pick the new code
path.
+ //
+ // See: FileFormatWriter#write
+ // See: V1Writes#getWriteFilesOpt
+ val right: SparkPlan =
+ WriteFilesExec(
+ NoopLeaf(),
+ fileFormat,
+ partitionColumns,
+ bucketSpec,
+ options,
+ staticPartitions)
+
+ VeloxColumnarWriteFilesExec(
+ child,
+ right,
+ fileFormat,
+ partitionColumns,
+ bucketSpec,
+ options,
+ staticPartitions)
+ }
+
private case class NoopLeaf() extends LeafExecNode {
override protected def doExecute(): RDD[InternalRow] =
throw new GlutenException(s"$nodeName does not support doExecute")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]