This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new f64eb3a4f [GLUTEN-6705][CORE][VL][CH] [Part-2] Rework
ColumnarWriteFilesExec (#6761)
f64eb3a4f is described below
commit f64eb3a4f4a49720f917a41ec5349e54fb82c31a
Author: Chang chen <[email protected]>
AuthorDate: Fri Aug 9 09:29:33 2024 +0800
[GLUTEN-6705][CORE][VL][CH] [Part-2] Rework ColumnarWriteFilesExec (#6761)
---
.../clickhouse/CHSparkPlanExecApi.scala | 22 +-
.../sql/execution/CHColumnarWriteFilesExec.scala | 201 ++++++----------
.../sql/execution/ClickhouseBackendWrite.scala | 84 -------
.../backendsapi/velox/VeloxSparkPlanExecApi.scala | 23 +-
.../spark/sql/execution/VeloxBackendWrite.scala | 138 -----------
.../execution/VeloxColumnarWriteFilesExec.scala | 253 ++++++++++-----------
.../gluten/backendsapi/SparkPlanExecApi.scala | 18 +-
.../extension/columnar/OffloadSingleNode.scala | 3 +-
.../sql/execution/ColumnarWriteFilesExec.scala | 157 +------------
9 files changed, 238 insertions(+), 661 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index bba5525ed..8254a1e14 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -38,6 +38,8 @@ import org.apache.spark.shuffle.{GenShuffleWriterParameters,
GlutenShuffleWriter
import org.apache.spark.shuffle.utils.CHShuffleUtil
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.{CHAggregateFunctionRewriteRule,
EqualToRewrite}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions._
import
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression,
CollectList, CollectSet}
import org.apache.spark.sql.catalyst.optimizer.BuildSide
@@ -49,7 +51,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.delta.files.TahoeFileIndex
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
-import org.apache.spark.sql.execution.datasources.{FileFormat,
HadoopFsRelation, WriteJobDescription}
+import org.apache.spark.sql.execution.datasources.{FileFormat,
HadoopFsRelation}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import
org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec,
ShuffleExchangeExec}
@@ -682,8 +684,22 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
CHRegExpReplaceTransformer(substraitExprName, children, expr)
}
- def createBackendWrite(description: WriteJobDescription): BackendWrite =
ClickhouseBackendWrite(
- description)
+ override def createColumnarWriteFilesExec(
+ child: SparkPlan,
+ noop: SparkPlan,
+ fileFormat: FileFormat,
+ partitionColumns: Seq[Attribute],
+ bucketSpec: Option[BucketSpec],
+ options: Map[String, String],
+ staticPartitions: TablePartitionSpec): ColumnarWriteFilesExec =
+ CHColumnarWriteFilesExec(
+ child,
+ noop,
+ fileFormat,
+ partitionColumns,
+ bucketSpec,
+ options,
+ staticPartitions)
override def createColumnarArrowEvalPythonExec(
udfs: Seq[PythonUDF],
diff --git
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala
similarity index 57%
copy from
gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
copy to
backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala
index b809a4b65..52019d770 100644
---
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala
@@ -17,21 +17,15 @@
package org.apache.spark.sql.execution
import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.exception.GlutenException
-import org.apache.gluten.extension.GlutenPlan
-import
org.apache.gluten.extension.columnar.transition.Convention.{KnownRowType,
RowType}
-import org.apache.gluten.extension.columnar.transition.ConventionReq
-import
org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildrenConventions
-import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.{Partition, SparkException, TaskContext,
TaskOutputFileAlreadyExistException}
import org.apache.spark.internal.io.{FileCommitProtocol,
SparkHadoopWriterUtils}
+import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.rdd.RDD
import org.apache.spark.shuffle.FetchFailedException
-import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
GenericInternalRow}
import org.apache.spark.sql.connector.write.WriterCommitMessage
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.internal.SQLConf
@@ -42,25 +36,73 @@ import org.apache.hadoop.fs.FileAlreadyExistsException
import java.util.Date
-/**
- * This trait is used in [[ColumnarWriteFilesRDD]] to inject the staging write
path before
- * initializing the native plan and collect native write files metrics for
each backend.
- */
-trait BackendWrite {
- def collectNativeWriteFilesMetrics(batch: ColumnarBatch):
Option[WriteTaskResult]
-}
+import scala.collection.mutable
/**
* This RDD is used to make sure we have injected staging write path before
initializing the native
* plan, and support Spark file commit protocol.
*/
-class ColumnarWriteFilesRDD(
+class CHColumnarWriteFilesRDD(
var prev: RDD[ColumnarBatch],
description: WriteJobDescription,
committer: FileCommitProtocol,
jobTrackerID: String)
extends RDD[WriterCommitMessage](prev) {
+ private def collectNativeResult(cb: ColumnarBatch): Option[WriteTaskResult]
= {
+ val numFiles = cb.numRows()
+ // Write an empty iterator
+ if (numFiles == 0) {
+ None
+ } else {
+ val file_col = cb.column(0)
+ val partition_col = cb.column(1)
+ val count_col = cb.column(2)
+
+ val outputPath = description.path
+ var updatedPartitions = Set.empty[String]
+ val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String,
String]()
+
+ val write_stats = Range(0, cb.numRows()).map {
+ i =>
+ val targetFileName = file_col.getUTF8String(i).toString
+ val partition = partition_col.getUTF8String(i).toString
+ if (partition != "__NO_PARTITION_ID__") {
+ updatedPartitions += partition
+ val tmpOutputPath = outputPath + "/" + partition + "/" +
targetFileName
+ val customOutputPath =
+ description.customPartitionLocations.get(
+ PartitioningUtils.parsePathFragment(partition))
+ if (customOutputPath.isDefined) {
+ addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" +
targetFileName
+ }
+ }
+ count_col.getLong(i)
+ }
+
+ val partitionsInternalRows = updatedPartitions.map {
+ part =>
+ val parts = new Array[Any](1)
+ parts(0) = part
+ new GenericInternalRow(parts)
+ }.toSeq
+
+ val numWrittenRows = write_stats.sum
+ val stats = BasicWriteTaskStats(
+ partitions = partitionsInternalRows,
+ numFiles = numFiles,
+ numBytes = 101,
+ numRows = numWrittenRows)
+ val summary =
+ ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats =
Seq(stats))
+
+ Some(
+ WriteTaskResult(
+ new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions),
+ summary))
+ }
+ }
+
private def reportTaskMetrics(writeTaskResult: WriteTaskResult): Unit = {
val stats =
writeTaskResult.summary.stats.head.asInstanceOf[BasicWriteTaskStats]
val (numBytes, numWrittenRows) = (stats.numBytes, stats.numRows)
@@ -94,8 +136,6 @@ class ColumnarWriteFilesRDD(
override def compute(split: Partition, context: TaskContext):
Iterator[WriterCommitMessage] = {
val commitProtocol = new SparkWriteFilesCommitProtocol(jobTrackerID,
description, committer)
- val backendWrite =
-
BackendsApiManager.getSparkPlanExecApiInstance.createBackendWrite(description)
commitProtocol.setupTask()
val writePath = commitProtocol.newTaskAttemptTempPath()
@@ -112,7 +152,7 @@ class ColumnarWriteFilesRDD(
assert(iter.hasNext)
val resultColumnarBatch = iter.next()
assert(resultColumnarBatch != null)
- val nativeWriteTaskResult =
backendWrite.collectNativeWriteFilesMetrics(resultColumnarBatch)
+ val nativeWriteTaskResult = collectNativeResult(resultColumnarBatch)
if (nativeWriteTaskResult.isEmpty) {
// If we are writing an empty iterator, then velox would do nothing.
// Here we fallback to use vanilla Spark write files to generate an
empty file for
@@ -155,58 +195,20 @@ class ColumnarWriteFilesRDD(
}
}
-// The class inherits from "BinaryExecNode" instead of "UnaryExecNode" because
-// we need to expose a dummy child (as right child) with type "WriteFilesExec"
to let Spark
-// choose the new write code path (version >= 3.4). The actual plan to write
is the left child
-// of this operator.
-case class ColumnarWriteFilesExec private (
+case class CHColumnarWriteFilesExec(
override val left: SparkPlan,
override val right: SparkPlan,
fileFormat: FileFormat,
partitionColumns: Seq[Attribute],
bucketSpec: Option[BucketSpec],
options: Map[String, String],
- staticPartitions: TablePartitionSpec)
- extends BinaryExecNode
- with GlutenPlan
- with ColumnarWriteFilesExec.ExecuteWriteCompatible {
-
- val child: SparkPlan = left
+ staticPartitions: TablePartitionSpec
+) extends ColumnarWriteFilesExec(left, right) {
- override lazy val references: AttributeSet = AttributeSet.empty
-
- override def supportsColumnar(): Boolean = true
-
- override def output: Seq[Attribute] = Seq.empty
-
- override protected def doExecute(): RDD[InternalRow] = {
- throw new GlutenException(s"$nodeName does not support doExecute")
- }
-
- /** Fallback to use vanilla Spark write files to generate an empty file for
metadata only. */
- private def writeFilesForEmptyRDD(
- description: WriteJobDescription,
- committer: FileCommitProtocol,
- jobTrackerID: String): RDD[WriterCommitMessage] = {
- val rddWithNonEmptyPartitions =
session.sparkContext.parallelize(Seq.empty[InternalRow], 1)
- rddWithNonEmptyPartitions.mapPartitionsInternal {
- iterator =>
- val sparkStageId = TaskContext.get().stageId()
- val sparkPartitionId = TaskContext.get().partitionId()
- val sparkAttemptNumber = TaskContext.get().taskAttemptId().toInt &
Int.MaxValue
-
- val ret = SparkShimLoader.getSparkShims.writeFilesExecuteTask(
- description,
- jobTrackerID,
- sparkStageId,
- sparkPartitionId,
- sparkAttemptNumber,
- committer,
- iterator
- )
- Iterator(ret)
- }
- }
+ override protected def withNewChildrenInternal(
+ newLeft: SparkPlan,
+ newRight: SparkPlan): SparkPlan =
+ copy(newLeft, newRight, fileFormat, partitionColumns, bucketSpec, options,
staticPartitions)
override def doExecuteWrite(writeFilesSpec: WriteFilesSpec):
RDD[WriterCommitMessage] = {
assert(child.supportsColumnar)
@@ -220,76 +222,7 @@ case class ColumnarWriteFilesExec private (
// partition rdd to make sure we at least set up one write task to write
the metadata.
writeFilesForEmptyRDD(description, committer, jobTrackerID)
} else {
- new ColumnarWriteFilesRDD(rdd, description, committer, jobTrackerID)
- }
- }
- override protected def withNewChildrenInternal(
- newLeft: SparkPlan,
- newRight: SparkPlan): SparkPlan =
- copy(newLeft, newRight, fileFormat, partitionColumns, bucketSpec, options,
staticPartitions)
-}
-
-object ColumnarWriteFilesExec {
-
- def apply(
- child: SparkPlan,
- fileFormat: FileFormat,
- partitionColumns: Seq[Attribute],
- bucketSpec: Option[BucketSpec],
- options: Map[String, String],
- staticPartitions: TablePartitionSpec): ColumnarWriteFilesExec = {
- // This is a workaround for FileFormatWriter#write. Vanilla Spark (version
>= 3.4) requires for
- // a plan that has at least one node exactly of type `WriteFilesExec` that
is a Scala
- // case-class, to decide to choose new `#executeWrite` code path over the
legacy `#execute`
- // for write operation.
- //
- // So we add a no-op `WriteFilesExec` child to let Spark pick the new code
path.
- //
- // See: FileFormatWriter#write
- // See: V1Writes#getWriteFilesOpt
- val right: SparkPlan =
- WriteFilesExec(
- NoopLeaf(),
- fileFormat,
- partitionColumns,
- bucketSpec,
- options,
- staticPartitions)
-
- ColumnarWriteFilesExec(
- child,
- right,
- fileFormat,
- partitionColumns,
- bucketSpec,
- options,
- staticPartitions)
- }
-
- private case class NoopLeaf() extends LeafExecNode {
- override protected def doExecute(): RDD[InternalRow] =
- throw new GlutenException(s"$nodeName does not support doExecute")
- override def output: Seq[Attribute] = Seq.empty
- }
-
- /**
- * ColumnarWriteFilesExec neither output Row nor columnar data. We output
both row and columnar to
- * avoid c2r and r2c transitions. Please note, [[GlutenPlan]] already
implement batchType()
- */
- sealed trait ExecuteWriteCompatible extends KnownChildrenConventions with
KnownRowType {
- // To be compatible with Spark (version < 3.4)
- protected def doExecuteWrite(writeFilesSpec: WriteFilesSpec):
RDD[WriterCommitMessage] = {
- throw new GlutenException(
- s"Internal Error ${this.getClass} has write support" +
- s" mismatch:\n${this}")
- }
-
- override def requiredChildrenConventions(): Seq[ConventionReq] = {
- List(ConventionReq.backendBatch)
- }
-
- override def rowType(): RowType = {
- RowType.VanillaRow
+ new CHColumnarWriteFilesRDD(rdd, description, committer, jobTrackerID)
}
}
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/ClickhouseBackendWrite.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/ClickhouseBackendWrite.scala
deleted file mode 100644
index 225d9688c..000000000
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/ClickhouseBackendWrite.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.vectorized.ColumnarBatch
-
-import scala.collection.mutable
-
-case class ClickhouseBackendWrite(description: WriteJobDescription)
- extends BackendWrite
- with Logging {
-
- override def collectNativeWriteFilesMetrics(cb: ColumnarBatch):
Option[WriteTaskResult] = {
- val numFiles = cb.numRows()
- // Write an empty iterator
- if (numFiles == 0) {
- None
- } else {
- val file_col = cb.column(0)
- val partition_col = cb.column(1)
- val count_col = cb.column(2)
-
- val outputPath = description.path
- var updatedPartitions = Set.empty[String]
- val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String,
String]()
-
- val write_stats = Range(0, cb.numRows()).map {
- i =>
- val targetFileName = file_col.getUTF8String(i).toString
- val partition = partition_col.getUTF8String(i).toString
- if (partition != "__NO_PARTITION_ID__") {
- updatedPartitions += partition
- val tmpOutputPath = outputPath + "/" + partition + "/" +
targetFileName
- val customOutputPath =
- description.customPartitionLocations.get(
- PartitioningUtils.parsePathFragment(partition))
- if (customOutputPath.isDefined) {
- addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" +
targetFileName
- }
- }
- count_col.getLong(i)
- }
-
- val partitionsInternalRows = updatedPartitions.map {
- part =>
- val parts = new Array[Any](1)
- parts(0) = part
- new GenericInternalRow(parts)
- }.toSeq
-
- val numWrittenRows = write_stats.sum
- val stats = BasicWriteTaskStats(
- partitions = partitionsInternalRows,
- numFiles = numFiles,
- numBytes = 101,
- numRows = numWrittenRows)
- val summary =
- ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats =
Seq(stats))
-
- Some(
- WriteTaskResult(
- new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions),
- summary))
- }
- }
-}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 9e0d85e25..c1c5332d7 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -39,6 +39,8 @@ import org.apache.spark.shuffle.utils.ShuffleUtil
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.optimizer.BuildSide
@@ -48,7 +50,7 @@ import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
-import org.apache.spark.sql.execution.datasources.WriteJobDescription
+import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec,
ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.{BuildSideRelation,
HashedRelationBroadcastMode}
import org.apache.spark.sql.execution.metric.SQLMetric
@@ -555,9 +557,22 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
parameters: GenShuffleWriterParameters[K, V]):
GlutenShuffleWriterWrapper[K, V] = {
ShuffleUtil.genColumnarShuffleWriter(parameters)
}
-
- override def createBackendWrite(description: WriteJobDescription):
BackendWrite = {
- VeloxBackendWrite(description)
+ override def createColumnarWriteFilesExec(
+ child: SparkPlan,
+ noop: SparkPlan,
+ fileFormat: FileFormat,
+ partitionColumns: Seq[Attribute],
+ bucketSpec: Option[BucketSpec],
+ options: Map[String, String],
+ staticPartitions: TablePartitionSpec): ColumnarWriteFilesExec = {
+ VeloxColumnarWriteFilesExec(
+ child,
+ noop,
+ fileFormat,
+ partitionColumns,
+ bucketSpec,
+ options,
+ staticPartitions)
}
override def createColumnarArrowEvalPythonExec(
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxBackendWrite.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxBackendWrite.scala
deleted file mode 100644
index 5d47aff04..000000000
---
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxBackendWrite.scala
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution
-
-import org.apache.gluten.columnarbatch.ColumnarBatches
-import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.vectorized.ColumnarBatch
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
-
-import scala.collection.mutable
-
-// Velox write files metrics start
-//
-// Follows the code in velox `HiveDataSink::close()`
-// The json can be as following:
-// {
-// "inMemoryDataSizeInBytes":0,
-// "containsNumberedFileNames":true,
-// "onDiskDataSizeInBytes":307,
-// "fileWriteInfos":[
-// {
-// "fileSize":307,
-// "writeFileName":
-//
"Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet",
-// "targetFileName":
-//
"Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet"
-// }
-// ],
-//
"writePath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
-// "rowCount":1,
-//
"targetPath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
-// "updateMode":"NEW",
-// "name":"part1=1/part2=1"
-// }
-case class VeloxWriteFilesInfo(writeFileName: String, targetFileName: String,
fileSize: Long)
-
-case class VeloxWriteFilesMetrics(
- name: String,
- updateMode: String,
- writePath: String,
- targetPath: String,
- fileWriteInfos: Seq[VeloxWriteFilesInfo],
- rowCount: Long,
- inMemoryDataSizeInBytes: Long,
- onDiskDataSizeInBytes: Long,
- containsNumberedFileNames: Boolean)
-
-// Velox write files metrics end
-
-case class VeloxBackendWrite(description: WriteJobDescription) extends
BackendWrite with Logging {
-
- override def collectNativeWriteFilesMetrics(cb: ColumnarBatch):
Option[WriteTaskResult] = {
- // Currently, the cb contains three columns: row, fragments, and context.
- // The first row in the row column contains the number of written numRows.
- // The fragments column contains detailed information about the file
writes.
- val loadedCb =
ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb)
- assert(loadedCb.numCols() == 3)
- val numWrittenRows = loadedCb.column(0).getLong(0)
-
- var updatedPartitions = Set.empty[String]
- val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String,
String]()
- var numBytes = 0L
- val objectMapper = new ObjectMapper()
- objectMapper.registerModule(DefaultScalaModule)
- for (i <- 0 until loadedCb.numRows() - 1) {
- val fragments = loadedCb.column(1).getUTF8String(i + 1)
- val metrics = objectMapper
- .readValue(fragments.toString.getBytes("UTF-8"),
classOf[VeloxWriteFilesMetrics])
- logDebug(s"Velox write files metrics: $metrics")
-
- val fileWriteInfos = metrics.fileWriteInfos
- assert(fileWriteInfos.length == 1)
- val fileWriteInfo = fileWriteInfos.head
- numBytes += fileWriteInfo.fileSize
- val targetFileName = fileWriteInfo.targetFileName
- val outputPath = description.path
-
- // part1=1/part2=1
- val partitionFragment = metrics.name
- // Write a partitioned table
- if (partitionFragment != "") {
- updatedPartitions += partitionFragment
- val tmpOutputPath = outputPath + "/" + partitionFragment + "/" +
targetFileName
- val customOutputPath = description.customPartitionLocations.get(
- PartitioningUtils.parsePathFragment(partitionFragment))
- if (customOutputPath.isDefined) {
- addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" +
targetFileName
- }
- }
- }
-
- val numFiles = loadedCb.numRows() - 1
- val partitionsInternalRows = updatedPartitions.map {
- part =>
- val parts = new Array[Any](1)
- parts(0) = part
- new GenericInternalRow(parts)
- }.toSeq
- val stats = BasicWriteTaskStats(
- partitions = partitionsInternalRows,
- numFiles = numFiles,
- numBytes = numBytes,
- numRows = numWrittenRows)
- val summary =
- ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats =
Seq(stats))
-
- // Write an empty iterator
- if (numFiles == 0) {
- None
- } else {
- Some(
- WriteTaskResult(
- new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions),
- summary))
- }
- }
-}
diff --git
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
similarity index 56%
copy from
gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
copy to
backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
index b809a4b65..c339014c5 100644
---
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
@@ -17,50 +17,146 @@
package org.apache.spark.sql.execution
import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.exception.GlutenException
-import org.apache.gluten.extension.GlutenPlan
-import
org.apache.gluten.extension.columnar.transition.Convention.{KnownRowType,
RowType}
-import org.apache.gluten.extension.columnar.transition.ConventionReq
-import
org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildrenConventions
-import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.spark.{Partition, SparkException, TaskContext,
TaskOutputFileAlreadyExistException}
import org.apache.spark.internal.io.{FileCommitProtocol,
SparkHadoopWriterUtils}
+import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.rdd.RDD
import org.apache.spark.shuffle.FetchFailedException
-import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
GenericInternalRow}
import org.apache.spark.sql.connector.write.WriterCommitMessage
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.Utils
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.hadoop.fs.FileAlreadyExistsException
import java.util.Date
-/**
- * This trait is used in [[ColumnarWriteFilesRDD]] to inject the staging write
path before
- * initializing the native plan and collect native write files metrics for
each backend.
- */
-trait BackendWrite {
- def collectNativeWriteFilesMetrics(batch: ColumnarBatch):
Option[WriteTaskResult]
-}
+import scala.collection.mutable
+
+// Velox write files metrics start
+//
+// Follows the code in velox `HiveDataSink::close()`
+// The json can be as following:
+// {
+// "inMemoryDataSizeInBytes":0,
+// "containsNumberedFileNames":true,
+// "onDiskDataSizeInBytes":307,
+// "fileWriteInfos":[
+// {
+// "fileSize":307,
+// "writeFileName":
+//
"Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet",
+// "targetFileName":
+//
"Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet"
+// }
+// ],
+//
"writePath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
+// "rowCount":1,
+//
"targetPath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
+// "updateMode":"NEW",
+// "name":"part1=1/part2=1"
+// }
+case class VeloxWriteFilesInfo(writeFileName: String, targetFileName: String,
fileSize: Long)
+
+case class VeloxWriteFilesMetrics(
+ name: String,
+ updateMode: String,
+ writePath: String,
+ targetPath: String,
+ fileWriteInfos: Seq[VeloxWriteFilesInfo],
+ rowCount: Long,
+ inMemoryDataSizeInBytes: Long,
+ onDiskDataSizeInBytes: Long,
+ containsNumberedFileNames: Boolean)
+
+// Velox write files metrics end
/**
* This RDD is used to make sure we have injected staging write path before
initializing the native
* plan, and support Spark file commit protocol.
*/
-class ColumnarWriteFilesRDD(
+class VeloxColumnarWriteFilesRDD(
var prev: RDD[ColumnarBatch],
description: WriteJobDescription,
committer: FileCommitProtocol,
jobTrackerID: String)
extends RDD[WriterCommitMessage](prev) {
+ private def collectNativeWriteFilesMetrics(cb: ColumnarBatch):
Option[WriteTaskResult] = {
+ // Currently, the cb contains three columns: row, fragments, and context.
+ // The first row in the row column contains the number of written numRows.
+ // The fragments column contains detailed information about the file
writes.
+ val loadedCb =
ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb)
+ assert(loadedCb.numCols() == 3)
+ val numWrittenRows = loadedCb.column(0).getLong(0)
+
+ var updatedPartitions = Set.empty[String]
+ val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String,
String]()
+ var numBytes = 0L
+ val objectMapper = new ObjectMapper()
+ objectMapper.registerModule(DefaultScalaModule)
+ for (i <- 0 until loadedCb.numRows() - 1) {
+ val fragments = loadedCb.column(1).getUTF8String(i + 1)
+ val metrics = objectMapper
+ .readValue(fragments.toString.getBytes("UTF-8"),
classOf[VeloxWriteFilesMetrics])
+ logDebug(s"Velox write files metrics: $metrics")
+
+ val fileWriteInfos = metrics.fileWriteInfos
+ assert(fileWriteInfos.length == 1)
+ val fileWriteInfo = fileWriteInfos.head
+ numBytes += fileWriteInfo.fileSize
+ val targetFileName = fileWriteInfo.targetFileName
+ val outputPath = description.path
+
+ // part1=1/part2=1
+ val partitionFragment = metrics.name
+ // Write a partitioned table
+ if (partitionFragment != "") {
+ updatedPartitions += partitionFragment
+ val tmpOutputPath = outputPath + "/" + partitionFragment + "/" +
targetFileName
+ val customOutputPath = description.customPartitionLocations.get(
+ PartitioningUtils.parsePathFragment(partitionFragment))
+ if (customOutputPath.isDefined) {
+ addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" +
targetFileName
+ }
+ }
+ }
+
+ val numFiles = loadedCb.numRows() - 1
+ val partitionsInternalRows = updatedPartitions.map {
+ part =>
+ val parts = new Array[Any](1)
+ parts(0) = part
+ new GenericInternalRow(parts)
+ }.toSeq
+ val stats = BasicWriteTaskStats(
+ partitions = partitionsInternalRows,
+ numFiles = numFiles,
+ numBytes = numBytes,
+ numRows = numWrittenRows)
+ val summary =
+ ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats =
Seq(stats))
+
+ // Write an empty iterator
+ if (numFiles == 0) {
+ None
+ } else {
+ Some(
+ WriteTaskResult(
+ new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions),
+ summary))
+ }
+ }
+
private def reportTaskMetrics(writeTaskResult: WriteTaskResult): Unit = {
val stats =
writeTaskResult.summary.stats.head.asInstanceOf[BasicWriteTaskStats]
val (numBytes, numWrittenRows) = (stats.numBytes, stats.numRows)
@@ -94,25 +190,21 @@ class ColumnarWriteFilesRDD(
override def compute(split: Partition, context: TaskContext):
Iterator[WriterCommitMessage] = {
val commitProtocol = new SparkWriteFilesCommitProtocol(jobTrackerID,
description, committer)
- val backendWrite =
-
BackendsApiManager.getSparkPlanExecApiInstance.createBackendWrite(description)
commitProtocol.setupTask()
val writePath = commitProtocol.newTaskAttemptTempPath()
- val writeFileName = commitProtocol.getFilename
- logDebug(s"Native staging write path: $writePath and file name:
$writeFileName")
-
+ logDebug(s"Velox staging write path: $writePath")
var writeTaskResult: WriteTaskResult = null
try {
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
-
BackendsApiManager.getIteratorApiInstance.injectWriteFilesTempPath(writePath,
writeFileName)
+
BackendsApiManager.getIteratorApiInstance.injectWriteFilesTempPath(writePath,
"")
// Initialize the native plan
val iter = firstParent[ColumnarBatch].iterator(split, context)
assert(iter.hasNext)
val resultColumnarBatch = iter.next()
assert(resultColumnarBatch != null)
- val nativeWriteTaskResult =
backendWrite.collectNativeWriteFilesMetrics(resultColumnarBatch)
+ val nativeWriteTaskResult =
collectNativeWriteFilesMetrics(resultColumnarBatch)
if (nativeWriteTaskResult.isEmpty) {
// If we are writing an empty iterator, then velox would do nothing.
// Here we fallback to use vanilla Spark write files to generate an
empty file for
@@ -155,11 +247,7 @@ class ColumnarWriteFilesRDD(
}
}
-// The class inherits from "BinaryExecNode" instead of "UnaryExecNode" because
-// we need to expose a dummy child (as right child) with type "WriteFilesExec"
to let Spark
-// choose the new write code path (version >= 3.4). The actual plan to write
is the left child
-// of this operator.
-case class ColumnarWriteFilesExec private (
+case class VeloxColumnarWriteFilesExec private (
override val left: SparkPlan,
override val right: SparkPlan,
fileFormat: FileFormat,
@@ -167,46 +255,7 @@ case class ColumnarWriteFilesExec private (
bucketSpec: Option[BucketSpec],
options: Map[String, String],
staticPartitions: TablePartitionSpec)
- extends BinaryExecNode
- with GlutenPlan
- with ColumnarWriteFilesExec.ExecuteWriteCompatible {
-
- val child: SparkPlan = left
-
- override lazy val references: AttributeSet = AttributeSet.empty
-
- override def supportsColumnar(): Boolean = true
-
- override def output: Seq[Attribute] = Seq.empty
-
- override protected def doExecute(): RDD[InternalRow] = {
- throw new GlutenException(s"$nodeName does not support doExecute")
- }
-
- /** Fallback to use vanilla Spark write files to generate an empty file for
metadata only. */
- private def writeFilesForEmptyRDD(
- description: WriteJobDescription,
- committer: FileCommitProtocol,
- jobTrackerID: String): RDD[WriterCommitMessage] = {
- val rddWithNonEmptyPartitions =
session.sparkContext.parallelize(Seq.empty[InternalRow], 1)
- rddWithNonEmptyPartitions.mapPartitionsInternal {
- iterator =>
- val sparkStageId = TaskContext.get().stageId()
- val sparkPartitionId = TaskContext.get().partitionId()
- val sparkAttemptNumber = TaskContext.get().taskAttemptId().toInt &
Int.MaxValue
-
- val ret = SparkShimLoader.getSparkShims.writeFilesExecuteTask(
- description,
- jobTrackerID,
- sparkStageId,
- sparkPartitionId,
- sparkAttemptNumber,
- committer,
- iterator
- )
- Iterator(ret)
- }
- }
+ extends ColumnarWriteFilesExec(left, right) {
override def doExecuteWrite(writeFilesSpec: WriteFilesSpec):
RDD[WriterCommitMessage] = {
assert(child.supportsColumnar)
@@ -220,76 +269,12 @@ case class ColumnarWriteFilesExec private (
// partition rdd to make sure we at least set up one write task to write
the metadata.
writeFilesForEmptyRDD(description, committer, jobTrackerID)
} else {
- new ColumnarWriteFilesRDD(rdd, description, committer, jobTrackerID)
+ new VeloxColumnarWriteFilesRDD(rdd, description, committer, jobTrackerID)
}
}
+
override protected def withNewChildrenInternal(
newLeft: SparkPlan,
newRight: SparkPlan): SparkPlan =
copy(newLeft, newRight, fileFormat, partitionColumns, bucketSpec, options,
staticPartitions)
}
-
-object ColumnarWriteFilesExec {
-
- def apply(
- child: SparkPlan,
- fileFormat: FileFormat,
- partitionColumns: Seq[Attribute],
- bucketSpec: Option[BucketSpec],
- options: Map[String, String],
- staticPartitions: TablePartitionSpec): ColumnarWriteFilesExec = {
- // This is a workaround for FileFormatWriter#write. Vanilla Spark (version
>= 3.4) requires for
- // a plan that has at least one node exactly of type `WriteFilesExec` that
is a Scala
- // case-class, to decide to choose new `#executeWrite` code path over the
legacy `#execute`
- // for write operation.
- //
- // So we add a no-op `WriteFilesExec` child to let Spark pick the new code
path.
- //
- // See: FileFormatWriter#write
- // See: V1Writes#getWriteFilesOpt
- val right: SparkPlan =
- WriteFilesExec(
- NoopLeaf(),
- fileFormat,
- partitionColumns,
- bucketSpec,
- options,
- staticPartitions)
-
- ColumnarWriteFilesExec(
- child,
- right,
- fileFormat,
- partitionColumns,
- bucketSpec,
- options,
- staticPartitions)
- }
-
- private case class NoopLeaf() extends LeafExecNode {
- override protected def doExecute(): RDD[InternalRow] =
- throw new GlutenException(s"$nodeName does not support doExecute")
- override def output: Seq[Attribute] = Seq.empty
- }
-
- /**
- * ColumnarWriteFilesExec neither output Row nor columnar data. We output
both row and columnar to
- * avoid c2r and r2c transitions. Please note, [[GlutenPlan]] already
implement batchType()
- */
- sealed trait ExecuteWriteCompatible extends KnownChildrenConventions with
KnownRowType {
- // To be compatible with Spark (version < 3.4)
- protected def doExecuteWrite(writeFilesSpec: WriteFilesSpec):
RDD[WriterCommitMessage] = {
- throw new GlutenException(
- s"Internal Error ${this.getClass} has write support" +
- s" mismatch:\n${this}")
- }
-
- override def requiredChildrenConventions(): Seq[ConventionReq] = {
- List(ConventionReq.backendBatch)
- }
-
- override def rowType(): RowType = {
- RowType.VanillaRow
- }
- }
-}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index 8f24afae1..c37c55a5f 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -40,8 +40,8 @@ import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode,
Partitioning}
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{BackendWrite, ColumnarWriteFilesExec,
FileSourceScanExec, GenerateExec, LeafExecNode, SparkPlan}
-import org.apache.spark.sql.execution.datasources.{FileFormat,
WriteJobDescription}
+import org.apache.spark.sql.execution.{ColumnarWriteFilesExec,
FileSourceScanExec, GenerateExec, LeafExecNode, SparkPlan}
+import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.BuildSideRelation
@@ -385,22 +385,12 @@ trait SparkPlanExecApi {
/** Create ColumnarWriteFilesExec */
def createColumnarWriteFilesExec(
child: SparkPlan,
+ noop: SparkPlan,
fileFormat: FileFormat,
partitionColumns: Seq[Attribute],
bucketSpec: Option[BucketSpec],
options: Map[String, String],
- staticPartitions: TablePartitionSpec): SparkPlan = {
- ColumnarWriteFilesExec(
- child,
- fileFormat,
- partitionColumns,
- bucketSpec,
- options,
- staticPartitions)
- }
-
- /** Create BackendWrite */
- def createBackendWrite(description: WriteJobDescription): BackendWrite
+ staticPartitions: TablePartitionSpec): ColumnarWriteFilesExec
/** Create ColumnarArrowEvalPythonExec, for velox backend */
def createColumnarArrowEvalPythonExec(
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
index 1d4e11d91..a8cc79128 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
@@ -457,7 +457,8 @@ object OffloadOthers {
plan.bucketSpec,
plan.options,
plan.staticPartitions)
-
BackendsApiManager.getSparkPlanExecApiInstance.createColumnarWriteFilesExec(
+
+ ColumnarWriteFilesExec(
writeTransformer,
plan.fileFormat,
plan.partitionColumns,
diff --git
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
index b809a4b65..7f6fd01ac 100644
---
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
+++
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
@@ -24,149 +24,23 @@ import
org.apache.gluten.extension.columnar.transition.ConventionReq
import
org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildrenConventions
import org.apache.gluten.sql.shims.SparkShimLoader
-import org.apache.spark.{Partition, SparkException, TaskContext,
TaskOutputFileAlreadyExistException}
-import org.apache.spark.internal.io.{FileCommitProtocol,
SparkHadoopWriterUtils}
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.rdd.RDD
-import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}
import org.apache.spark.sql.connector.write.WriterCommitMessage
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.Utils
-
-import org.apache.hadoop.fs.FileAlreadyExistsException
-
-import java.util.Date
-
-/**
- * This trait is used in [[ColumnarWriteFilesRDD]] to inject the staging write
path before
- * initializing the native plan and collect native write files metrics for
each backend.
- */
-trait BackendWrite {
- def collectNativeWriteFilesMetrics(batch: ColumnarBatch):
Option[WriteTaskResult]
-}
-
-/**
- * This RDD is used to make sure we have injected staging write path before
initializing the native
- * plan, and support Spark file commit protocol.
- */
-class ColumnarWriteFilesRDD(
- var prev: RDD[ColumnarBatch],
- description: WriteJobDescription,
- committer: FileCommitProtocol,
- jobTrackerID: String)
- extends RDD[WriterCommitMessage](prev) {
-
- private def reportTaskMetrics(writeTaskResult: WriteTaskResult): Unit = {
- val stats =
writeTaskResult.summary.stats.head.asInstanceOf[BasicWriteTaskStats]
- val (numBytes, numWrittenRows) = (stats.numBytes, stats.numRows)
- // Reports bytesWritten and recordsWritten to the Spark output metrics.
- // We should update it after calling `commitTask` to overwrite the metrics.
- Option(TaskContext.get()).map(_.taskMetrics().outputMetrics).foreach {
- outputMetrics =>
- outputMetrics.setBytesWritten(numBytes)
- outputMetrics.setRecordsWritten(numWrittenRows)
- }
- }
-
- private def writeFilesForEmptyIterator(
- commitProtocol: SparkWriteFilesCommitProtocol): WriteTaskResult = {
- val taskAttemptContext = commitProtocol.taskAttemptContext
-
- val dataWriter =
- if (commitProtocol.sparkPartitionId != 0) {
- // In case of empty job, leave first partition to save meta for file
format like parquet.
- new EmptyDirectoryDataWriter(description, taskAttemptContext,
committer)
- } else if (description.partitionColumns.isEmpty) {
- new SingleDirectoryDataWriter(description, taskAttemptContext,
committer)
- } else {
- new DynamicPartitionDataSingleWriter(description, taskAttemptContext,
committer)
- }
-
- // We have done `setupTask` outside
- dataWriter.writeWithIterator(Iterator.empty)
- dataWriter.commit()
- }
-
- override def compute(split: Partition, context: TaskContext):
Iterator[WriterCommitMessage] = {
- val commitProtocol = new SparkWriteFilesCommitProtocol(jobTrackerID,
description, committer)
- val backendWrite =
-
BackendsApiManager.getSparkPlanExecApiInstance.createBackendWrite(description)
-
- commitProtocol.setupTask()
- val writePath = commitProtocol.newTaskAttemptTempPath()
- val writeFileName = commitProtocol.getFilename
- logDebug(s"Native staging write path: $writePath and file name:
$writeFileName")
-
- var writeTaskResult: WriteTaskResult = null
- try {
- Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
-
BackendsApiManager.getIteratorApiInstance.injectWriteFilesTempPath(writePath,
writeFileName)
-
- // Initialize the native plan
- val iter = firstParent[ColumnarBatch].iterator(split, context)
- assert(iter.hasNext)
- val resultColumnarBatch = iter.next()
- assert(resultColumnarBatch != null)
- val nativeWriteTaskResult =
backendWrite.collectNativeWriteFilesMetrics(resultColumnarBatch)
- if (nativeWriteTaskResult.isEmpty) {
- // If we are writing an empty iterator, then velox would do nothing.
- // Here we fallback to use vanilla Spark write files to generate an
empty file for
- // metadata only.
- writeTaskResult = writeFilesForEmptyIterator(commitProtocol)
- // We have done commit task inside `writeFilesForEmptyIterator`.
- } else {
- writeTaskResult = nativeWriteTaskResult.get
- commitProtocol.commitTask()
- }
- })(
- catchBlock = {
- // If there is an error, abort the task
- commitProtocol.abortTask()
- logError(s"Job ${commitProtocol.getJobId} aborted.")
- }
- )
- } catch {
- case e: FetchFailedException =>
- throw e
- case f: FileAlreadyExistsException if
SQLConf.get.fastFailFileFormatOutput =>
- throw new TaskOutputFileAlreadyExistException(f)
- case t: Throwable =>
- throw new SparkException(
- s"Task failed while writing rows to staging path: $writePath, " +
- s"output path: ${description.path}",
- t)
- }
-
- assert(writeTaskResult != null)
- reportTaskMetrics(writeTaskResult)
- Iterator.single(writeTaskResult)
- }
-
- override protected def getPartitions: Array[Partition] =
firstParent[ColumnarBatch].partitions
-
- override def clearDependencies(): Unit = {
- super.clearDependencies()
- prev = null
- }
-}
// The class inherits from "BinaryExecNode" instead of "UnaryExecNode" because
// we need to expose a dummy child (as right child) with type "WriteFilesExec"
to let Spark
// choose the new write code path (version >= 3.4). The actual plan to write
is the left child
// of this operator.
-case class ColumnarWriteFilesExec private (
+abstract class ColumnarWriteFilesExec protected (
override val left: SparkPlan,
- override val right: SparkPlan,
- fileFormat: FileFormat,
- partitionColumns: Seq[Attribute],
- bucketSpec: Option[BucketSpec],
- options: Map[String, String],
- staticPartitions: TablePartitionSpec)
+ override val right: SparkPlan)
extends BinaryExecNode
with GlutenPlan
with ColumnarWriteFilesExec.ExecuteWriteCompatible {
@@ -184,7 +58,7 @@ case class ColumnarWriteFilesExec private (
}
/** Fallback to use vanilla Spark write files to generate an empty file for
metadata only. */
- private def writeFilesForEmptyRDD(
+ protected def writeFilesForEmptyRDD(
description: WriteJobDescription,
committer: FileCommitProtocol,
jobTrackerID: String): RDD[WriterCommitMessage] = {
@@ -208,25 +82,10 @@ case class ColumnarWriteFilesExec private (
}
}
+ /** We need this override to avoid a compiler error. */
override def doExecuteWrite(writeFilesSpec: WriteFilesSpec):
RDD[WriterCommitMessage] = {
- assert(child.supportsColumnar)
-
- val rdd = child.executeColumnar()
- val jobTrackerID = SparkHadoopWriterUtils.createJobTrackerID(new Date())
- val description = writeFilesSpec.description
- val committer = writeFilesSpec.committer
- if (rdd.partitions.length == 0) {
- // SPARK-23271 If we are attempting to write a zero partition rdd,
create a dummy single
- // partition rdd to make sure we at least set up one write task to write
the metadata.
- writeFilesForEmptyRDD(description, committer, jobTrackerID)
- } else {
- new ColumnarWriteFilesRDD(rdd, description, committer, jobTrackerID)
- }
+ super.doExecuteWrite(writeFilesSpec)
}
- override protected def withNewChildrenInternal(
- newLeft: SparkPlan,
- newRight: SparkPlan): SparkPlan =
- copy(newLeft, newRight, fileFormat, partitionColumns, bucketSpec, options,
staticPartitions)
}
object ColumnarWriteFilesExec {
@@ -256,7 +115,7 @@ object ColumnarWriteFilesExec {
options,
staticPartitions)
- ColumnarWriteFilesExec(
+
BackendsApiManager.getSparkPlanExecApiInstance.createColumnarWriteFilesExec(
child,
right,
fileFormat,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]