This is an automated email from the ASF dual-hosted git repository.
kejia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 05c7fddac9 [GLUTEN-8663][VL] Fix column directory structure for partitioned writes (#9733)
05c7fddac9 is described below
commit 05c7fddac91e0d0416bae5ccf382d9acab279f6b
Author: Dina Suehiro Jones <[email protected]>
AuthorDate: Fri May 30 00:21:06 2025 -0700
[GLUTEN-8663][VL] Fix column directory structure for partitioned writes (#9733)
---
.../gluten/backendsapi/velox/VeloxBackend.scala | 1 +
.../gluten/execution/MiscOperatorSuite.scala | 41 +++++++
.../gluten/backendsapi/BackendSettingsApi.scala | 1 +
.../execution/WriteFilesExecTransformer.scala | 119 ++++++++++++++++++---
4 files changed, 146 insertions(+), 16 deletions(-)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 7957be7697..034a5d5f17 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -561,4 +561,5 @@ object VeloxBackendSettings extends BackendSettingsApi {
override def supportIcebergEqualityDeleteRead(): Boolean = false
+ // Opt in to reordering the write input so the partition columns lead, in partitionBy order,
+ // which keeps the partition directory nesting in partitionBy order (GLUTEN-8663).
+ override def reorderColumnsForPartitionWrite(): Boolean = true
}
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
index 30ff88f343..4bd6012471 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
@@ -30,9 +30,11 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
+import java.nio.file.{Files, Paths}
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters
+import scala.collection.JavaConverters._
class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with
AdaptiveSparkPlanHelper {
protected val rootPath: String = getClass.getResource("/").getPath
@@ -1426,6 +1428,45 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
}
}
+  test("partitioned write column order") {
+    // Get a dataframe with a limited/known set of values for c_mktsegment and c_nationkey
+    // to test the directory structure created for partitioned writes
+    val df = sql(
+      "SELECT c_name, c_nationkey, c_mktsegment FROM customer " +
+        "WHERE c_mktsegment IN ('HOUSEHOLD', 'AUTOMOBILE') " +
+        "AND c_nationkey < 5")
+
+    withTempDir {
+      tempDir =>
+        val tempDirPath = tempDir.getPath
+        df.write
+          .format("parquet")
+          .partitionBy("c_mktsegment", "c_nationkey")
+          .mode("overwrite")
+          .save(tempDirPath)
+
+        // Directories must nest in partitionBy order:
+        //   {tempDirPath}/c_mktsegment=AUTOMOBILE/c_nationkey=[0-4]/*.parquet
+        //   {tempDirPath}/c_mktsegment=HOUSEHOLD/c_nationkey=[0-4]/*.parquet
+        val expectedDirs = for {
+          segmentDir <- Seq("c_mktsegment=HOUSEHOLD", "c_mktsegment=AUTOMOBILE")
+          nationKey <- 0 to 4
+        } yield Paths.get(tempDirPath, segmentDir, s"c_nationkey=$nationKey")
+
+        // Each directory should contain .parquet file(s) and nothing else
+        expectedDirs.foreach {
+          path =>
+            assert(Files.isDirectory(path), s"Missing partition directory: $path")
+            // Files.list keeps a directory handle open until the stream is closed, so
+            // materialise the listing eagerly (toList) and close the stream afterwards.
+            val listing = Files.list(path)
+            val files =
+              try listing.iterator().asScala.toList
+              finally listing.close()
+            assert(files.nonEmpty, s"No files written under $path")
+            assert(
+              files.forall(_.toString.contains(".parquet")),
+              s"Unexpected non-parquet files under $path: $files")
+        }
+    }
+  }
+
+
test("timestamp cast fallback") {
withTempPath {
path =>
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index de390b256c..c031fb3aa1 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -156,4 +156,5 @@ trait BackendSettingsApi {
def supportIcebergEqualityDeleteRead(): Boolean = true
+ /**
+  * Whether WriteFilesExecTransformer should add a projection before a partitioned write that
+  * moves the partition columns to the front, in partition-column order, followed by the other
+  * columns in input order. The projection is only added when there is more than one partition
+  * column. Defaults to false.
+  */
+ def reorderColumnsForPartitionWrite(): Boolean = false
}
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
index 3c1857236b..c1bedcbdc6 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
@@ -21,9 +21,10 @@ import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.expression.ConverterUtils
import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.metrics.MetricsUpdater
-import org.apache.gluten.substrait.`type`.ColumnTypeNode
+import org.apache.gluten.substrait.`type`.{ColumnTypeNode, TypeNode}
import org.apache.gluten.substrait.SubstraitContext
-import org.apache.gluten.substrait.extensions.ExtensionBuilder
+import org.apache.gluten.substrait.expression.{ExpressionBuilder,
ExpressionNode}
+import org.apache.gluten.substrait.extensions.{AdvancedExtensionNode,
ExtensionBuilder}
import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}
import org.apache.gluten.utils.SubstraitUtil
@@ -44,6 +45,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
import java.util.Locale
+import scala.collection.JavaConverters._
import scala.collection.convert.ImplicitConversions.`collection
AsScalaIterable`
/**
@@ -69,6 +71,87 @@ case class WriteFilesExecTransformer(
val caseInsensitiveOptions: CaseInsensitiveMap[String] =
CaseInsensitiveMap(options)
+  /**
+   * Whether the write input has to be reordered before it reaches the write rel.
+   *
+   * True only when there are at least two partition columns and the backend opts in through
+   * `reorderColumnsForPartitionWrite()`; with no (or null) partition columns, or exactly one,
+   * the input order is left as is.
+   */
+  private def preProjectionNeeded(): Boolean =
+    partitionColumns != null &&
+      partitionColumns.size > 1 &&
+      BackendsApiManager.getSettings.reorderColumnsForPartitionWrite()
+
+  /**
+   * Builds the advanced extension attached to a Substrait rel produced by this transformer.
+   *
+   * @param originalInputAttributes attributes whose types are encoded as the enhancement
+   * @param validation when true only the input-type enhancement is attached; otherwise the
+   *                   backend's write parameters are attached alongside it
+   */
+  private def createExtensionNode(
+      originalInputAttributes: Seq[Attribute],
+      validation: Boolean): AdvancedExtensionNode = {
+    if (validation) {
+      // Validation only needs the input types carried through the Substrait plan.
+      ExtensionBuilder.makeAdvancedExtension(
+        SubstraitUtil.createEnhancement(originalInputAttributes))
+    } else {
+      val writeParameters =
+        BackendsApiManager.getTransformerApiInstance.genWriteParameters(this)
+      ExtensionBuilder.makeAdvancedExtension(
+        writeParameters,
+        SubstraitUtil.createEnhancement(originalInputAttributes))
+    }
+  }
+
+  /**
+   * Reorders the write input for partitioned writes when `preProjectionNeeded()` is true.
+   *
+   * A ProjectRel is placed on top of `input` that selects the partition columns first, in
+   * `partitionColumns` order, followed by every other column in its original input order. The
+   * type nodes, child output and original input attributes are permuted the same way so the
+   * caller can build the write rel against the projected column order.
+   *
+   * @return (input rel, type nodes, child output, original input attributes); when no
+   *         projection is needed these are exactly the arguments that were passed in.
+   */
+  private def createPreProjectionIfNeeded(
+      context: SubstraitContext,
+      originalInputAttributes: Seq[Attribute],
+      operatorId: Long,
+      input: RelNode,
+      validation: Boolean,
+      typeNodes: java.util.List[TypeNode],
+      childOutput: Seq[Attribute]
+  ): (RelNode, java.util.List[TypeNode], Seq[Attribute], Seq[Attribute]) = {
+    if (!preProjectionNeeded()) {
+      // Nothing to reorder: hand the inputs back untouched.
+      (input, typeNodes, childOutput, originalInputAttributes)
+    } else {
+      val positioned = originalInputAttributes.zipWithIndex
+      // groupBy keeps input order inside each group, so looking a partition column up here
+      // yields the same positions, in the same order, as scanning the input for its exprId.
+      val positionsByExprId = positioned.groupBy { case (attr, _) => attr.exprId }
+      val partitionExprIds = partitionColumns.map(_.exprId).toSet
+
+      // Partition columns first (in partitionColumns order), then the rest in input order.
+      val leadingPositions = partitionColumns.flatMap {
+        partCol => positionsByExprId.getOrElse(partCol.exprId, Seq.empty).map(_._2)
+      }
+      val trailingPositions = positioned.collect {
+        case (attr, pos) if !partitionExprIds.contains(attr.exprId) => pos
+      }
+      val orderedIndices = leadingPositions ++ trailingPositions
+
+      // Indexed copies so the permutations below stay linear even for List inputs.
+      val typeNodeAt = typeNodes.asScala.toIndexedSeq
+      val childOutputAt = childOutput.toIndexedSeq
+      val originalAttributeAt = originalInputAttributes.toIndexedSeq
+
+      val projections = orderedIndices.map(pos => ExpressionBuilder.makeSelection(pos))
+      val reorderedTypeNodes: java.util.List[TypeNode] =
+        new java.util.ArrayList(orderedIndices.map(typeNodeAt).asJava)
+
+      // NOTE(review): outside validation this also attaches the write parameters to the
+      // ProjectRel; confirm the backend ignores them on a project.
+      val projectRel = RelBuilder.makeProjectRel(
+        input,
+        new java.util.ArrayList[ExpressionNode](projections.asJava),
+        createExtensionNode(originalInputAttributes, validation),
+        context,
+        operatorId,
+        // Emit start index = input width; presumably so only the projected columns are
+        // emitted — confirm against RelBuilder.makeProjectRel.
+        originalInputAttributes.size
+      )
+
+      (
+        projectRel,
+        reorderedTypeNodes,
+        orderedIndices.map(childOutputAt),
+        orderedIndices.map(originalAttributeAt))
+    }
+  }
+
+
def getRelNode(
context: SubstraitContext,
originalInputAttributes: Seq[Attribute],
@@ -81,8 +164,20 @@ case class WriteFilesExecTransformer(
val inputAttributes = new java.util.ArrayList[Attribute]()
val childSize = this.child.output.size
val childOutput = this.child.output
+
+ val (inputRelNode, orderedTypeNodes, orderedChildOutput,
orderedOriginalInputAttributes) =
+ createPreProjectionIfNeeded(
+ context,
+ originalInputAttributes,
+ operatorId,
+ input,
+ validation,
+ typeNodes,
+ childOutput
+ )
+
for (i <- 0 until childSize) {
- val partitionCol = partitionColumns.find(_.exprId ==
childOutput(i).exprId)
+ val partitionCol = partitionColumns.find(_.exprId ==
orderedChildOutput(i).exprId)
if (partitionCol.nonEmpty) {
columnTypeNodes.add(new
ColumnTypeNode(NamedStruct.ColumnType.PARTITION_COL))
// "aggregate with partition group by can be pushed down"
@@ -91,22 +186,14 @@ case class WriteFilesExecTransformer(
inputAttributes.add(partitionCol.get)
} else {
columnTypeNodes.add(new
ColumnTypeNode(NamedStruct.ColumnType.NORMAL_COL))
- inputAttributes.add(originalInputAttributes(i))
+ inputAttributes.add(orderedOriginalInputAttributes(i))
}
}
val nameList =
ConverterUtils.collectAttributeNames(inputAttributes.toSeq)
- val extensionNode = if (!validation) {
- ExtensionBuilder.makeAdvancedExtension(
- BackendsApiManager.getTransformerApiInstance.genWriteParameters(this),
- SubstraitUtil.createEnhancement(originalInputAttributes)
- )
- } else {
- // Use an extension node to send the input types through Substrait plan
for validation.
- ExtensionBuilder.makeAdvancedExtension(
- SubstraitUtil.createEnhancement(originalInputAttributes))
- }
+
+ val extensionNode = createExtensionNode(orderedOriginalInputAttributes,
validation)
val bucketSpecOption = bucketSpec.map {
bucketSpec =>
@@ -118,8 +205,8 @@ case class WriteFilesExecTransformer(
}
RelBuilder.makeWriteRel(
- input,
- typeNodes,
+ inputRelNode,
+ orderedTypeNodes,
nameList,
columnTypeNodes,
extensionNode,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]