This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 206e4be5f [GLUTEN-6067][VL] [Part 3-1] Refactor: Rename VeloxColumnarWriteFilesExec to ColumnarWriteFilesExec (#6403)
206e4be5f is described below
commit 206e4be5f672249c0e987c5de1f49ac38caec7b0
Author: Chang chen <[email protected]>
AuthorDate: Fri Jul 19 16:46:51 2024 +0800
[GLUTEN-6067][VL] [Part 3-1] Refactor: Rename VeloxColumnarWriteFilesExec to ColumnarWriteFilesExec (#6403)
* Rename VeloxColumnarWriteFilesExec to GlutenColumnarWriteFilesExec, and move it to gluten-core
1. Return GlutenColumnarWriteFilesExec from SparkPlanExecApi
2. Move SparkWriteFilesCommitProtocol to gluten-core
3. SparkWriteFilesCommitProtocol supports getFilename from the internal committer
4. Remove supportTransformWriteFiles from BackendSettingsApi
5. injectWriteFilesTempPath now also takes a fileName
* Support passing the file format to the backend
* Rename GlutenColumnarWriteFilesExec to ColumnarWriteFilesExec and GlutenColumnarWriteFilesRDD to ColumnarWriteFilesRDD
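A rough sketch of how a backend plugs into the reworked write path (illustration only, not part of this commit; MyBackendWrite and the nativeEvaluator handle are hypothetical names, the interfaces are the ones changed in the diff below):

    import org.apache.spark.sql.execution.BackendWrite
    import org.apache.spark.sql.execution.datasources.{WriteJobDescription, WriteTaskResult}
    import org.apache.spark.sql.vectorized.ColumnarBatch

    // Backend-specific half of the write path: decode the batch emitted by the
    // native writer into Spark's WriteTaskResult. Returning None signals an empty
    // write, letting ColumnarWriteFilesRDD fall back to vanilla Spark.
    case class MyBackendWrite(description: WriteJobDescription) extends BackendWrite {
      override def collectNativeWriteFilesMetrics(batch: ColumnarBatch): Option[WriteTaskResult] = None
    }

    // In the backend's SparkPlanExecApi:
    //   override def createBackendWrite(description: WriteJobDescription): BackendWrite =
    //     MyBackendWrite(description)
    //
    // In the backend's IteratorApi, the staging directory and the file name chosen by
    // SparkWriteFilesCommitProtocol are now injected together:
    //   override def injectWriteFilesTempPath(path: String, fileName: String): Unit =
    //     nativeEvaluator.injectWriteFilesTempPath(path, fileName)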
---
.../vectorized/CHNativeExpressionEvaluator.java | 5 +
.../gluten/backendsapi/clickhouse/CHBackend.scala | 9 --
.../backendsapi/clickhouse/CHIteratorApi.scala | 4 +
.../clickhouse/CHSparkPlanExecApi.scala | 21 +--
.../gluten/backendsapi/velox/VeloxBackend.scala | 2 -
.../backendsapi/velox/VeloxIteratorApi.scala | 4 +-
.../backendsapi/velox/VeloxSparkPlanExecApi.scala | 20 +--
.../spark/sql/execution/VeloxBackendWrite.scala | 138 ++++++++++++++++++++
.../execution/VeloxParquetWriteForHiveSuite.scala | 4 +-
.../gluten/backendsapi/BackendSettingsApi.scala | 2 -
.../apache/gluten/backendsapi/IteratorApi.scala | 9 +-
.../gluten/backendsapi/SparkPlanExecApi.scala | 17 ++-
.../CartesianProductExecTransformer.scala | 10 +-
.../org/apache/gluten/execution/JoinUtils.scala | 55 ++++----
.../execution/WriteFilesExecTransformer.scala | 41 +++---
.../extension/columnar/validator/Validators.scala | 3 +-
.../org/apache/gluten/utils/SubstraitUtil.scala | 29 +++++
.../sql/execution/ColumnarWriteFilesExec.scala | 143 ++++-----------------
.../execution/SparkWriteFilesCommitProtocol.scala | 23 +++-
.../datasources/GlutenV1WriteCommandSuite.scala | 10 +-
.../spark/sql/sources/GlutenInsertSuite.scala | 20 +--
.../gluten/GlutenColumnarWriteTestSupport.scala | 6 +-
22 files changed, 317 insertions(+), 258 deletions(-)
diff --git
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeExpressionEvaluator.java
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeExpressionEvaluator.java
index b8b4138dc..45a9f01f7 100644
---
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeExpressionEvaluator.java
+++
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeExpressionEvaluator.java
@@ -81,6 +81,11 @@ public class CHNativeExpressionEvaluator {
BackendsApiManager.getSettings().getBackendConfigPrefix(),
SQLConf.get().getAllConfs());
}
+ public static void injectWriteFilesTempPath(String path, String fileName) {
+ throw new UnsupportedOperationException(
+ "injectWriteFilesTempPath Not supported in
CHNativeExpressionEvaluator");
+ }
+
// Used by WholeStageTransform to create the native computing pipeline and
// return a columnar result iterator.
public BatchIterator createKernelWithBatchIterator(
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 341a3e0f0..07129e69a 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -25,13 +25,11 @@ import
org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat._
import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning,
Partitioning}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
-import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
@@ -286,13 +284,6 @@ object CHBackendSettings extends BackendSettingsApi with
Logging {
.getLong(GLUTEN_MAX_SHUFFLE_READ_BYTES,
GLUTEN_MAX_SHUFFLE_READ_BYTES_DEFAULT)
}
- override def supportWriteFilesExec(
- format: FileFormat,
- fields: Array[StructField],
- bucketSpec: Option[BucketSpec],
- options: Map[String, String]): ValidationResult =
- ValidationResult.failed("CH backend is unsupported.")
-
override def enableNativeWriteFiles(): Boolean = {
GlutenConfig.getConf.enableNativeWriter.getOrElse(false)
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 4b9ec7390..6c86583f4 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -290,6 +290,10 @@ class CHIteratorApi extends IteratorApi with Logging with
LogLevelUtil {
None,
createNativeIterator(splitInfoByteArray, wsPlan, materializeInput,
inputIterators))
}
+
+ override def injectWriteFilesTempPath(path: String, fileName: String): Unit = {
+ CHNativeExpressionEvaluator.injectWriteFilesTempPath(path, fileName)
+ }
}
class CollectMetricIterator(
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 547458d8c..920c61cd4 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -37,8 +37,6 @@ import org.apache.spark.shuffle.{GenShuffleWriterParameters,
GlutenShuffleWriter
import org.apache.spark.shuffle.utils.CHShuffleUtil
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.{CHAggregateFunctionRewriteRule,
EqualToRewrite}
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions._
import
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression,
CollectList, CollectSet}
import org.apache.spark.sql.catalyst.optimizer.BuildSide
@@ -49,7 +47,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.delta.files.TahoeFileIndex
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
-import org.apache.spark.sql.execution.datasources.{FileFormat,
HadoopFsRelation}
+import org.apache.spark.sql.execution.datasources.{FileFormat,
HadoopFsRelation, WriteJobDescription}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import
org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec,
ShuffleExchangeExec}
@@ -145,10 +143,10 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
}
child match {
- case scan: FileSourceScanExec if (checkMergeTreeFileFormat(scan.relation)) =>
+ case scan: FileSourceScanExec if checkMergeTreeFileFormat(scan.relation) =>
// For the validation phase of the AddFallbackTagRule
CHFilterExecTransformer(condition, child)
- case scan: FileSourceScanExecTransformerBase if (checkMergeTreeFileFormat(scan.relation)) =>
+ case scan: FileSourceScanExecTransformerBase if checkMergeTreeFileFormat(scan.relation) =>
// For the transform phase, the FileSourceScanExec is already transformed
CHFilterExecTransformer(condition, child)
case _ =>
@@ -395,7 +393,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
left: ExpressionTransformer,
right: ExpressionTransformer,
original: GetMapValue): ExpressionTransformer =
- GetMapValueTransformer(substraitExprName, left, right, false, original)
+ GetMapValueTransformer(substraitExprName, left, right, failOnError = false, original)
/**
* Generate ShuffleDependency for ColumnarShuffleExchangeExec.
@@ -669,15 +667,8 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
CHRegExpReplaceTransformer(substraitExprName, children, expr)
}
- override def createColumnarWriteFilesExec(
- child: SparkPlan,
- fileFormat: FileFormat,
- partitionColumns: Seq[Attribute],
- bucketSpec: Option[BucketSpec],
- options: Map[String, String],
- staticPartitions: TablePartitionSpec): SparkPlan = {
- throw new GlutenNotSupportException("ColumnarWriteFilesExec is not support in ch backend.")
- }
+ def createBackendWrite(description: WriteJobDescription): BackendWrite =
+ throw new UnsupportedOperationException("createBackendWrite is not supported in ch backend.")
override def createColumnarArrowEvalPythonExec(
udfs: Seq[PythonUDF],
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index b0692816a..933947301 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -495,8 +495,6 @@ object VeloxBackendSettings extends BackendSettingsApi {
override def staticPartitionWriteOnly(): Boolean = true
- override def supportTransformWriteFiles: Boolean = true
-
override def allowDecimalArithmetic: Boolean = true
override def enableNativeWriteFiles(): Boolean = {
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index 613e53945..1c7f91348 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -161,7 +161,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
(paths, starts, lengths, fileSizes, modificationTimes, partitionColumns,
metadataColumns)
}
- override def injectWriteFilesTempPath(path: String): Unit = {
+ override def injectWriteFilesTempPath(path: String, fileName: String): Unit = {
val transKernel = NativePlanEvaluator.create()
transKernel.injectWriteFilesTempPath(path)
}
@@ -171,7 +171,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
inputPartition: BaseGlutenPartition,
context: TaskContext,
pipelineTime: SQLMetric,
- updateInputMetrics: (InputMetricsWrapper) => Unit,
+ updateInputMetrics: InputMetricsWrapper => Unit,
updateNativeMetrics: IMetrics => Unit,
partitionIndex: Int,
inputIterators: Seq[Iterator[ColumnarBatch]] = Seq()):
Iterator[ColumnarBatch] = {
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 37b46df3e..098d65a96 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -39,8 +39,6 @@ import org.apache.spark.shuffle.utils.ShuffleUtil
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.optimizer.BuildSide
@@ -50,7 +48,7 @@ import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.WriteJobDescription
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec,
ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.{BuildSideRelation,
HashedRelationBroadcastMode}
import org.apache.spark.sql.execution.metric.SQLMetric
@@ -550,20 +548,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
ShuffleUtil.genColumnarShuffleWriter(parameters)
}
- override def createColumnarWriteFilesExec(
- child: SparkPlan,
- fileFormat: FileFormat,
- partitionColumns: Seq[Attribute],
- bucketSpec: Option[BucketSpec],
- options: Map[String, String],
- staticPartitions: TablePartitionSpec): SparkPlan = {
- VeloxColumnarWriteFilesExec(
- child,
- fileFormat,
- partitionColumns,
- bucketSpec,
- options,
- staticPartitions)
+ override def createBackendWrite(description: WriteJobDescription): BackendWrite = {
+ VeloxBackendWrite(description)
}
override def createColumnarArrowEvalPythonExec(
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxBackendWrite.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxBackendWrite.scala
new file mode 100644
index 000000000..5d47aff04
--- /dev/null
+++
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxBackendWrite.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
+import scala.collection.mutable
+
+// Velox write files metrics start
+//
+// Follows the code in velox `HiveDataSink::close()`
+// The json can be as following:
+// {
+// "inMemoryDataSizeInBytes":0,
+// "containsNumberedFileNames":true,
+// "onDiskDataSizeInBytes":307,
+// "fileWriteInfos":[
+// {
+// "fileSize":307,
+// "writeFileName":
+//
"Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet",
+// "targetFileName":
+//
"Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet"
+// }
+// ],
+//
"writePath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
+// "rowCount":1,
+//
"targetPath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
+// "updateMode":"NEW",
+// "name":"part1=1/part2=1"
+// }
+case class VeloxWriteFilesInfo(writeFileName: String, targetFileName: String, fileSize: Long)
+
+case class VeloxWriteFilesMetrics(
+ name: String,
+ updateMode: String,
+ writePath: String,
+ targetPath: String,
+ fileWriteInfos: Seq[VeloxWriteFilesInfo],
+ rowCount: Long,
+ inMemoryDataSizeInBytes: Long,
+ onDiskDataSizeInBytes: Long,
+ containsNumberedFileNames: Boolean)
+
+// Velox write files metrics end
+
+case class VeloxBackendWrite(description: WriteJobDescription) extends BackendWrite with Logging {
+
+ override def collectNativeWriteFilesMetrics(cb: ColumnarBatch): Option[WriteTaskResult] = {
+ // Currently, the cb contains three columns: row, fragments, and context.
+ // The first row in the row column contains the number of written numRows.
+ // The fragments column contains detailed information about the file writes.
+ val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb)
+ assert(loadedCb.numCols() == 3)
+ val numWrittenRows = loadedCb.column(0).getLong(0)
+
+ var updatedPartitions = Set.empty[String]
+ val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String,
String]()
+ var numBytes = 0L
+ val objectMapper = new ObjectMapper()
+ objectMapper.registerModule(DefaultScalaModule)
+ for (i <- 0 until loadedCb.numRows() - 1) {
+ val fragments = loadedCb.column(1).getUTF8String(i + 1)
+ val metrics = objectMapper
+ .readValue(fragments.toString.getBytes("UTF-8"),
classOf[VeloxWriteFilesMetrics])
+ logDebug(s"Velox write files metrics: $metrics")
+
+ val fileWriteInfos = metrics.fileWriteInfos
+ assert(fileWriteInfos.length == 1)
+ val fileWriteInfo = fileWriteInfos.head
+ numBytes += fileWriteInfo.fileSize
+ val targetFileName = fileWriteInfo.targetFileName
+ val outputPath = description.path
+
+ // part1=1/part2=1
+ val partitionFragment = metrics.name
+ // Write a partitioned table
+ if (partitionFragment != "") {
+ updatedPartitions += partitionFragment
+ val tmpOutputPath = outputPath + "/" + partitionFragment + "/" +
targetFileName
+ val customOutputPath = description.customPartitionLocations.get(
+ PartitioningUtils.parsePathFragment(partitionFragment))
+ if (customOutputPath.isDefined) {
+ addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" +
targetFileName
+ }
+ }
+ }
+
+ val numFiles = loadedCb.numRows() - 1
+ val partitionsInternalRows = updatedPartitions.map {
+ part =>
+ val parts = new Array[Any](1)
+ parts(0) = part
+ new GenericInternalRow(parts)
+ }.toSeq
+ val stats = BasicWriteTaskStats(
+ partitions = partitionsInternalRows,
+ numFiles = numFiles,
+ numBytes = numBytes,
+ numRows = numWrittenRows)
+ val summary =
+ ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats =
Seq(stats))
+
+ // Write an empty iterator
+ if (numFiles == 0) {
+ None
+ } else {
+ Some(
+ WriteTaskResult(
+ new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions),
+ summary))
+ }
+ }
+}
diff --git
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
index 731f5ef48..412548de9 100644
---
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.util.QueryExecutionListener
class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils {
- private var _spark: SparkSession = null
+ private var _spark: SparkSession = _
override protected def beforeAll(): Unit = {
super.beforeAll()
@@ -86,7 +86,7 @@ class VeloxParquetWriteForHiveSuite extends GlutenQueryTest
with SQLTestUtils {
override def onSuccess(funcName: String, qe: QueryExecution, duration:
Long): Unit = {
if (!nativeUsed) {
nativeUsed = if (isSparkVersionGE("3.4")) {
- qe.executedPlan.find(_.isInstanceOf[VeloxColumnarWriteFilesExec]).isDefined
+ qe.executedPlan.find(_.isInstanceOf[ColumnarWriteFilesExec]).isDefined
} else {
qe.executedPlan.find(_.isInstanceOf[FakeRowAdaptor]).isDefined
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 8b4c18b01..358043cc5 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -129,8 +129,6 @@ trait BackendSettingsApi {
def staticPartitionWriteOnly(): Boolean = false
- def supportTransformWriteFiles: Boolean = false
-
def requiredInputFilePaths(): Boolean = false
// TODO: Move this to test settings as used in UT only.
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
index 53dc8f478..495b91c50 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
@@ -47,8 +47,13 @@ trait IteratorApi {
/**
* Inject the task attempt temporary path for native write files, this
method should be called
* before `genFirstStageIterator` or `genFinalStageIterator`
+ * @param path
+ * is the temporary directory for native write pipeline
+ * @param fileName
+ * is the file name for native write pipeline, backend could generate it
by itself.
*/
- def injectWriteFilesTempPath(path: String): Unit = throw new UnsupportedOperationException()
+ def injectWriteFilesTempPath(path: String, fileName: String): Unit =
+ throw new UnsupportedOperationException()
/**
* Generate Iterator[ColumnarBatch] for first stage. ("first" means it does
not depend on other
@@ -58,7 +63,7 @@ trait IteratorApi {
inputPartition: BaseGlutenPartition,
context: TaskContext,
pipelineTime: SQLMetric,
- updateInputMetrics: (InputMetricsWrapper) => Unit,
+ updateInputMetrics: InputMetricsWrapper => Unit,
updateNativeMetrics: IMetrics => Unit,
partitionIndex: Int,
inputIterators: Seq[Iterator[ColumnarBatch]] = Seq()
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index 58a08192d..273443f64 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -39,8 +39,8 @@ import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode,
Partitioning}
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{FileSourceScanExec, GenerateExec, LeafExecNode, SparkPlan}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.{BackendWrite, ColumnarWriteFilesExec, FileSourceScanExec, GenerateExec, LeafExecNode, SparkPlan}
+import org.apache.spark.sql.execution.datasources.{FileFormat, WriteJobDescription}
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.BuildSideRelation
@@ -388,7 +388,18 @@ trait SparkPlanExecApi {
partitionColumns: Seq[Attribute],
bucketSpec: Option[BucketSpec],
options: Map[String, String],
- staticPartitions: TablePartitionSpec): SparkPlan
+ staticPartitions: TablePartitionSpec): SparkPlan = {
+ ColumnarWriteFilesExec(
+ child,
+ fileFormat,
+ partitionColumns,
+ bucketSpec,
+ options,
+ staticPartitions)
+ }
+
+ /** Create BackendWrite */
+ def createBackendWrite(description: WriteJobDescription): BackendWrite
/** Create ColumnarArrowEvalPythonExec, for velox backend */
def createColumnarArrowEvalPythonExec(
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
index 0dd110fa5..28bf1eeab 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
@@ -86,12 +86,10 @@ case class CartesianProductExecTransformer(
val (inputRightRelNode, inputRightOutput) =
(rightPlanContext.root, rightPlanContext.outputAttributes)
- val expressionNode = condition.map {
- expr =>
- ExpressionConverter
- .replaceWithExpressionTransformer(expr, inputLeftOutput ++
inputRightOutput)
- .doTransform(context.registeredFunction)
- }
+ val expressionNode =
+ condition.map {
+ SubstraitUtil.toSubstraitExpression(_, inputLeftOutput ++
inputRightOutput, context)
+ }
val extensionNode =
JoinUtils.createExtensionNode(inputLeftOutput ++ inputRightOutput,
validation = false)
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/JoinUtils.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/JoinUtils.scala
index 9dd73800e..12d085185 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/JoinUtils.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/JoinUtils.scala
@@ -16,13 +16,12 @@
*/
package org.apache.gluten.execution
-import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.expression.{AttributeReferenceTransformer,
ConverterUtils, ExpressionConverter}
-import org.apache.gluten.substrait.`type`.TypeBuilder
+import org.apache.gluten.expression.{AttributeReferenceTransformer,
ExpressionConverter}
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.expression.{ExpressionBuilder,
ExpressionNode}
import org.apache.gluten.substrait.extensions.{AdvancedExtensionNode,
ExtensionBuilder}
import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}
+import org.apache.gluten.utils.SubstraitUtil
import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans._
@@ -34,21 +33,11 @@ import io.substrait.proto.{CrossRel, JoinRel}
import scala.collection.JavaConverters._
object JoinUtils {
- private def createEnhancement(output: Seq[Attribute]):
com.google.protobuf.Any = {
- val inputTypeNodes = output.map {
- attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable)
- }
- // Normally the enhancement node is only used for plan validation. But
here the enhancement
- // is also used in execution phase. In this case an empty typeUrlPrefix
need to be passed,
- // so that it can be correctly parsed into json string on the cpp side.
- BackendsApiManager.getTransformerApiInstance.packPBMessage(
- TypeBuilder.makeStruct(false, inputTypeNodes.asJava).toProtobuf)
- }
def createExtensionNode(output: Seq[Attribute], validation: Boolean):
AdvancedExtensionNode = {
// Use field [enhancement] in a extension node for input type validation.
if (validation) {
- ExtensionBuilder.makeAdvancedExtension(createEnhancement(output))
+
ExtensionBuilder.makeAdvancedExtension(SubstraitUtil.createEnhancement(output))
} else {
null
}
@@ -58,7 +47,7 @@ object JoinUtils {
!keyExprs.forall(_.isInstanceOf[AttributeReference])
}
- def createPreProjectionIfNeeded(
+ private def createPreProjectionIfNeeded(
keyExprs: Seq[Expression],
inputNode: RelNode,
inputNodeOutput: Seq[Attribute],
@@ -131,17 +120,17 @@ object JoinUtils {
}
}
- def createJoinExtensionNode(
+ private def createJoinExtensionNode(
joinParameters: Any,
output: Seq[Attribute]): AdvancedExtensionNode = {
// Use field [optimization] in a extension node
// to send some join parameters through Substrait plan.
- val enhancement = createEnhancement(output)
+ val enhancement = SubstraitUtil.createEnhancement(output)
ExtensionBuilder.makeAdvancedExtension(joinParameters, enhancement)
}
// Return the direct join output.
- protected def getDirectJoinOutput(
+ private def getDirectJoinOutput(
joinType: JoinType,
leftOutput: Seq[Attribute],
rightOutput: Seq[Attribute]): (Seq[Attribute], Seq[Attribute]) = {
@@ -164,7 +153,7 @@ object JoinUtils {
}
}
- protected def getDirectJoinOutputSeq(
+ private def getDirectJoinOutputSeq(
joinType: JoinType,
leftOutput: Seq[Attribute],
rightOutput: Seq[Attribute]): Seq[Attribute] = {
@@ -209,8 +198,8 @@ object JoinUtils {
validation)
// Combine join keys to make a single expression.
- val joinExpressionNode = (streamedKeys
- .zip(buildKeys))
+ val joinExpressionNode = streamedKeys
+ .zip(buildKeys)
.map {
case ((leftKey, leftType), (rightKey, rightType)) =>
HashJoinLikeExecTransformer.makeEqualToExpression(
@@ -225,12 +214,10 @@ object JoinUtils {
HashJoinLikeExecTransformer.makeAndExpression(l, r,
substraitContext.registeredFunction))
// Create post-join filter, which will be computed in hash join.
- val postJoinFilter = condition.map {
- expr =>
- ExpressionConverter
- .replaceWithExpressionTransformer(expr, streamedOutput ++
buildOutput)
- .doTransform(substraitContext.registeredFunction)
- }
+ val postJoinFilter =
+ condition.map {
+ SubstraitUtil.toSubstraitExpression(_, streamedOutput ++ buildOutput,
substraitContext)
+ }
// Create JoinRel.
val joinRel = RelBuilder.makeJoinRel(
@@ -340,12 +327,14 @@ object JoinUtils {
joinParameters: Any,
validation: Boolean = false
): RelNode = {
- val expressionNode = condition.map {
- expr =>
- ExpressionConverter
- .replaceWithExpressionTransformer(expr, inputStreamedOutput ++
inputBuildOutput)
- .doTransform(substraitContext.registeredFunction)
- }
+ val expressionNode =
+ condition.map {
+ SubstraitUtil.toSubstraitExpression(
+ _,
+ inputStreamedOutput ++ inputBuildOutput,
+ substraitContext)
+ }
+
val extensionNode =
createJoinExtensionNode(joinParameters, inputStreamedOutput ++
inputBuildOutput)
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
index d78f21bea..d2ec994ba 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
@@ -21,10 +21,11 @@ import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.expression.ConverterUtils
import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.metrics.MetricsUpdater
-import org.apache.gluten.substrait.`type`.{ColumnTypeNode, TypeBuilder}
+import org.apache.gluten.substrait.`type`.ColumnTypeNode
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.extensions.ExtensionBuilder
import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}
+import org.apache.gluten.utils.SubstraitUtil
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -32,7 +33,9 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.MetadataBuilder
import com.google.protobuf.{Any, StringValue}
@@ -40,7 +43,6 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
import java.util.Locale
-import scala.collection.JavaConverters._
import scala.collection.convert.ImplicitConversions.`collection
AsScalaIterable`
/**
@@ -56,7 +58,7 @@ case class WriteFilesExecTransformer(
staticPartitions: TablePartitionSpec)
extends UnaryTransformSupport {
// Note: "metrics" is made transient to avoid sending driver-side metrics to
tasks.
- @transient override lazy val metrics =
+ @transient override lazy val metrics: Map[String, SQLMetric] =
BackendsApiManager.getMetricsApiInstance.genWriteFilesTransformerMetrics(sparkContext)
override def metricsUpdater(): MetricsUpdater =
@@ -66,11 +68,18 @@ case class WriteFilesExecTransformer(
private val caseInsensitiveOptions = CaseInsensitiveMap(options)
- def genWriteParameters(): Any = {
+ private def genWriteParameters(): Any = {
+ val fileFormatStr = fileFormat match {
+ case register: DataSourceRegister =>
+ register.shortName
+ case _ => "UnknownFileFormat"
+ }
val compressionCodec =
WriteFilesExecTransformer.getCompressionCodec(caseInsensitiveOptions).capitalize
val writeParametersStr = new StringBuffer("WriteParameters:")
-
writeParametersStr.append("is").append(compressionCodec).append("=1").append("\n")
+ writeParametersStr.append("is").append(compressionCodec).append("=1")
+ writeParametersStr.append(";format=").append(fileFormatStr).append("\n")
+
val message = StringValue
.newBuilder()
.setValue(writeParametersStr.toString)
@@ -78,15 +87,6 @@ case class WriteFilesExecTransformer(
BackendsApiManager.getTransformerApiInstance.packPBMessage(message)
}
- def createEnhancement(output: Seq[Attribute]): com.google.protobuf.Any = {
- val inputTypeNodes = output.map {
- attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable)
- }
-
- BackendsApiManager.getTransformerApiInstance.packPBMessage(
- TypeBuilder.makeStruct(false, inputTypeNodes.asJava).toProtobuf)
- }
-
def getRelNode(
context: SubstraitContext,
originalInputAttributes: Seq[Attribute],
@@ -118,10 +118,11 @@ case class WriteFilesExecTransformer(
val extensionNode = if (!validation) {
ExtensionBuilder.makeAdvancedExtension(
genWriteParameters(),
- createEnhancement(originalInputAttributes))
+ SubstraitUtil.createEnhancement(originalInputAttributes))
} else {
// Use a extension node to send the input types through Substrait plan
for validation.
-
ExtensionBuilder.makeAdvancedExtension(createEnhancement(originalInputAttributes))
+ ExtensionBuilder.makeAdvancedExtension(
+ SubstraitUtil.createEnhancement(originalInputAttributes))
}
RelBuilder.makeWriteRel(
input,
@@ -133,7 +134,7 @@ case class WriteFilesExecTransformer(
operatorId)
}
- private def getFinalChildOutput(): Seq[Attribute] = {
+ private def getFinalChildOutput: Seq[Attribute] = {
val metadataExclusionList = conf
.getConf(GlutenConfig.NATIVE_WRITE_FILES_COLUMN_METADATA_EXCLUSION_LIST)
.split(",")
@@ -143,7 +144,7 @@ case class WriteFilesExecTransformer(
}
override protected def doValidateInternal(): ValidationResult = {
- val finalChildOutput = getFinalChildOutput()
+ val finalChildOutput = getFinalChildOutput
val validationResult =
BackendsApiManager.getSettings.supportWriteFilesExec(
fileFormat,
@@ -165,7 +166,7 @@ case class WriteFilesExecTransformer(
val childCtx = child.asInstanceOf[TransformSupport].transform(context)
val operatorId = context.nextOperatorId(this.nodeName)
val currRel =
- getRelNode(context, getFinalChildOutput(), operatorId, childCtx.root,
validation = false)
+ getRelNode(context, getFinalChildOutput, operatorId, childCtx.root,
validation = false)
assert(currRel != null, "Write Rel should be valid")
TransformContext(childCtx.outputAttributes, output, currRel)
}
@@ -196,7 +197,7 @@ object WriteFilesExecTransformer {
"__file_source_generated_metadata_col"
)
- def removeMetadata(attr: Attribute, metadataExclusionList: Seq[String]):
Attribute = {
+ private def removeMetadata(attr: Attribute, metadataExclusionList:
Seq[String]): Attribute = {
val metadataKeys = INTERNAL_METADATA_KEYS ++ metadataExclusionList
attr.withMetadata {
var builder = new MetadataBuilder().withMetadata(attr.metadata)
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 903723ccb..a85cb163c 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -141,8 +141,7 @@ object Validators {
override def validate(plan: SparkPlan): Validator.OutCome = plan match {
case p: ShuffleExchangeExec if !settings.supportColumnarShuffleExec() =>
fail(p)
case p: SortMergeJoinExec if !settings.supportSortMergeJoinExec() =>
fail(p)
- case p: WriteFilesExec
- if !(settings.enableNativeWriteFiles() &&
settings.supportTransformWriteFiles) =>
+ case p: WriteFilesExec if !settings.enableNativeWriteFiles() =>
fail(p)
case p: SortAggregateExec if !settings.replaceSortAggWithHashAgg =>
fail(p)
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/utils/SubstraitUtil.scala
b/gluten-core/src/main/scala/org/apache/gluten/utils/SubstraitUtil.scala
index e8e7ce06f..c641cb448 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/utils/SubstraitUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/utils/SubstraitUtil.scala
@@ -16,10 +16,19 @@
*/
package org.apache.gluten.utils
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.expression.{ConverterUtils, ExpressionConverter}
+import org.apache.gluten.substrait.`type`.TypeBuilder
+import org.apache.gluten.substrait.SubstraitContext
+import org.apache.gluten.substrait.expression.ExpressionNode
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.{FullOuter, InnerLike, JoinType,
LeftAnti, LeftOuter, LeftSemi, RightOuter}
import io.substrait.proto.{CrossRel, JoinRel}
+import scala.collection.JavaConverters._
+
object SubstraitUtil {
def toSubstrait(sparkJoin: JoinType): JoinRel.JoinType = sparkJoin match {
case _: InnerLike =>
@@ -55,4 +64,24 @@ object SubstraitUtil {
case _ =>
CrossRel.JoinType.UNRECOGNIZED
}
+
+ def createEnhancement(output: Seq[Attribute]): com.google.protobuf.Any = {
+ val inputTypeNodes = output.map {
+ attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable)
+ }
+ // Normally the enhancement node is only used for plan validation. But
here the enhancement
+ // is also used in execution phase. In this case an empty typeUrlPrefix
need to be passed,
+ // so that it can be correctly parsed into json string on the cpp side.
+ BackendsApiManager.getTransformerApiInstance.packPBMessage(
+ TypeBuilder.makeStruct(false, inputTypeNodes.asJava).toProtobuf)
+ }
+
+ def toSubstraitExpression(
+ expr: Expression,
+ attributeSeq: Seq[Attribute],
+ context: SubstraitContext): ExpressionNode = {
+ ExpressionConverter
+ .replaceWithExpressionTransformer(expr, attributeSeq)
+ .doTransform(context.registeredFunction)
+ }
}
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
similarity index 67%
rename from
backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
rename to
gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
index c87b8d4f6..6f04b8480 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
+++
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
@@ -17,150 +17,47 @@
package org.apache.spark.sql.execution
import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.{Partition, SparkException, TaskContext,
TaskOutputFileAlreadyExistException}
import org.apache.spark.internal.io.{FileCommitProtocol,
SparkHadoopWriterUtils}
-import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.rdd.RDD
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet,
GenericInternalRow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}
import org.apache.spark.sql.connector.write.WriterCommitMessage
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.Utils
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.hadoop.fs.FileAlreadyExistsException
import java.util.Date
-import scala.collection.mutable
-
-// Velox write files metrics start
-//
-// Follows the code in velox `HiveDataSink::close()`
-// The json can be as following:
-// {
-// "inMemoryDataSizeInBytes":0,
-// "containsNumberedFileNames":true,
-// "onDiskDataSizeInBytes":307,
-// "fileWriteInfos":[
-// {
-// "fileSize":307,
-// "writeFileName":
-//
"Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet",
-// "targetFileName":
-//
"Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet"
-// }
-// ],
-//
"writePath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
-// "rowCount":1,
-//
"targetPath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
-// "updateMode":"NEW",
-// "name":"part1=1/part2=1"
-// }
-case class VeloxWriteFilesInfo(writeFileName: String, targetFileName: String,
fileSize: Long)
-
-case class VeloxWriteFilesMetrics(
- name: String,
- updateMode: String,
- writePath: String,
- targetPath: String,
- fileWriteInfos: Seq[VeloxWriteFilesInfo],
- rowCount: Long,
- inMemoryDataSizeInBytes: Long,
- onDiskDataSizeInBytes: Long,
- containsNumberedFileNames: Boolean)
-
-// Velox write files metrics end
+/**
+ * This trait is used in [[ColumnarWriteFilesRDD]] to inject the staging write path before
+ * initializing the native plan and collect native write files metrics for each backend.
+ */
+trait BackendWrite {
+ def collectNativeWriteFilesMetrics(batch: ColumnarBatch): Option[WriteTaskResult]
+}
/**
* This RDD is used to make sure we have injected staging write path before
initializing the native
* plan, and support Spark file commit protocol.
*/
-class VeloxColumnarWriteFilesRDD(
+class ColumnarWriteFilesRDD(
var prev: RDD[ColumnarBatch],
description: WriteJobDescription,
committer: FileCommitProtocol,
jobTrackerID: String)
extends RDD[WriterCommitMessage](prev) {
- private def collectNativeWriteFilesMetrics(cb: ColumnarBatch):
Option[WriteTaskResult] = {
- // Currently, the cb contains three columns: row, fragments, and context.
- // The first row in the row column contains the number of written numRows.
- // The fragments column contains detailed information about the file
writes.
- val loadedCb =
ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb)
- assert(loadedCb.numCols() == 3)
- val numWrittenRows = loadedCb.column(0).getLong(0)
-
- var updatedPartitions = Set.empty[String]
- val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String,
String]()
- var numBytes = 0L
- val objectMapper = new ObjectMapper()
- objectMapper.registerModule(DefaultScalaModule)
- for (i <- 0 until loadedCb.numRows() - 1) {
- val fragments = loadedCb.column(1).getUTF8String(i + 1)
- val metrics = objectMapper
- .readValue(fragments.toString.getBytes("UTF-8"),
classOf[VeloxWriteFilesMetrics])
- logDebug(s"Velox write files metrics: $metrics")
-
- val fileWriteInfos = metrics.fileWriteInfos
- assert(fileWriteInfos.length == 1)
- val fileWriteInfo = fileWriteInfos.head
- numBytes += fileWriteInfo.fileSize
- val targetFileName = fileWriteInfo.targetFileName
- val outputPath = description.path
-
- // part1=1/part2=1
- val partitionFragment = metrics.name
- // Write a partitioned table
- if (partitionFragment != "") {
- updatedPartitions += partitionFragment
- val tmpOutputPath = outputPath + "/" + partitionFragment + "/" +
targetFileName
- val customOutputPath = description.customPartitionLocations.get(
- PartitioningUtils.parsePathFragment(partitionFragment))
- if (customOutputPath.isDefined) {
- addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" +
targetFileName
- }
- }
- }
-
- val numFiles = loadedCb.numRows() - 1
- val partitionsInternalRows = updatedPartitions.map {
- part =>
- val parts = new Array[Any](1)
- parts(0) = part
- new GenericInternalRow(parts)
- }.toSeq
- val stats = BasicWriteTaskStats(
- partitions = partitionsInternalRows,
- numFiles = numFiles,
- numBytes = numBytes,
- numRows = numWrittenRows)
- val summary =
- ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats =
Seq(stats))
-
- // Write an empty iterator
- if (numFiles == 0) {
- None
- } else {
- Some(
- WriteTaskResult(
- new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions),
- summary))
- }
- }
-
private def reportTaskMetrics(writeTaskResult: WriteTaskResult): Unit = {
val stats =
writeTaskResult.summary.stats.head.asInstanceOf[BasicWriteTaskStats]
val (numBytes, numWrittenRows) = (stats.numBytes, stats.numRows)
@@ -194,21 +91,25 @@ class VeloxColumnarWriteFilesRDD(
override def compute(split: Partition, context: TaskContext):
Iterator[WriterCommitMessage] = {
val commitProtocol = new SparkWriteFilesCommitProtocol(jobTrackerID,
description, committer)
+ val backendWrite =
+ BackendsApiManager.getSparkPlanExecApiInstance.createBackendWrite(description)
commitProtocol.setupTask()
val writePath = commitProtocol.newTaskAttemptTempPath()
- logDebug(s"Velox staging write path: $writePath")
+ val writeFileName = commitProtocol.getFilename
+ logDebug(s"Native staging write path: $writePath and file name: $writeFileName")
+
var writeTaskResult: WriteTaskResult = null
try {
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
- BackendsApiManager.getIteratorApiInstance.injectWriteFilesTempPath(writePath)
+ BackendsApiManager.getIteratorApiInstance.injectWriteFilesTempPath(writePath, writeFileName)
// Initialize the native plan
val iter = firstParent[ColumnarBatch].iterator(split, context)
assert(iter.hasNext)
val resultColumnarBatch = iter.next()
assert(resultColumnarBatch != null)
- val nativeWriteTaskResult = collectNativeWriteFilesMetrics(resultColumnarBatch)
+ val nativeWriteTaskResult = backendWrite.collectNativeWriteFilesMetrics(resultColumnarBatch)
if (nativeWriteTaskResult.isEmpty) {
// If we are writing an empty iterator, then velox would do nothing.
// Here we fallback to use vanilla Spark write files to generate an
empty file for
@@ -255,7 +156,7 @@ class VeloxColumnarWriteFilesRDD(
// we need to expose a dummy child (as right child) with type "WriteFilesExec"
to let Spark
// choose the new write code path (version >= 3.4). The actual plan to write
is the left child
// of this operator.
-case class VeloxColumnarWriteFilesExec private (
+case class ColumnarWriteFilesExec private (
override val left: SparkPlan,
override val right: SparkPlan,
fileFormat: FileFormat,
@@ -265,7 +166,7 @@ case class VeloxColumnarWriteFilesExec private (
staticPartitions: TablePartitionSpec)
extends BinaryExecNode
with GlutenPlan
- with VeloxColumnarWriteFilesExec.ExecuteWriteCompatible {
+ with ColumnarWriteFilesExec.ExecuteWriteCompatible {
val child: SparkPlan = left
@@ -316,7 +217,7 @@ case class VeloxColumnarWriteFilesExec private (
// partition rdd to make sure we at least set up one write task to write
the metadata.
writeFilesForEmptyRDD(description, committer, jobTrackerID)
} else {
- new VeloxColumnarWriteFilesRDD(rdd, description, committer, jobTrackerID)
+ new ColumnarWriteFilesRDD(rdd, description, committer, jobTrackerID)
}
}
override protected def withNewChildrenInternal(
@@ -325,7 +226,7 @@ case class VeloxColumnarWriteFilesExec private (
copy(newLeft, newRight, fileFormat, partitionColumns, bucketSpec, options,
staticPartitions)
}
-object VeloxColumnarWriteFilesExec {
+object ColumnarWriteFilesExec {
def apply(
child: SparkPlan,
@@ -333,7 +234,7 @@ object VeloxColumnarWriteFilesExec {
partitionColumns: Seq[Attribute],
bucketSpec: Option[BucketSpec],
options: Map[String, String],
- staticPartitions: TablePartitionSpec): VeloxColumnarWriteFilesExec = {
+ staticPartitions: TablePartitionSpec): ColumnarWriteFilesExec = {
// This is a workaround for FileFormatWriter#write. Vanilla Spark (version
>= 3.4) requires for
// a plan that has at least one node exactly of type `WriteFilesExec` that
is a Scala
// case-class, to decide to choose new `#executeWrite` code path over the
legacy `#execute`
@@ -352,7 +253,7 @@ object VeloxColumnarWriteFilesExec {
options,
staticPartitions)
- VeloxColumnarWriteFilesExec(
+ ColumnarWriteFilesExec(
child,
right,
fileFormat,
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
similarity index 82%
rename from
backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
rename to
gluten-core/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
index 845f2f98f..5e3ab83e3 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
+++
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
-import org.apache.spark.internal.io.{FileCommitProtocol,
HadoopMapReduceCommitProtocol}
+import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec,
HadoopMapReduceCommitProtocol}
import org.apache.spark.sql.execution.datasources.WriteJobDescription
import org.apache.spark.util.Utils
@@ -41,9 +41,9 @@ class SparkWriteFilesCommitProtocol(
extends Logging {
assert(committer.isInstanceOf[HadoopMapReduceCommitProtocol])
- val sparkStageId = TaskContext.get().stageId()
- val sparkPartitionId = TaskContext.get().partitionId()
- val sparkAttemptNumber = TaskContext.get().taskAttemptId().toInt &
Int.MaxValue
+ val sparkStageId: Int = TaskContext.get().stageId()
+ val sparkPartitionId: Int = TaskContext.get().partitionId()
+ private val sparkAttemptNumber = TaskContext.get().taskAttemptId().toInt &
Int.MaxValue
private val jobId = createJobID(jobTrackerID, sparkStageId)
private val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
@@ -68,6 +68,21 @@ class SparkWriteFilesCommitProtocol(
field.get(committer).asInstanceOf[OutputCommitter]
}
+ private lazy val internalGetFilename = {
+ val m = classOf[HadoopMapReduceCommitProtocol]
+ .getDeclaredMethod("getFilename", classOf[TaskAttemptContext], classOf[FileNameSpec])
+ m.setAccessible(true)
+ m
+ }
+
+ def getFilename: String = {
+ val fileCounter = 0
+ val suffix = f".c$fileCounter%03d" +
+ description.outputWriterFactory.getFileExtension(taskAttemptContext)
+ val fileNameSpec = FileNameSpec("", suffix)
+ internalGetFilename.invoke(committer, taskAttemptContext, fileNameSpec).asInstanceOf[String]
+ }
+
def setupTask(): Unit = {
committer.setupTask(taskAttemptContext)
}
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala
index 3d277b94c..726ace3a1 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala
@@ -21,7 +21,7 @@ import org.apache.gluten.execution.SortExecTransformer
import org.apache.spark.sql.GlutenSQLTestsBaseTrait
import org.apache.spark.sql.catalyst.expressions.{Ascending,
AttributeReference, NullsFirst, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort}
-import org.apache.spark.sql.execution.{QueryExecution, SortExec,
VeloxColumnarWriteFilesExec}
+import org.apache.spark.sql.execution.{ColumnarWriteFilesExec, QueryExecution,
SortExec}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{IntegerType, StringType}
@@ -122,8 +122,8 @@ class GlutenV1WriteCommandSuite
val executedPlan = FileFormatWriter.executedPlan.get
val plan = if (enabled) {
- assert(executedPlan.isInstanceOf[VeloxColumnarWriteFilesExec])
- executedPlan.asInstanceOf[VeloxColumnarWriteFilesExec].child
+ assert(executedPlan.isInstanceOf[ColumnarWriteFilesExec])
+ executedPlan.asInstanceOf[ColumnarWriteFilesExec].child
} else {
executedPlan.transformDown { case a: AdaptiveSparkPlanExec =>
a.executedPlan }
}
@@ -204,8 +204,8 @@ class GlutenV1WriteCommandSuite
val executedPlan = FileFormatWriter.executedPlan.get
val plan = if (enabled) {
- assert(executedPlan.isInstanceOf[VeloxColumnarWriteFilesExec])
- executedPlan.asInstanceOf[VeloxColumnarWriteFilesExec].child
+ assert(executedPlan.isInstanceOf[ColumnarWriteFilesExec])
+ executedPlan.asInstanceOf[ColumnarWriteFilesExec].child
} else {
executedPlan.transformDown { case a: AdaptiveSparkPlanExec =>
a.executedPlan }
}
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
index ca4b3740a..5c60115c5 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.executor.OutputMetrics
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.{CommandResultExec, QueryExecution,
VeloxColumnarWriteFilesExec}
+import org.apache.spark.sql.execution.{ColumnarWriteFilesExec,
CommandResultExec, QueryExecution}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.metric.SQLMetric
@@ -60,13 +60,13 @@ class GlutenInsertSuite
super.afterAll()
}
- private def checkAndGetWriteFiles(df: DataFrame):
VeloxColumnarWriteFilesExec = {
+ private def checkAndGetWriteFiles(df: DataFrame): ColumnarWriteFilesExec = {
val writeFiles = stripAQEPlan(
df.queryExecution.executedPlan
.asInstanceOf[CommandResultExec]
.commandPhysicalPlan).children.head
- assert(writeFiles.isInstanceOf[VeloxColumnarWriteFilesExec])
- writeFiles.asInstanceOf[VeloxColumnarWriteFilesExec]
+ assert(writeFiles.isInstanceOf[ColumnarWriteFilesExec])
+ writeFiles.asInstanceOf[ColumnarWriteFilesExec]
}
testGluten("insert partition table") {
@@ -405,7 +405,7 @@ class GlutenInsertSuite
withTable("t") {
sql(s"create table t(i boolean) using ${config.dataSource}")
if (config.useDataFrames) {
- Seq((false)).toDF.write.insertInto("t")
+ Seq(false).toDF.write.insertInto("t")
} else {
sql("insert into t select false")
}
@@ -425,7 +425,7 @@ class GlutenInsertSuite
withTable("t") {
sql(s"create table t(i boolean) using ${config.dataSource}")
if (config.useDataFrames) {
- Seq((false)).toDF.write.insertInto("t")
+ Seq(false).toDF.write.insertInto("t")
} else {
sql("insert into t select false")
}
@@ -452,7 +452,7 @@ class GlutenInsertSuite
withTable("t") {
sql(s"create table t(i boolean) using ${config.dataSource}")
if (config.useDataFrames) {
- Seq((false)).toDF.write.insertInto("t")
+ Seq(false).toDF.write.insertInto("t")
} else {
sql("insert into t select false")
}
@@ -474,7 +474,7 @@ class GlutenInsertSuite
withTable("t") {
sql(s"create table t(i boolean) using ${config.dataSource}")
if (config.useDataFrames) {
- Seq((false)).toDF.write.insertInto("t")
+ Seq(false).toDF.write.insertInto("t")
} else {
sql("insert into t select false")
}
@@ -501,7 +501,7 @@ class GlutenInsertSuite
withTable("t") {
sql(s"create table t(i boolean) using ${config.dataSource}")
if (config.useDataFrames) {
- Seq((false)).toDF.write.insertInto("t")
+ Seq(false).toDF.write.insertInto("t")
} else {
sql("insert into t select false")
}
@@ -571,7 +571,7 @@ class GlutenInsertSuite
withTable("t") {
sql(s"create table t(i boolean) using ${config.dataSource}")
if (config.useDataFrames) {
- Seq((false)).toDF.write.insertInto("t")
+ Seq(false).toDF.write.insertInto("t")
} else {
sql("insert into t select false")
}
diff --git
a/gluten-ut/spark35/src/test/backends-velox/org/apache/gluten/GlutenColumnarWriteTestSupport.scala
b/gluten-ut/spark35/src/test/backends-velox/org/apache/gluten/GlutenColumnarWriteTestSupport.scala
index c7ad606bc..68c0e1c93 100644
---
a/gluten-ut/spark35/src/test/backends-velox/org/apache/gluten/GlutenColumnarWriteTestSupport.scala
+++
b/gluten-ut/spark35/src/test/backends-velox/org/apache/gluten/GlutenColumnarWriteTestSupport.scala
@@ -16,12 +16,12 @@
*/
package org.apache.gluten
-import org.apache.spark.sql.execution.{SparkPlan, VeloxColumnarWriteFilesExec}
+import org.apache.spark.sql.execution.{SparkPlan, ColumnarWriteFilesExec}
trait GlutenColumnarWriteTestSupport {
def checkWriteFilesAndGetChild(sparkPlan: SparkPlan): SparkPlan = {
- assert(sparkPlan.isInstanceOf[VeloxColumnarWriteFilesExec])
- sparkPlan.asInstanceOf[VeloxColumnarWriteFilesExec].child
+ assert(sparkPlan.isInstanceOf[ColumnarWriteFilesExec])
+ sparkPlan.asInstanceOf[ColumnarWriteFilesExec].child
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]