This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 2dd5632ef [GLUTEN-6705][CH] Basic Support Delta write (#6767)
2dd5632ef is described below
commit 2dd5632efffc2636cdcbe532983530325be22978
Author: Chang chen <[email protected]>
AuthorDate: Fri Aug 9 23:17:43 2024 +0800
[GLUTEN-6705][CH] Basic Support Delta write (#6767)
* Basic Support Native Delta Write
* Fix Build for spark 3.2 and 3.3
* Clean up
* Fix failed UTs.
* Fix build error and warning in scala 2.13
---
.../spark/sql/execution/CHDeltaColumnarWrite.scala | 30 +++
.../spark/sql/execution/CHDeltaColumnarWrite.scala | 30 +++
.../delta/ClickhouseOptimisticTransaction.scala | 166 +++++++++++++-
.../spark/sql/execution/CHDeltaColumnarWrite.scala | 144 ++++++++++++
.../spark/shuffle/CHColumnarShuffleWriter.scala | 2 +-
.../spark/sql/execution/CHColumnarWrite.scala | 241 +++++++++++++++++++++
.../sql/execution/CHColumnarWriteFilesExec.scala | 108 ++-------
.../spark/sql/execution/utils/CHExecUtil.scala | 2 +-
.../GlutenClickHouseDeltaParquetWriteSuite.scala | 14 +-
.../GlutenClickHouseNativeWriteTableSuite.scala | 35 ++-
.../apache/spark/gluten/NativeWriteChecker.scala | 5 +-
.../execution/SparkWriteFilesCommitProtocol.scala | 23 +-
.../gluten/planner/plan/GlutenPlanModel.scala | 2 +-
.../sql/execution/ColumnarWriteFilesExec.scala | 2 +-
14 files changed, 675 insertions(+), 129 deletions(-)
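The change replaces the shared SparkWriteFilesCommitProtocol write path with a
committer-aware CHColumnarWrite abstraction. A minimal, self-contained sketch of
the per-task lifecycle it drives (the types below are illustrative stand-ins,
not the real Spark/Gluten classes):

    object WriteLifecycleSketch {
      final case class Batch(numRows: Int)
      final case class TaskResult(numFiles: Int)

      trait ColumnarWrite {
        def setupTask(): Unit                            // inject the native staging path
        def commitTask(batch: Batch): Option[TaskResult] // None => native wrote nothing
        def abortTask(): Unit
      }

      // Mirrors the control flow of CHColumnarWriteFilesRDD.compute in the diff below.
      def runTask(write: ColumnarWrite, batch: Batch): TaskResult = {
        write.setupTask()
        try write.commitTask(batch).getOrElse(TaskResult(numFiles = 0)) // empty-output fallback
        catch { case t: Throwable => write.abortTask(); throw t }
      }
    }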
diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/CHDeltaColumnarWrite.scala b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/CHDeltaColumnarWrite.scala
new file mode 100644
index 000000000..e949bebf2
--- /dev/null
+++ b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/CHDeltaColumnarWrite.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.exception.GlutenNotSupportException
+
+import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.sql.execution.datasources.WriteJobDescription
+
+object CHDeltaColumnarWrite {
+ def apply(
+ jobTrackerID: String,
+ description: WriteJobDescription,
+ committer: FileCommitProtocol): CHColumnarWrite[FileCommitProtocol] =
+    throw new GlutenNotSupportException("Delta Native is not supported in Spark 3.2")
+}
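On the Spark 3.2 (delta-20) profile this factory is a deliberate stub: native
Delta write is unavailable, so apply always throws; the Spark 3.3 stub below
behaves the same. A hedged sketch of how a caller could treat "factory throws"
as "fall back to the vanilla write" (the exception type here is a generic
stand-in for GlutenNotSupportException):

    import scala.util.{Failure, Success, Try}

    def tryNative[A](factory: => A): Option[A] = Try(factory) match {
      case Success(writer) => Some(writer)
      case Failure(_: UnsupportedOperationException) => None // unsupported: use vanilla path
      case Failure(other) => throw other
    }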
diff --git a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/execution/CHDeltaColumnarWrite.scala b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/execution/CHDeltaColumnarWrite.scala
new file mode 100644
index 000000000..0a1aee5c4
--- /dev/null
+++ b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/execution/CHDeltaColumnarWrite.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.exception.GlutenNotSupportException
+
+import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.sql.execution.datasources.WriteJobDescription
+
+object CHDeltaColumnarWrite {
+ def apply(
+ jobTrackerID: String,
+ description: WriteJobDescription,
+ committer: FileCommitProtocol): CHColumnarWrite[FileCommitProtocol] =
+    throw new GlutenNotSupportException("Delta Native is not supported in Spark 3.3")
+}
diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
index 6eec68efe..e023d3d7c 100644
--- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
+++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
@@ -25,17 +25,21 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
import org.apache.spark.sql.delta.constraints.{Constraint, Constraints}
-import org.apache.spark.sql.delta.files.MergeTreeCommitProtocol
-import org.apache.spark.sql.delta.schema.InvariantViolationException
+import org.apache.spark.sql.delta.files.{DelayedCommitProtocol, DeltaFileFormatWriter, MergeTreeCommitProtocol, TransactionalWrite}
+import org.apache.spark.sql.delta.hooks.AutoCompact
+import org.apache.spark.sql.delta.schema.{InnerInvariantViolationException, InvariantViolationException}
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
-import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.delta.stats.DeltaJobStatisticsTracker
+import org.apache.spark.sql.execution.{CHDelayedCommitProtocol, QueryExecution, SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
-import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FakeRowAdaptor, FileFormatWriter, WriteJobStatsTracker}
+import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FakeRowAdaptor, FileFormatWriter, WriteFiles, WriteJobStatsTracker}
 import org.apache.spark.sql.execution.datasources.v1.clickhouse.MergeTreeFileFormatWriter
 import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.SerializableConfiguration

 import org.apache.commons.lang3.exception.ExceptionUtils
+import org.apache.hadoop.fs.Path

 import scala.collection.mutable.ListBuffer
@@ -190,4 +194,158 @@ class ClickhouseOptimisticTransaction(
super.writeFiles(inputData, writeOptions, additionalConstraints)
}
}
+
+ private def shouldOptimizeWrite(
+ writeOptions: Option[DeltaOptions],
+ sessionConf: SQLConf): Boolean = {
+ writeOptions
+ .flatMap(_.optimizeWrite)
+ .getOrElse(TransactionalWrite.shouldOptimizeWrite(metadata, sessionConf))
+ }
+
+  override protected def getCommitter(outputPath: Path): DelayedCommitProtocol =
+    new CHDelayedCommitProtocol("delta", outputPath.toString, None, deltaDataSubdir)
+
+ override def writeFiles(
+ inputData: Dataset[_],
+ writeOptions: Option[DeltaOptions],
+ isOptimize: Boolean,
+ additionalConstraints: Seq[Constraint]): Seq[FileAction] = {
+
+    if (isOptimize)
+      throw new UnsupportedOperationException("Optimize is not supported for ClickHouse")
+
+ hasWritten = true
+
+ val spark = inputData.sparkSession
+ val (data, partitionSchema) = performCDCPartition(inputData)
+ val outputPath = deltaLog.dataPath
+
+    val fileFormat = deltaLog.fileFormat(protocol, metadata) // TODO support changing formats.
+
+    // Iceberg spec requires partition columns in data files
+    val writePartitionColumns = IcebergCompat.isAnyEnabled(metadata)
+    // Retain only a minimal selection of Spark writer options to avoid any potential
+    // compatibility issues
+ val options = (writeOptions match {
+ case None => Map.empty[String, String]
+ case Some(writeOptions) =>
+ writeOptions.options.filterKeys {
+ key =>
+ key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE) ||
+ key.equalsIgnoreCase(DeltaOptions.COMPRESSION)
+ }.toMap
+    }) + (DeltaOptions.WRITE_PARTITION_COLUMNS -> writePartitionColumns.toString)
+
+ val (normalQueryExecution, output, generatedColumnConstraints, _) =
+ normalizeData(deltaLog, writeOptions, data)
+ val partitioningColumns = getPartitioningColumns(partitionSchema, output)
+
+ val logicalPlan = normalQueryExecution.optimizedPlan
+    val write =
+      WriteFiles(logicalPlan, fileFormat, partitioningColumns, None, options, Map.empty)
+
+ val queryExecution = new QueryExecution(spark, write)
+ val committer = getCommitter(outputPath)
+
+    // If Statistics Collection is enabled, then create a stats tracker that will be injected during
+    // the FileFormatWriter.write call below and will collect per-file stats using
+    // StatisticsCollection
+    // val (optionalStatsTracker, _) =
+    //   getOptionalStatsTrackerAndStatsCollection(output, outputPath, partitionSchema, data)
+ val optionalStatsTracker: Option[DeltaJobStatisticsTracker] = None
+
+    val constraints =
+      Constraints.getAll(metadata, spark) ++ generatedColumnConstraints ++ additionalConstraints
+
+    SQLExecution.withNewExecutionId(queryExecution, Option("deltaTransactionalWrite")) {
+      val outputSpec = FileFormatWriter.OutputSpec(outputPath.toString, Map.empty, output)
+
+      val physicalPlan = materializeAdaptiveSparkPlan(queryExecution.executedPlan)
+      // convertEmptyToNullIfNeeded(queryExecution.executedPlan, partitioningColumns, constraints)
+      /* val checkInvariants = DeltaInvariantCheckerExec(empty2NullPlan, constraints)
+      // No need to plan optimized write if the write command is OPTIMIZE, which aims to produce
+      // evenly-balanced data files already.
+      val physicalPlan =
+        if (
+          !isOptimize &&
+          shouldOptimizeWrite(writeOptions, spark.sessionState.conf)
+        ) {
+          DeltaOptimizedWriterExec(checkInvariants, metadata.partitionColumns, deltaLog)
+        } else {
+          checkInvariants
+        }*/
+ val statsTrackers: ListBuffer[WriteJobStatsTracker] = ListBuffer()
+
+ if (spark.conf.get(DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED)) {
+ val basicWriteJobStatsTracker = new BasicWriteJobStatsTracker(
+ new SerializableConfiguration(deltaLog.newDeltaHadoopConf()),
+ BasicWriteJobStatsTracker.metrics)
+ registerSQLMetrics(spark, basicWriteJobStatsTracker.driverSideMetrics)
+ statsTrackers.append(basicWriteJobStatsTracker)
+ }
+
+ try {
+ DeltaFileFormatWriter.write(
+ sparkSession = spark,
+ plan = physicalPlan,
+ fileFormat = fileFormat,
+ committer = committer,
+ outputSpec = outputSpec,
+ // scalastyle:off deltahadoopconfiguration
+ hadoopConf =
+            spark.sessionState.newHadoopConfWithOptions(metadata.configuration ++ deltaLog.options),
+ // scalastyle:on deltahadoopconfiguration
+ partitionColumns = partitioningColumns,
+ bucketSpec = None,
+ statsTrackers = optionalStatsTracker.toSeq
+ ++ statsTrackers,
+ options = options
+ )
+ } catch {
+ case InnerInvariantViolationException(violationException) =>
+          // Pull an InvariantViolationException up to the top level if it was the root cause.
+ throw violationException
+ }
+ }
+
+ var resultFiles =
+ (if (optionalStatsTracker.isDefined) {
+ committer.addedStatuses.map {
+ a =>
+              a.copy(stats =
+                optionalStatsTracker.map(_.recordedStats(a.toPath.getName)).getOrElse(a.stats))
+ }
+ } else {
+ committer.addedStatuses
+ })
+ .filter {
+        // In some cases, we can write out an empty `inputData`. Some examples of this (though, they
+        // may be fixed in the future) are the MERGE command when you delete with empty source, or
+        // empty target, or on disjoint tables. This is hard to catch before the write without
+        // collecting the DF ahead of time. Instead, we can return only the AddFiles that
+ // a) actually add rows, or
+ // b) don't have any stats so we don't know the number of rows at all
+ case a: AddFile => a.numLogicalRecords.forall(_ > 0)
+ case _ => true
+ }
+
+ // add [[AddFile.Tags.ICEBERG_COMPAT_VERSION.name]] tags to addFiles
+ if (IcebergCompatV2.isEnabled(metadata)) {
+ resultFiles = resultFiles.map {
+ addFile =>
+          val tags = if (addFile.tags != null) addFile.tags else Map.empty[String, String]
+          addFile.copy(tags = tags + (AddFile.Tags.ICEBERG_COMPAT_VERSION.name -> "2"))
+      }
+    }
+
+    if (resultFiles.nonEmpty && !isOptimize) registerPostCommitHook(AutoCompact)
+
+ resultFiles.toSeq ++ committer.changeFiles
+ }
+
+  private def materializeAdaptiveSparkPlan(plan: SparkPlan): SparkPlan = plan match {
+ case a: AdaptiveSparkPlanExec => a.finalPhysicalPlan
+ case p: SparkPlan => p
+ }
}
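Note the filter on resultFiles above: an AddFile survives if it either adds
rows or carries no row count at all, which Option.forall expresses compactly.
A stand-in sketch of that predicate, with a hypothetical AddedFile type:

    final case class AddedFile(path: String, numLogicalRecords: Option[Long])

    def keepUseful(files: Seq[AddedFile]): Seq[AddedFile] =
      files.filter(f => f.numLogicalRecords.forall(_ > 0))

    // AddedFile("a", Some(0))  -> dropped (provably empty)
    // AddedFile("b", None)     -> kept (row count unknown)
    // AddedFile("c", Some(3))  -> kept (adds rows)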
diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/CHDeltaColumnarWrite.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/CHDeltaColumnarWrite.scala
new file mode 100644
index 000000000..66f502038
--- /dev/null
+++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/CHDeltaColumnarWrite.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.exception.GlutenNotSupportException
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.sql.delta.files.DelayedCommitProtocol
+import org.apache.spark.sql.execution.datasources.{ExecutedWriteSummary, WriteJobDescription, WriteTaskResult}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.Utils
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+/** A Wrapper of [[DelayedCommitProtocol]] for accessing protected methods and fields. */
+class CHDelayedCommitProtocol(
+    jobId: String,
+    val outputPath: String,
+    randomPrefixLength: Option[Int],
+    subdir: Option[String])
+  extends DelayedCommitProtocol(jobId, outputPath, randomPrefixLength, subdir) {
+
+ override def getFileName(
+ taskContext: TaskAttemptContext,
+ ext: String,
+ partitionValues: Map[String, String]): String = {
+ super.getFileName(taskContext, ext, partitionValues)
+ }
+
+ def updateAddedFiles(files: Seq[(Map[String, String], String)]): Unit = {
+ assert(addedFiles.isEmpty)
+ addedFiles ++= files
+ }
+
+ override def parsePartitions(dir: String): Map[String, String] =
+ super.parsePartitions(dir)
+}
+
+case class CHDelayedCommitProtocolWrite(
+ override val jobTrackerID: String,
+ override val description: WriteJobDescription,
+ override val committer: CHDelayedCommitProtocol)
+ extends CHColumnarWrite[CHDelayedCommitProtocol]
+ with Logging {
+
+ override def doSetupNativeTask(): Unit = {
+ assert(description.path == committer.outputPath)
+ val nameSpec = CreateFileNameSpec(taskAttemptContext, description)
+ val writePath = description.path
+    val writeFileName = committer.getFileName(taskAttemptContext, nameSpec.suffix, Map.empty)
+    logDebug(s"Native staging write path: $writePath and file name: $writeFileName")
+    BackendsApiManager.getIteratorApiInstance.injectWriteFilesTempPath(writePath, writeFileName)
+ }
+
+  private def doCollectNativeResult(
+      cb: ColumnarBatch): Option[(Seq[(Map[String, String], String)], ExecutedWriteSummary)] = {
+ val numFiles = cb.numRows()
+ // Write an empty iterator
+ if (numFiles == 0) {
+ None
+ } else {
+ val file_col = cb.column(0)
+ val partition_col = cb.column(1)
+ val count_col = cb.column(2)
+
+ val partitions: mutable.Set[String] = mutable.Set[String]()
+ val addedFiles: ArrayBuffer[(Map[String, String], String)] =
+ new ArrayBuffer[(Map[String, String], String)]
+
+ var numWrittenRows: Long = 0
+ Range(0, cb.numRows()).foreach {
+ i =>
+ val fileName = file_col.getUTF8String(i).toString
+ val partition = partition_col.getUTF8String(i).toString
+ if (partition == "__NO_PARTITION_ID__") {
+ addedFiles.append((Map.empty[String, String], fileName))
+ } else {
+ val partitionValues = committer.parsePartitions(partition)
+ addedFiles.append((partitionValues, s"$partition/$fileName"))
+ }
+ numWrittenRows += count_col.getLong(i)
+ }
+ val updatedPartitions = partitions.toSet
+ Some(
+ (
+ addedFiles.toSeq,
+ ExecutedWriteSummary(
+ updatedPartitions = updatedPartitions,
+            stats = Seq(CreateBasicWriteTaskStats(numFiles, updatedPartitions, numWrittenRows)))))
+ }
+ }
+
+ override def commitTask(batch: ColumnarBatch): Option[WriteTaskResult] = {
+ doCollectNativeResult(batch).map {
+ case (addedFiles, summary) =>
+ require(addedFiles.nonEmpty, "No files to commit")
+
+ committer.updateAddedFiles(addedFiles)
+
+ val (taskCommitMessage, taskCommitTime) = Utils.timeTakenMs {
+ committer.commitTask(taskAttemptContext)
+ }
+
+ // Just for update task commit time
+ description.statsTrackers.foreach {
+ stats => stats.newTaskInstance().getFinalStats(taskCommitTime)
+ }
+ WriteTaskResult(taskCommitMessage, summary)
+ }
+ }
+}
+
+object CHDeltaColumnarWrite {
+ def apply(
+ jobTrackerID: String,
+ description: WriteJobDescription,
+      committer: FileCommitProtocol): CHColumnarWrite[FileCommitProtocol] = committer match {
+ case c: CHDelayedCommitProtocol =>
+ CHDelayedCommitProtocolWrite(jobTrackerID, description, c)
+ .asInstanceOf[CHColumnarWrite[FileCommitProtocol]]
+ case _ =>
+ throw new GlutenNotSupportException(
+ s"Unsupported committer type: ${committer.getClass.getSimpleName}")
+ }
+}
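The native writer reports its results as a three-column ColumnarBatch (file
name, partition directory, row count), with "__NO_PARTITION_ID__" marking
unpartitioned output. A simplified, self-contained sketch of the decoding loop
above; parsePartition is a stand-in for DelayedCommitProtocol.parsePartitions
and assumes a "k1=v1/k2=v2" directory layout:

    final case class NativeRow(file: String, partition: String, count: Long)

    def parsePartition(dir: String): Map[String, String] =
      dir.split('/').map(_.split("=", 2)).collect { case Array(k, v) => k -> v }.toMap

    def decode(rows: Seq[NativeRow]): (Seq[(Map[String, String], String)], Long) = {
      var numWrittenRows = 0L
      val addedFiles = rows.map { r =>
        numWrittenRows += r.count
        if (r.partition == "__NO_PARTITION_ID__") (Map.empty[String, String], r.file)
        else (parsePartition(r.partition), s"${r.partition}/${r.file}")
      }
      (addedFiles, numWrittenRows)
    }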
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
index db9bba5f1..d60873430 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
@@ -178,6 +178,6 @@ class CHColumnarShuffleWriter[K, V](
}
// VisibleForTesting
- def getPartitionLengths: Array[Long] = partitionLengths
+ def getPartitionLengths(): Array[Long] = partitionLengths
}
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWrite.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWrite.scala
new file mode 100644
index 000000000..6a5c19a4f
--- /dev/null
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWrite.scala
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec, HadoopMapReduceCommitProtocol}
+import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.execution.datasources.{BasicWriteTaskStats, ExecutedWriteSummary, PartitioningUtils, WriteJobDescription, WriteTaskResult}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.Utils
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobID, OutputCommitter, TaskAttemptContext, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+
+import java.lang.reflect.Field
+
+import scala.collection.mutable
+
+trait CHColumnarWrite[T <: FileCommitProtocol] {
+
+ def description: WriteJobDescription
+ def jobTrackerID: String
+ def committer: T
+ def doSetupNativeTask(): Unit
+
+ def setupTask(): Unit = {
+ committer.setupTask(taskAttemptContext)
+ doSetupNativeTask()
+ }
+
+ def abortTask(): Unit = {
+ committer.abortTask(taskAttemptContext)
+ }
+ def commitTask(batch: ColumnarBatch): Option[WriteTaskResult]
+
+ lazy val (taskAttemptContext: TaskAttemptContext, jobId: String) = {
+    // Copied from `SparkHadoopWriterUtils.createJobID` to be compatible with multi-version
+ def createJobID(jobTrackerID: String, id: Int): JobID = {
+ if (id < 0) {
+ throw new IllegalArgumentException("Job number is negative")
+ }
+ new JobID(jobTrackerID, id)
+ }
+
+ val sparkStageId: Int = TaskContext.get().stageId()
+ val sparkPartitionId: Int = TaskContext.get().partitionId()
+    val sparkAttemptNumber = TaskContext.get().taskAttemptId().toInt & Int.MaxValue
+ val jobID = createJobID(jobTrackerID, sparkStageId)
+ val taskId = new TaskID(jobID, TaskType.MAP, sparkPartitionId)
+ val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
+
+ // Set up the configuration object
+ val hadoopConf = description.serializableHadoopConf.value
+ hadoopConf.set("mapreduce.job.id", jobID.toString)
+ hadoopConf.set("mapreduce.task.id", taskAttemptId.getTaskID.toString)
+ hadoopConf.set("mapreduce.task.attempt.id", taskAttemptId.toString)
+ hadoopConf.setBoolean("mapreduce.task.ismap", true)
+ hadoopConf.setInt("mapreduce.task.partition", 0)
+
+ (new TaskAttemptContextImpl(hadoopConf, taskAttemptId), jobID.toString)
+ }
+}
+
+object CreateFileNameSpec {
+  def apply(taskContext: TaskAttemptContext, description: WriteJobDescription): FileNameSpec = {
+ val fileCounter = 0
+ val suffix = f".c$fileCounter%03d" +
+ description.outputWriterFactory.getFileExtension(taskContext)
+ FileNameSpec("", suffix)
+ }
+}
+
+object CreateBasicWriteTaskStats {
+ def apply(
+ numFiles: Int,
+ updatedPartitions: Set[String],
+ numWrittenRows: Long): BasicWriteTaskStats = {
+ val partitionsInternalRows = updatedPartitions.map {
+ part =>
+ val parts = new Array[Any](1)
+ parts(0) = part
+ new GenericInternalRow(parts)
+ }.toSeq
+ BasicWriteTaskStats(
+ partitions = partitionsInternalRows,
+ numFiles = numFiles,
+ numBytes = 101,
+ numRows = numWrittenRows)
+ }
+}
+
+/** [[HadoopMapReduceAdapter]] for [[HadoopMapReduceCommitProtocol]]. */
+case class HadoopMapReduceAdapter(sparkCommitter: HadoopMapReduceCommitProtocol) {
+  private lazy val committer: OutputCommitter = {
+    val field: Field = classOf[HadoopMapReduceCommitProtocol].getDeclaredField("committer")
+ field.setAccessible(true)
+ field.get(sparkCommitter).asInstanceOf[OutputCommitter]
+ }
+ private lazy val GetFilename = {
+ val m = classOf[HadoopMapReduceCommitProtocol]
+ .getDeclaredMethod("getFilename", classOf[TaskAttemptContext],
classOf[FileNameSpec])
+ m.setAccessible(true)
+ m
+ }
+
+ private def newTaskAttemptTempPath(defaultPath: String): String = {
+ assert(committer != null)
+ val stagingDir: Path = committer match {
+      // For FileOutputCommitter it has its own staging path called "work path".
+ case f: FileOutputCommitter =>
+ new Path(Option(f.getWorkPath).map(_.toString).getOrElse(defaultPath))
+ case _ =>
+ new Path(defaultPath)
+ }
+ stagingDir.toString
+ }
+
+  private def getFilename(taskContext: TaskAttemptContext, spec: FileNameSpec): String = {
+ GetFilename.invoke(sparkCommitter, taskContext, spec).asInstanceOf[String]
+ }
+
+ def getTaskAttemptTempPathAndFilename(
+ taskContext: TaskAttemptContext,
+ description: WriteJobDescription): (String, String) = {
+ val stageDir = newTaskAttemptTempPath(description.path)
+    val filename = getFilename(taskContext, CreateFileNameSpec(taskContext, description))
+ (stageDir, filename)
+ }
+}
+
+case class HadoopMapReduceCommitProtocolWrite(
+ override val jobTrackerID: String,
+ override val description: WriteJobDescription,
+ override val committer: HadoopMapReduceCommitProtocol)
+ extends CHColumnarWrite[HadoopMapReduceCommitProtocol]
+ with Logging {
+
+  private lazy val adapter: HadoopMapReduceAdapter = HadoopMapReduceAdapter(committer)
+
+  /**
+   * This function is used in [[CHColumnarWriteFilesRDD]] to inject the staging write path before
+   * initializing the native plan and collect native write files metrics for each backend.
+   */
+  override def doSetupNativeTask(): Unit = {
+    val (writePath, writeFileName) =
+      adapter.getTaskAttemptTempPathAndFilename(taskAttemptContext, description)
+    logDebug(s"Native staging write path: $writePath and file name: $writeFileName")
+    BackendsApiManager.getIteratorApiInstance.injectWriteFilesTempPath(writePath, writeFileName)
+  }
+
+ def doCollectNativeResult(cb: ColumnarBatch): Option[WriteTaskResult] = {
+ val numFiles = cb.numRows()
+ // Write an empty iterator
+ if (numFiles == 0) {
+ None
+ } else {
+ val file_col = cb.column(0)
+ val partition_col = cb.column(1)
+ val count_col = cb.column(2)
+
+ val outputPath = description.path
+ val partitions: mutable.Set[String] = mutable.Set[String]()
+      val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]()
+
+ var numWrittenRows: Long = 0
+ Range(0, cb.numRows()).foreach {
+ i =>
+ val targetFileName = file_col.getUTF8String(i).toString
+ val partition = partition_col.getUTF8String(i).toString
+ if (partition != "__NO_PARTITION_ID__") {
+ partitions += partition
+            val tmpOutputPath = outputPath + "/" + partition + "/" + targetFileName
+            val customOutputPath =
+              description.customPartitionLocations.get(
+                PartitioningUtils.parsePathFragment(partition))
+            if (customOutputPath.isDefined) {
+              addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" + targetFileName
+ }
+ }
+ numWrittenRows += count_col.getLong(i)
+ }
+
+ val updatedPartitions = partitions.toSet
+ val summary =
+ ExecutedWriteSummary(
+ updatedPartitions = updatedPartitions,
+          stats = Seq(CreateBasicWriteTaskStats(numFiles, updatedPartitions, numWrittenRows)))
+      Some(
+        WriteTaskResult(
+          new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions),
+          summary))
+ }
+ }
+
+ override def commitTask(batch: ColumnarBatch): Option[WriteTaskResult] = {
+ doCollectNativeResult(batch).map(
+ nativeWriteTaskResult => {
+ val (_, taskCommitTime) = Utils.timeTakenMs {
+ committer.commitTask(taskAttemptContext)
+ }
+
+ // Just for update task commit time
+ description.statsTrackers.foreach {
+ stats => stats.newTaskInstance().getFinalStats(taskCommitTime)
+ }
+ nativeWriteTaskResult
+ })
+ }
+}
+
+object CHColumnarWrite {
+ def apply(
+ jobTrackerID: String,
+ description: WriteJobDescription,
+      committer: FileCommitProtocol): CHColumnarWrite[FileCommitProtocol] = committer match {
+ case h: HadoopMapReduceCommitProtocol =>
+ HadoopMapReduceCommitProtocolWrite(jobTrackerID, description, h)
+ .asInstanceOf[CHColumnarWrite[FileCommitProtocol]]
+ case other => CHDeltaColumnarWrite(jobTrackerID, description, other)
+ }
+}
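HadoopMapReduceAdapter reaches Spark's private committer field and getFilename
method via reflection instead of copying the class. The trick in isolation, as
a minimal sketch with illustrative types (newer JDKs may additionally require
--add-opens for JDK-internal classes):

    import java.lang.reflect.Field

    class Hidden { private val committer: String = "file-output-committer" }

    def readPrivateField[T](obj: AnyRef, name: String): T = {
      val f: Field = obj.getClass.getDeclaredField(name)
      f.setAccessible(true) // bypass the private modifier
      f.get(obj).asInstanceOf[T]
    }

    // readPrivateField[String](new Hidden, "committer") == "file-output-committer"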
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala
index 52019d770..bf051671f 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala
@@ -16,16 +16,13 @@
*/
package org.apache.spark.sql.execution
-import org.apache.gluten.backendsapi.BackendsApiManager
-
 import org.apache.spark.{Partition, SparkException, TaskContext, TaskOutputFileAlreadyExistException}
 import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils}
-import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.rdd.RDD
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericInternalRow}
+import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.write.WriterCommitMessage
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.internal.SQLConf
@@ -33,11 +30,10 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.Utils
import org.apache.hadoop.fs.FileAlreadyExistsException
+import org.apache.hadoop.mapreduce.TaskAttemptContext
import java.util.Date
-import scala.collection.mutable
-
/**
 * This RDD is used to make sure we have injected staging write path before initializing the native
* plan, and support Spark file commit protocol.
@@ -49,60 +45,6 @@ class CHColumnarWriteFilesRDD(
jobTrackerID: String)
extends RDD[WriterCommitMessage](prev) {
-  private def collectNativeResult(cb: ColumnarBatch): Option[WriteTaskResult] = {
- val numFiles = cb.numRows()
- // Write an empty iterator
- if (numFiles == 0) {
- None
- } else {
- val file_col = cb.column(0)
- val partition_col = cb.column(1)
- val count_col = cb.column(2)
-
- val outputPath = description.path
- var updatedPartitions = Set.empty[String]
-      val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]()
-
- val write_stats = Range(0, cb.numRows()).map {
- i =>
- val targetFileName = file_col.getUTF8String(i).toString
- val partition = partition_col.getUTF8String(i).toString
- if (partition != "__NO_PARTITION_ID__") {
- updatedPartitions += partition
-            val tmpOutputPath = outputPath + "/" + partition + "/" + targetFileName
- val customOutputPath =
- description.customPartitionLocations.get(
- PartitioningUtils.parsePathFragment(partition))
- if (customOutputPath.isDefined) {
-              addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" + targetFileName
- }
- }
- count_col.getLong(i)
- }
-
- val partitionsInternalRows = updatedPartitions.map {
- part =>
- val parts = new Array[Any](1)
- parts(0) = part
- new GenericInternalRow(parts)
- }.toSeq
-
- val numWrittenRows = write_stats.sum
- val stats = BasicWriteTaskStats(
- partitions = partitionsInternalRows,
- numFiles = numFiles,
- numBytes = 101,
- numRows = numWrittenRows)
- val summary =
-        ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats))
-
- Some(
- WriteTaskResult(
-        new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions),
-        summary))
- }
- }
-
private def reportTaskMetrics(writeTaskResult: WriteTaskResult): Unit = {
    val stats = writeTaskResult.summary.stats.head.asInstanceOf[BasicWriteTaskStats]
val (numBytes, numWrittenRows) = (stats.numBytes, stats.numRows)
@@ -116,11 +58,12 @@ class CHColumnarWriteFilesRDD(
}
private def writeFilesForEmptyIterator(
- commitProtocol: SparkWriteFilesCommitProtocol): WriteTaskResult = {
- val taskAttemptContext = commitProtocol.taskAttemptContext
+ taskAttemptContext: TaskAttemptContext,
+ sparkPartitionId: Int
+ ): WriteTaskResult = {
val dataWriter =
- if (commitProtocol.sparkPartitionId != 0) {
+ if (sparkPartitionId != 0) {
        // In case of empty job, leave first partition to save meta for file format like parquet.
        new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)
} else if (description.partitionColumns.isEmpty) {
@@ -135,39 +78,35 @@ class CHColumnarWriteFilesRDD(
}
  override def compute(split: Partition, context: TaskContext): Iterator[WriterCommitMessage] = {
-    val commitProtocol = new SparkWriteFilesCommitProtocol(jobTrackerID, description, committer)
+ val commitProtocol = CHColumnarWrite(jobTrackerID, description, committer)
commitProtocol.setupTask()
- val writePath = commitProtocol.newTaskAttemptTempPath()
- val writeFileName = commitProtocol.getFilename
- logDebug(s"Native staging write path: $writePath and file name:
$writeFileName")
- var writeTaskResult: WriteTaskResult = null
try {
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
-        BackendsApiManager.getIteratorApiInstance.injectWriteFilesTempPath(writePath, writeFileName)
// Initialize the native plan
val iter = firstParent[ColumnarBatch].iterator(split, context)
assert(iter.hasNext)
val resultColumnarBatch = iter.next()
assert(resultColumnarBatch != null)
- val nativeWriteTaskResult = collectNativeResult(resultColumnarBatch)
- if (nativeWriteTaskResult.isEmpty) {
- // If we are writing an empty iterator, then velox would do nothing.
-          // Here we fallback to use vanilla Spark write files to generate an empty file for
- // metadata only.
- writeTaskResult = writeFilesForEmptyIterator(commitProtocol)
- // We have done commit task inside `writeFilesForEmptyIterator`.
- } else {
- writeTaskResult = nativeWriteTaskResult.get
- commitProtocol.commitTask()
- }
+ val writeTaskResult = commitProtocol
+ .commitTask(resultColumnarBatch)
+ .orElse({
+            // If we are writing an empty iterator, then gluten backend would do nothing.
+            // Here we fallback to use vanilla Spark write files to generate an empty file for
+            // metadata only.
+            Some(writeFilesForEmptyIterator(commitProtocol.taskAttemptContext, context.partitionId))
+ // We have done commit task inside `writeFilesForEmptyIterator`.
+ })
+ .get
+ reportTaskMetrics(writeTaskResult)
+ Iterator.single(writeTaskResult)
})(
catchBlock = {
// If there is an error, abort the task
commitProtocol.abortTask()
- logError(s"Job ${commitProtocol.getJobId} aborted.")
+ logError(s"Job ${commitProtocol.jobId} aborted.")
}
)
} catch {
@@ -177,14 +116,9 @@ class CHColumnarWriteFilesRDD(
throw new TaskOutputFileAlreadyExistException(f)
case t: Throwable =>
throw new SparkException(
- s"Task failed while writing rows to staging path: $writePath, " +
- s"output path: ${description.path}",
+ s"Task failed while writing rows to output path:
${description.path}",
t)
}
-
- assert(writeTaskResult != null)
- reportTaskMetrics(writeTaskResult)
- Iterator.single(writeTaskResult)
}
  override protected def getPartitions: Array[Partition] = firstParent[ColumnarBatch].partitions
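With commit logic folded into CHColumnarWrite, compute reduces to a single
Option pipeline: use the native commit result when present, otherwise run the
vanilla empty-file write. The shape of that control flow as a one-line sketch:

    // nativeCommit is None when the native writer produced no rows.
    def resolveResult[R](nativeCommit: Option[R], writeEmpty: () => R): R =
      nativeCommit.getOrElse(writeEmpty()) // equivalent to the .orElse(...).get above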
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
index 17eb0ed0b..7526e6d3d 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
@@ -127,7 +127,7 @@ object CHExecUtil extends Logging {
result
}
- override def next: UnsafeRow = {
+ override def next(): UnsafeRow = {
if (rowId >= rows) throw new NoSuchElementException
val (offset, length) = (rowInfo.offsets(rowId), rowInfo.lengths(rowId))
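This and the getPartitionLengths() change earlier track the declared arity of
the overridden member: scala.collection.Iterator declares next() with an empty
parameter list but hasNext without one, and Scala 2.13 warns (fatally under
-Werror) when an override mismatches. Minimal illustration:

    val threeInts: Iterator[Int] = new Iterator[Int] {
      private var i = 0
      override def hasNext: Boolean = i < 3    // declared without ()
      override def next(): Int = { i += 1; i } // declared with ()
    }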
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala
index 8f8351bae..d6f9a0162 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala
@@ -1251,7 +1251,7 @@ class GlutenClickHouseDeltaParquetWriteSuite
runTPCHQueryBySQL(1, sqlStr) { _ => {} }
}
- test("test parquet optimize basic") {
+ testSparkVersionLE33("test parquet optimize basic") {
withSQLConf("spark.databricks.delta.optimize.maxFileSize" -> "20000000") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_delta_parquet_optimize;
@@ -1286,7 +1286,7 @@ class GlutenClickHouseDeltaParquetWriteSuite
}
}
- test("test parquet optimize partitioned by one low card column") {
+ testSparkVersionLE33("test parquet optimize partitioned by one low card
column") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_delta_parquet_optimize_p2;
|""".stripMargin)
@@ -1325,7 +1325,7 @@ class GlutenClickHouseDeltaParquetWriteSuite
assert(ret2.apply(0).get(0) == 600572)
}
- test("test parquet optimize parallel delete") {
+ testSparkVersionLE33("test parquet optimize parallel delete") {
withSQLConf("spark.databricks.delta.vacuum.parallelDelete.enabled" ->
"true") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_delta_parquet_optimize_p4;
@@ -1356,7 +1356,7 @@ class GlutenClickHouseDeltaParquetWriteSuite
}
}
- test("test parquet optimize with the path based table") {
+ testSparkVersionLE33("test parquet optimize with the path based table") {
val dataPath = s"$basePath/lineitem_delta_parquet_optimize_path_based"
clearDataPath(dataPath)
withSQLConf(
@@ -1372,14 +1372,16 @@ class GlutenClickHouseDeltaParquetWriteSuite
.mode(SaveMode.Append)
.save(dataPath)
+ assert(countFiles(new File(dataPath)) === 51)
+
val clickhouseTable = DeltaTable.forPath(spark, dataPath)
clickhouseTable.optimize().executeCompaction()
clickhouseTable.vacuum(0.0)
if (sparkVersion.equals("3.2")) {
- assert(countFiles(new File(dataPath)) == 27)
+ assert(countFiles(new File(dataPath)) === 27)
} else {
- assert(countFiles(new File(dataPath)) == 29)
+ assert(countFiles(new File(dataPath)) === 29)
}
    val ret = spark.sql(s"select count(*) from clickhouse.`$dataPath`").collect()
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala
index 1f99947e5..11710a758 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala
@@ -187,7 +187,7 @@ class GlutenClickHouseNativeWriteTableSuite
checkNative: Boolean = true): Unit = nativeWrite {
format =>
val (table_name, table_create_sql, insert_sql) = f(format)
- withDestinationTable(table_name, table_create_sql) {
+ withDestinationTable(table_name, Option(table_create_sql)) {
checkInsertQuery(insert_sql, checkNative)
Option(extraCheck).foreach(_(table_name, format))
}
@@ -218,15 +218,36 @@ class GlutenClickHouseNativeWriteTableSuite
}
test("supplier: csv to parquet- insert overwrite local directory") {
+ val partitionNumber = 7
withSource(supplierDF, "supplier") {
- nativeWrite {
- format =>
+ nativeWrite2(
+ format => {
val sql =
s"""insert overwrite local directory
|'$basePath/test_insert_into_${format}_supplier'
- |stored as $format select * from supplier""".stripMargin
- checkInsertQuery(sql, checkNative = true)
- }
+ |stored as $format
+               |select /*+ REPARTITION($partitionNumber) */ * from supplier""".stripMargin
+ (s"test_insert_into_${format}_supplier", null, sql)
+ },
+ (table_name, format) => {
+ // spark 3.2 without orc or parquet suffix
+ val files = recursiveListFiles(new File(s"$basePath/$table_name"))
+ .map(_.getName)
+ .filterNot(s => s.endsWith(s".crc") || s.equals("_SUCCESS"))
+
+ lazy val fileNames = {
+ val dir = s"$basePath/$table_name"
+ recursiveListFiles(new File(dir))
+ .map(f => f.getAbsolutePath.stripPrefix(dir))
+ .sorted
+ .mkString("\n")
+ }
+
+ lazy val errorMessage =
+ s"Search $basePath/$table_name with suffix .$format, all files: \n
$fileNames"
+ assert(files.length === partitionNumber, errorMessage)
+ }
+ )
}
}
@@ -851,7 +872,7 @@ class GlutenClickHouseNativeWriteTableSuite
val table_name = "t_" + format
withDestinationTable(
table_name,
- s"create table $table_name (id int, str string) stored as $format") {
+ Some(s"create table $table_name (id int, str string) stored as
$format")) {
checkInsertQuery(
s"insert overwrite table $table_name " +
"select id, cast(id as string) from range(10) union all " +
diff --git a/backends-clickhouse/src/test/scala/org/apache/spark/gluten/NativeWriteChecker.scala b/backends-clickhouse/src/test/scala/org/apache/spark/gluten/NativeWriteChecker.scala
index fc30d151b..4bee3f177 100644
--- a/backends-clickhouse/src/test/scala/org/apache/spark/gluten/NativeWriteChecker.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/spark/gluten/NativeWriteChecker.scala
@@ -62,9 +62,10 @@ trait NativeWriteChecker
spark.sql(sqlStr)
}
-  def withDestinationTable(table: String, createTableSql: String = "select 1")(f: => Unit): Unit = {
+  def withDestinationTable(table: String, createTableSql: Option[String] = None)(
+ f: => Unit): Unit = {
spark.sql(s"drop table IF EXISTS $table")
- spark.sql(s"$createTableSql")
+ createTableSql.foreach(spark.sql)
f
}
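After this signature change, callers pass the DDL as an Option: Some(ddl)
creates the table before running the block, None only drops it. Usage inside
the suite would look roughly like this (table names here are hypothetical):

    withDestinationTable("t_demo", Some("create table t_demo (id int) stored as parquet")) {
      // run the insert and checks against t_demo
    }
    withDestinationTable("t_raw") {
      // no create statement issued; the table is only dropped if it exists
    }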
diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
similarity index 82%
rename from gluten-core/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
rename to backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
index 5e3ab83e3..845f2f98f 100644
--- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
-import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec, HadoopMapReduceCommitProtocol}
+import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol}
import org.apache.spark.sql.execution.datasources.WriteJobDescription
import org.apache.spark.util.Utils
@@ -41,9 +41,9 @@ class SparkWriteFilesCommitProtocol(
extends Logging {
assert(committer.isInstanceOf[HadoopMapReduceCommitProtocol])
-  val sparkStageId: Int = TaskContext.get().stageId()
-  val sparkPartitionId: Int = TaskContext.get().partitionId()
-  private val sparkAttemptNumber = TaskContext.get().taskAttemptId().toInt & Int.MaxValue
+  val sparkStageId = TaskContext.get().stageId()
+  val sparkPartitionId = TaskContext.get().partitionId()
+  val sparkAttemptNumber = TaskContext.get().taskAttemptId().toInt & Int.MaxValue
private val jobId = createJobID(jobTrackerID, sparkStageId)
private val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
@@ -68,21 +68,6 @@ class SparkWriteFilesCommitProtocol(
field.get(committer).asInstanceOf[OutputCommitter]
}
- private lazy val internalGetFilename = {
- val m = classOf[HadoopMapReduceCommitProtocol]
-      .getDeclaredMethod("getFilename", classOf[TaskAttemptContext], classOf[FileNameSpec])
- m.setAccessible(true)
- m
- }
-
- def getFilename: String = {
- val fileCounter = 0
- val suffix = f".c$fileCounter%03d" +
- description.outputWriterFactory.getFileExtension(taskAttemptContext)
- val fileNameSpec = FileNameSpec("", suffix)
-    internalGetFilename.invoke(committer, taskAttemptContext, fileNameSpec).asInstanceOf[String]
- }
-
def setupTask(): Unit = {
committer.setupTask(taskAttemptContext)
}
diff --git a/gluten-core/src/main/scala/org/apache/gluten/planner/plan/GlutenPlanModel.scala b/gluten-core/src/main/scala/org/apache/gluten/planner/plan/GlutenPlanModel.scala
index 727613f56..d981de804 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/planner/plan/GlutenPlanModel.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/planner/plan/GlutenPlanModel.scala
@@ -49,7 +49,7 @@ object GlutenPlanModel {
    override protected def doExecute(): RDD[InternalRow] = throw new IllegalStateException()
override def output: Seq[Attribute] = metadata.schema().output
- override def supportsColumnar(): Boolean = {
+ override def supportsColumnar: Boolean = {
batchType != Convention.BatchType.None
}
diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
index 7f6fd01ac..9c54653d9 100644
--- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
@@ -49,7 +49,7 @@ abstract class ColumnarWriteFilesExec protected (
override lazy val references: AttributeSet = AttributeSet.empty
- override def supportsColumnar(): Boolean = true
+ override def supportsColumnar: Boolean = true
override def output: Seq[Attribute] = Seq.empty
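Same Scala 2.13 arity rule as the CHExecUtil fix, in the other direction:
SparkPlan declares supportsColumnar without parentheses, so these overrides
must be parameterless too. Reduced to its essence:

    trait Plan { def supportsColumnar: Boolean }
    final class DemoExec extends Plan {
      override def supportsColumnar: Boolean = true // adding () would mismatch the parent
    }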
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]