This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new e1aa1a2fea [VL] Delta: Support overwrite mode in write (#11226)
e1aa1a2fea is described below

commit e1aa1a2feaa6806186e1dc13f534e5f4dbaf0873
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Dec 9 13:36:25 2025 +0800

    [VL] Delta: Support overwrite mode in write (#11226)
---
 .../sql/delta/files/GlutenDeltaFileFormatWriter.scala | 19 +++++++++++++------
 .../datasources/v2/OffloadDeltaCommand.scala          |  4 ++++
 .../org/apache/spark/sql/delta/DeltaDDLSuite.scala    |  2 ++
 .../spark/sql/delta/DeltaInsertIntoTableSuite.scala   |  2 ++
 4 files changed, 21 insertions(+), 6 deletions(-)

diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
index 90c0fa1bff..69674b40a3 100644
--- a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
+++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
@@ -581,14 +581,21 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
                   val blockStripe = iter.next()
                   val headingRow = blockStripe.getHeadingRow
                   beforeWrite(headingRow)
-                  val columnBatch = blockStripe.getColumnarBatch
-                  currentWriter.write(terminalRow.withNewBatch(columnBatch))
-                  columnBatch.close()
+                  val currentColumnBatch = blockStripe.getColumnarBatch
+                  val numRowsOfCurrentColumnarBatch = currentColumnBatch.numRows()
+                  assert(numRowsOfCurrentColumnarBatch > 0)
+                  val currentTerminalRow = terminalRow.withNewBatch(currentColumnBatch)
+                  currentWriter.write(currentTerminalRow)
+                  statsTrackers.foreach {
+                    tracker =>
+                      tracker.newRow(currentWriter.path, currentTerminalRow)
+                      for (_ <- 0 until numRowsOfCurrentColumnarBatch - 1) {
+                        tracker.newRow(currentWriter.path, new PlaceholderRow())
+                      }
+                  }
+                  currentColumnBatch.close()
                 }
                 blockStripes.release()
-                for (_ <- 0 until numRows) {
-                  statsTrackers.foreach(_.newRow(currentWriter.path, record))
-                }
                 recordsInFile += numRows
               }
           }
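
For context, a minimal self-contained sketch (not part of this commit) of the
accounting pattern the new loop body follows: each batch reports one
materialized row to every stats tracker, then numRows - 1 placeholder rows, so
per-file row counts stay exact without converting every columnar row. The
tracker and row types below are illustrative stand-ins, not Gluten or Spark
APIs.

    // Illustrative only: RowStatsTracker and PlaceholderLikeRow are made-up names.
    object StatsPatternSketch {
      sealed trait Row
      final case class DataRow(values: Seq[Any]) extends Row
      case object PlaceholderLikeRow extends Row // counted, but carries no data

      final class RowStatsTracker {
        private var rows = 0L
        def newRow(path: String, row: Row): Unit = rows += 1 // counting ignores content
        def totalRows: Long = rows
      }

      def main(args: Array[String]): Unit = {
        val tracker = new RowStatsTracker
        for (numRows <- Seq(3, 5)) { // numRows() of two columnar batches
          assert(numRows > 0)
          tracker.newRow("part-00000", DataRow(Seq("heading"))) // one real row per batch
          for (_ <- 0 until numRows - 1) // remaining rows as placeholders, as in the diff
            tracker.newRow("part-00000", PlaceholderLikeRow)
        }
        println(tracker.totalRows) // 8
      }
    }
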
diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/OffloadDeltaCommand.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/OffloadDeltaCommand.scala
index 60865682b3..4305165ad3 100644
--- a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/OffloadDeltaCommand.scala
+++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/OffloadDeltaCommand.scala
@@ -21,8 +21,10 @@ import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
 
 import org.apache.spark.sql.delta.catalog.DeltaCatalog
 import org.apache.spark.sql.delta.commands.DeleteCommand
+import org.apache.spark.sql.delta.sources.DeltaDataSource
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand
 
 case class OffloadDeltaCommand() extends OffloadSingleNode {
   override def offload(plan: SparkPlan): SparkPlan = {
@@ -32,6 +34,8 @@ case class OffloadDeltaCommand() extends OffloadSingleNode {
     plan match {
       case ExecutedCommandExec(dc: DeleteCommand) =>
         ExecutedCommandExec(GlutenDeltaLeafRunnableCommand(dc))
+      case ExecutedCommandExec(s @ SaveIntoDataSourceCommand(_, _: DeltaDataSource, _, _)) =>
+        ExecutedCommandExec(GlutenDeltaLeafRunnableCommand(s))
      case ctas: AtomicCreateTableAsSelectExec if ctas.catalog.isInstanceOf[DeltaCatalog] =>
         GlutenDeltaLeafV2CommandExec(ctas)
      case rtas: AtomicReplaceTableAsSelectExec if rtas.catalog.isInstanceOf[DeltaCatalog] =>
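
For reference, a write of the shape this new case targets: a DataFrame V1 save
into the Delta source, which Spark plans as SaveIntoDataSourceCommand with a
DeltaDataSource provider and which the rule above now wraps for offload. The
path is made up and the snippet assumes delta-spark is on the classpath.

    import org.apache.spark.sql.SparkSession

    object OverwriteDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").getOrCreate()
        import spark.implicits._
        val path = "/tmp/delta-overwrite-demo" // illustrative location

        Seq((1, "a"), (2, "b")).toDF("id", "value")
          .write.format("delta").save(path) // initial table

        // Planned as SaveIntoDataSourceCommand(_, DeltaDataSource, _, _),
        // so it is matched by the case added in this commit.
        Seq((3, "c")).toDF("id", "value")
          .write.format("delta").mode("overwrite").save(path)

        spark.stop()
      }
    }
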
diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaDDLSuite.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaDDLSuite.scala
index dcb7d7317e..74c69feacd 100644
--- a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaDDLSuite.scala
+++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaDDLSuite.scala
@@ -24,12 +24,14 @@ import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
+import org.apache.spark.tags.ExtendedSQLTest
 
 // scalastyle:off import.ordering.noEmptyLine
 import org.apache.hadoop.fs.UnsupportedFileSystemException
 
 import scala.collection.JavaConverters._
 
+@ExtendedSQLTest
 class DeltaDDLSuite extends DeltaDDLTestBase with SharedSparkSession with DeltaSQLCommandTest {
 
   override protected def verifyNullabilityFailure(exception: AnalysisException): Unit = {
diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala
index 8485957b4a..8fddfc0267 100644
--- a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala
+++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
+import org.apache.spark.tags.ExtendedSQLTest
 
 import org.scalatest.BeforeAndAfter
 
@@ -38,6 +39,7 @@ import java.util.TimeZone
 
 import scala.collection.JavaConverters._
 
+@ExtendedSQLTest
 class DeltaInsertIntoSQLSuite
   extends DeltaInsertIntoTestsWithTempViews(
     supportsDynamicOverwrite = true,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
