This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new e1aa1a2fea [VL] Delta: Support overwrite mode in write (#11226)
e1aa1a2fea is described below
commit e1aa1a2feaa6806186e1dc13f534e5f4dbaf0873
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Dec 9 13:36:25 2025 +0800
[VL] Delta: Support overwrite mode in write (#11226)
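For context, the "overwrite" mode in question is the standard Spark DataFrameWriter save mode. A minimal sketch of the kind of write this commit lets the Velox backend offload (the path and data below are hypothetical, not taken from the change):

    import org.apache.spark.sql.SparkSession

    object DeltaOverwriteSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("delta-overwrite").getOrCreate()
        import spark.implicits._
        // Hypothetical data and path; "overwrite" replaces existing table contents.
        Seq((1, "a"), (2, "b")).toDF("id", "value")
          .write
          .format("delta")
          .mode("overwrite") // the save mode this commit adds offload support for
          .save("/tmp/delta/events")
        spark.stop()
      }
    }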
---
.../sql/delta/files/GlutenDeltaFileFormatWriter.scala | 19 +++++++++++++------
.../datasources/v2/OffloadDeltaCommand.scala | 4 ++++
.../org/apache/spark/sql/delta/DeltaDDLSuite.scala | 2 ++
.../spark/sql/delta/DeltaInsertIntoTableSuite.scala | 2 ++
4 files changed, 21 insertions(+), 6 deletions(-)
diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
index 90c0fa1bff..69674b40a3 100644
--- a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
+++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
@@ -581,14 +581,21 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
           val blockStripe = iter.next()
           val headingRow = blockStripe.getHeadingRow
           beforeWrite(headingRow)
-          val columnBatch = blockStripe.getColumnarBatch
-          currentWriter.write(terminalRow.withNewBatch(columnBatch))
-          columnBatch.close()
+          val currentColumnBatch = blockStripe.getColumnarBatch
+          val numRowsOfCurrentColumnarBatch = currentColumnBatch.numRows()
+          assert(numRowsOfCurrentColumnarBatch > 0)
+          val currentTerminalRow = terminalRow.withNewBatch(currentColumnBatch)
+          currentWriter.write(currentTerminalRow)
+          statsTrackers.foreach {
+            tracker =>
+              tracker.newRow(currentWriter.path, currentTerminalRow)
+              for (_ <- 0 until numRowsOfCurrentColumnarBatch - 1) {
+                tracker.newRow(currentWriter.path, new PlaceholderRow())
+              }
+          }
+          currentColumnBatch.close()
         }
         blockStripes.release()
-        for (_ <- 0 until numRows) {
-          statsTrackers.foreach(_.newRow(currentWriter.path, record))
-        }
         recordsInFile += numRows
       }
     }
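The rewritten loop above charges the stats trackers once per batch: one concrete row, so trackers that inspect values see real data, plus PlaceholderRow instances for the remaining rows, replacing the old post-loop replay of numRows copies of a single record. A minimal, self-contained sketch of that accounting pattern (the tracker trait and all names here are simplified stand-ins, not Gluten's actual types):

    object BatchStatsAccountingSketch {
      // Simplified stand-in for Spark's write-task stats tracker interface.
      trait StatsTracker {
        def newRow(path: String, row: AnyRef): Unit
      }

      // Stand-in for a row whose field values are never inspected.
      final class PlaceholderRow

      def recordBatch(
          trackers: Seq[StatsTracker],
          path: String,
          headRow: AnyRef,
          numRows: Int): Unit = {
        assert(numRows > 0) // an empty batch would under-count below
        trackers.foreach { tracker =>
          // One real row lets value-inspecting trackers observe actual data...
          tracker.newRow(path, headRow)
          // ...while placeholders keep the per-file row count exact.
          for (_ <- 0 until numRows - 1) {
            tracker.newRow(path, new PlaceholderRow)
          }
        }
      }
    }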
diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/OffloadDeltaCommand.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/OffloadDeltaCommand.scala
index 60865682b3..4305165ad3 100644
--- a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/OffloadDeltaCommand.scala
+++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/OffloadDeltaCommand.scala
@@ -21,8 +21,10 @@ import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
 import org.apache.spark.sql.delta.catalog.DeltaCatalog
 import org.apache.spark.sql.delta.commands.DeleteCommand
+import org.apache.spark.sql.delta.sources.DeltaDataSource
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.ExecutedCommandExec

 case class OffloadDeltaCommand() extends OffloadSingleNode {
   override def offload(plan: SparkPlan): SparkPlan = {
@@ -32,6 +34,8 @@ case class OffloadDeltaCommand() extends OffloadSingleNode {
     plan match {
       case ExecutedCommandExec(dc: DeleteCommand) =>
         ExecutedCommandExec(GlutenDeltaLeafRunnableCommand(dc))
+      case ExecutedCommandExec(s @ SaveIntoDataSourceCommand(_, _: DeltaDataSource, _, _)) =>
+        ExecutedCommandExec(GlutenDeltaLeafRunnableCommand(s))
       case ctas: AtomicCreateTableAsSelectExec if ctas.catalog.isInstanceOf[DeltaCatalog] =>
         GlutenDeltaLeafV2CommandExec(ctas)
       case rtas: AtomicReplaceTableAsSelectExec if rtas.catalog.isInstanceOf[DeltaCatalog] =>
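The new match arm covers the plan Spark produces for df.write.format("delta").mode(...).save(...): a SaveIntoDataSourceCommand whose provider is a DeltaDataSource. Wrapping the matched command routes its execution through Gluten. A minimal sketch of that dispatch shape (all types below are simplified stand-ins for the Spark and Gluten classes):

    object OffloadDispatchSketch {
      sealed trait Plan
      final case class ExecutedCommand(cmd: AnyRef) extends Plan
      final case class GlutenWrapped(cmd: AnyRef)

      final class DeltaSource
      // Stand-in for Spark's SaveIntoDataSourceCommand: only the provider matters here.
      final case class SaveIntoDataSource(provider: AnyRef)

      def offload(plan: Plan): Plan = plan match {
        // Rewrite only saves whose provider is the Delta data source;
        // every other plan is left untouched.
        case ExecutedCommand(s @ SaveIntoDataSource(_: DeltaSource)) =>
          ExecutedCommand(GlutenWrapped(s))
        case other => other
      }
    }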
diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaDDLSuite.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaDDLSuite.scala
index dcb7d7317e..74c69feacd 100644
--- a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaDDLSuite.scala
+++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaDDLSuite.scala
@@ -24,12 +24,14 @@ import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
+import org.apache.spark.tags.ExtendedSQLTest

 // scalastyle:off import.ordering.noEmptyLine
 import org.apache.hadoop.fs.UnsupportedFileSystemException

 import scala.collection.JavaConverters._

+@ExtendedSQLTest
 class DeltaDDLSuite extends DeltaDDLTestBase with SharedSparkSession with DeltaSQLCommandTest {
   override protected def verifyNullabilityFailure(exception: AnalysisException): Unit = {
diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala
index 8485957b4a..8fddfc0267 100644
--- a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala
+++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
+import org.apache.spark.tags.ExtendedSQLTest

 import org.scalatest.BeforeAndAfter
@@ -38,6 +39,7 @@ import java.util.TimeZone

 import scala.collection.JavaConverters._

+@ExtendedSQLTest
 class DeltaInsertIntoSQLSuite
   extends DeltaInsertIntoTestsWithTempViews(
     supportsDynamicOverwrite = true,
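The two test-suite hunks only add Spark's @ExtendedSQLTest tag, which lets a tag-aware build include or exclude these long-running suites (how Gluten's CI filters on the tag is not shown in this change). A minimal sketch of applying the tag to a ScalaTest suite (the suite below is hypothetical):

    import org.apache.spark.tags.ExtendedSQLTest
    import org.scalatest.funsuite.AnyFunSuite

    // Hypothetical suite: the class-level tag marks every test in it,
    // so a tag-aware runner can select or skip the whole suite.
    @ExtendedSQLTest
    class TaggedSuiteSketch extends AnyFunSuite {
      test("runs only when ExtendedSQLTest suites are selected") {
        assert(1 + 1 == 2)
      }
    }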
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]