This is an automated email from the ASF dual-hosted git repository.

kejia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 05c7fddac9 [GLUTEN-8663][VL] Fix column directory structure for partitioned writes (#9733)
05c7fddac9 is described below

commit 05c7fddac91e0d0416bae5ccf382d9acab279f6b
Author: Dina Suehiro Jones <[email protected]>
AuthorDate: Fri May 30 00:21:06 2025 -0700

    [GLUTEN-8663][VL] Fix column directory structure for partitioned writes (#9733)
---
 .../gluten/backendsapi/velox/VeloxBackend.scala    |   1 +
 .../gluten/execution/MiscOperatorSuite.scala       |  41 +++++++
 .../gluten/backendsapi/BackendSettingsApi.scala    |   1 +
 .../execution/WriteFilesExecTransformer.scala      | 119 ++++++++++++++++++---
 4 files changed, 146 insertions(+), 16 deletions(-)

diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 7957be7697..034a5d5f17 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -561,4 +561,5 @@ object VeloxBackendSettings extends BackendSettingsApi {
 
   override def supportIcebergEqualityDeleteRead(): Boolean = false
 
+  override def reorderColumnsForPartitionWrite(): Boolean = true
 }
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
index 30ff88f343..4bd6012471 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
@@ -30,9 +30,11 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
+import java.nio.file.{Files, Paths}
 import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters
+import scala.collection.JavaConverters._
 
 class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with 
AdaptiveSparkPlanHelper {
   protected val rootPath: String = getClass.getResource("/").getPath
@@ -1426,6 +1428,45 @@ class MiscOperatorSuite extends 
VeloxWholeStageTransformerSuite with AdaptiveSpa
     }
   }
 
+  test("partitioned write column order") {
+    // Get a dataframe with a limited/known set of values for c_mktsegment and 
c_nationkey
+    // to test the directory structure created for partitioned writes
+    val df = sql(
+      "SELECT c_name, c_nationkey, c_mktsegment FROM customer " +
+        "WHERE c_mktsegment IN ('HOUSEHOLD', 'AUTOMOBILE') " +
+        "AND c_nationkey < 5")
+
+    withTempDir {
+      tempDir =>
+        val tempDirPath = tempDir.getPath
+        df.write
+          .format("parquet")
+          .partitionBy("c_mktsegment", "c_nationkey")
+          .mode("overwrite")
+          .save(tempDirPath)
+
+        // We expect the directory structure to look like:
+        // {tempDirPath}/c_mktsegment=AUTOMOBILE/c_nationkey=[0-4]/*.parquet
+        // {tempDirPath}/c_mktsegment=HOUSEHOLD/c_nationkey=[0-4]/*.parquet
+        val expectedDirs = for {
+          dir <- Seq("c_mktsegment=HOUSEHOLD", "c_mktsegment=AUTOMOBILE")
+          subDir <- 0 to 4
+        } yield Paths.get(tempDirPath, dir, "c_nationkey=" + 
subDir.toString).toString
+
+        // Each directory should have .parquet file(s)
+        expectedDirs.foreach {
+          dir =>
+            val path = Paths.get(dir)
+            assert(Files.exists(path) && Files.isDirectory(path))
+            val files = Files.list(path).iterator().asScala.toSeq
+            assert(files.nonEmpty)
+            val parquetFiles = files.filter(file => 
file.toString.contains(".parquet"))
+            assert(parquetFiles.nonEmpty && parquetFiles.size == files.size)
+        }
+
+    }
+  }
+
   test("timestamp cast fallback") {
     withTempPath {
       path =>
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index de390b256c..c031fb3aa1 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -156,4 +156,5 @@ trait BackendSettingsApi {
 
   def supportIcebergEqualityDeleteRead(): Boolean = true
 
+  def reorderColumnsForPartitionWrite(): Boolean = false
 }
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
index 3c1857236b..c1bedcbdc6 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
@@ -21,9 +21,10 @@ import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.expression.ConverterUtils
 import org.apache.gluten.extension.ValidationResult
 import org.apache.gluten.metrics.MetricsUpdater
-import org.apache.gluten.substrait.`type`.ColumnTypeNode
+import org.apache.gluten.substrait.`type`.{ColumnTypeNode, TypeNode}
 import org.apache.gluten.substrait.SubstraitContext
-import org.apache.gluten.substrait.extensions.ExtensionBuilder
+import org.apache.gluten.substrait.expression.{ExpressionBuilder, 
ExpressionNode}
+import org.apache.gluten.substrait.extensions.{AdvancedExtensionNode, 
ExtensionBuilder}
 import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}
 import org.apache.gluten.utils.SubstraitUtil
 
@@ -44,6 +45,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
 
 import java.util.Locale
 
+import scala.collection.JavaConverters._
 import scala.collection.convert.ImplicitConversions.`collection 
AsScalaIterable`
 
 /**
@@ -69,6 +71,87 @@ case class WriteFilesExecTransformer(
 
   val caseInsensitiveOptions: CaseInsensitiveMap[String] = 
CaseInsensitiveMap(options)
 
+  private def preProjectionNeeded(): Boolean = {
+    if (
+      partitionColumns == null || partitionColumns.isEmpty ||
+      partitionColumns.size == 1 || !BackendsApiManager.getSettings
+        .reorderColumnsForPartitionWrite()
+    ) {
+      false
+    } else {
+      true
+    }
+  }
+
+  private def createExtensionNode(
+      originalInputAttributes: Seq[Attribute],
+      validation: Boolean): AdvancedExtensionNode = {
+    if (!validation) {
+      ExtensionBuilder.makeAdvancedExtension(
+        BackendsApiManager.getTransformerApiInstance.genWriteParameters(this),
+        SubstraitUtil.createEnhancement(originalInputAttributes)
+      )
+    } else {
+      // Use an extension node to send the input types through the Substrait 
plan for validation.
+      ExtensionBuilder.makeAdvancedExtension(
+        SubstraitUtil.createEnhancement(originalInputAttributes)
+      )
+    }
+  }
+
+  private def createPreProjectionIfNeeded(
+      context: SubstraitContext,
+      originalInputAttributes: Seq[Attribute],
+      operatorId: Long,
+      input: RelNode,
+      validation: Boolean,
+      typeNodes: java.util.List[TypeNode],
+      childOutput: Seq[Attribute]
+  ): (RelNode, java.util.List[TypeNode], Seq[Attribute], Seq[Attribute]) = {
+    // For partitioned writes, create a preproject node to order columns
+    if (preProjectionNeeded()) {
+      // Get the indices of partitioned columns in partition order, followed 
by unpartitioned
+      val inputIndices = originalInputAttributes.zipWithIndex
+      val partitionExprIds = partitionColumns.map(_.exprId).toSet
+      val (partitioned, unpartitioned) = inputIndices.partition {
+        case (col, _) => partitionExprIds.contains(col.exprId)
+      }
+      val orderedIndices = partitionColumns.flatMap {
+        partCol =>
+          partitioned.collect {
+            case (origCol, index) if origCol.exprId == partCol.exprId => index
+          }
+      } ++ unpartitioned.map(_._2)
+
+      // Select cols based on the ordered indices
+      val selectCols = orderedIndices.map(ExpressionBuilder.makeSelection(_))
+
+      // Reorder attribute and type lists based on the ordered indices
+      val typeNodeSeq = typeNodes.asScala.toSeq
+      val orderedTypeNodes = orderedIndices.map(typeNodeSeq)
+      val orderedTypeNodesList: java.util.List[TypeNode] =
+        new java.util.ArrayList(orderedTypeNodes.asJava)
+      val orderedChildOutput = orderedIndices.map(childOutput)
+      val orderedOriginalAttributes = 
orderedIndices.map(originalInputAttributes)
+
+      (
+        RelBuilder.makeProjectRel(
+          input,
+          new java.util.ArrayList[ExpressionNode]((selectCols).asJava),
+          createExtensionNode(originalInputAttributes, validation),
+          context,
+          operatorId,
+          originalInputAttributes.size
+        ),
+        orderedTypeNodesList,
+        orderedChildOutput,
+        orderedOriginalAttributes)
+    } else {
+      // If a preproject is not needed, return the original values
+      (input, typeNodes, childOutput, originalInputAttributes)
+    }
+  }
+
   def getRelNode(
       context: SubstraitContext,
       originalInputAttributes: Seq[Attribute],
@@ -81,8 +164,20 @@ case class WriteFilesExecTransformer(
     val inputAttributes = new java.util.ArrayList[Attribute]()
     val childSize = this.child.output.size
     val childOutput = this.child.output
+
+    val (inputRelNode, orderedTypeNodes, orderedChildOutput, 
orderedOriginalInputAttributes) =
+      createPreProjectionIfNeeded(
+        context,
+        originalInputAttributes,
+        operatorId,
+        input,
+        validation,
+        typeNodes,
+        childOutput
+      )
+
     for (i <- 0 until childSize) {
-      val partitionCol = partitionColumns.find(_.exprId == 
childOutput(i).exprId)
+      val partitionCol = partitionColumns.find(_.exprId == 
orderedChildOutput(i).exprId)
       if (partitionCol.nonEmpty) {
         columnTypeNodes.add(new 
ColumnTypeNode(NamedStruct.ColumnType.PARTITION_COL))
         // "aggregate with partition group by can be pushed down"
@@ -91,22 +186,14 @@ case class WriteFilesExecTransformer(
         inputAttributes.add(partitionCol.get)
       } else {
         columnTypeNodes.add(new 
ColumnTypeNode(NamedStruct.ColumnType.NORMAL_COL))
-        inputAttributes.add(originalInputAttributes(i))
+        inputAttributes.add(orderedOriginalInputAttributes(i))
       }
     }
 
     val nameList =
       ConverterUtils.collectAttributeNames(inputAttributes.toSeq)
-    val extensionNode = if (!validation) {
-      ExtensionBuilder.makeAdvancedExtension(
-        BackendsApiManager.getTransformerApiInstance.genWriteParameters(this),
-        SubstraitUtil.createEnhancement(originalInputAttributes)
-      )
-    } else {
-      // Use an extension node to send the input types through Substrait plan 
for validation.
-      ExtensionBuilder.makeAdvancedExtension(
-        SubstraitUtil.createEnhancement(originalInputAttributes))
-    }
+
+    val extensionNode = createExtensionNode(orderedOriginalInputAttributes, 
validation)
 
     val bucketSpecOption = bucketSpec.map {
       bucketSpec =>
@@ -118,8 +205,8 @@ case class WriteFilesExecTransformer(
     }
 
     RelBuilder.makeWriteRel(
-      input,
-      typeNodes,
+      inputRelNode,
+      orderedTypeNodes,
       nameList,
       columnTypeNodes,
       extensionNode,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to