This is an automated email from the ASF dual-hosted git repository.

FelixYBW pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 3bf094200b [GLUTEN-12071][VL] Respect HadoopConf write options in 
Velox native Parquet writer (#12072)
3bf094200b is described below

commit 3bf094200be98eaaedd6efef4f0af567dedbc6f3
Author: Wechar Yu <[email protected]>
AuthorDate: Fri May 15 13:22:03 2026 +0800

    [GLUTEN-12071][VL] Respect HadoopConf write options in Velox native Parquet 
writer (#12072)
    
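    Updates Velox native Parquet write parameter generation to build write
    options from the session's Hadoop configuration merged with the per-write
    options, so that spark.hadoop.* Parquet settings are respected.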
---
 .../backendsapi/velox/VeloxTransformerApi.scala    | 13 +++++-
 .../sql/execution/VeloxParquetWriteSuite.scala     | 54 ++++++++++++++++++++++
 .../datasources/parquet/ParquetFileFormat.scala    | 13 ++++--
 3 files changed, 75 insertions(+), 5 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
index 3a1d53154f..47ad778dc8 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
@@ -32,6 +32,7 @@ import org.apache.spark.Partition
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc.GlutenDriverEndpoint
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
PartitionDirectory}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.hive.execution.HiveFileFormat
@@ -44,6 +45,8 @@ import org.apache.commons.lang3.math.NumberUtils
 
 import java.util.{Map => JMap}
 
+import scala.collection.JavaConverters._
+
 class VeloxTransformerApi extends TransformerApi with Logging {
 
   def genPartitionSeq(
@@ -131,11 +134,17 @@ class VeloxTransformerApi extends TransformerApi with 
Logging {
         // Only Parquet is supported. It's safe to set a fixed "parquet" here
         // because others already fell back by WriteFilesExecTransformer's 
validation.
         val shortName = "parquet"
+        val writeOptions = Option(write.session).map {
+          session =>
+            val hadoopConf = 
session.sessionState.newHadoopConfWithOptions(write.options)
+            CaseInsensitiveMap(hadoopConf.iterator().asScala.map(
+              entry => entry.getKey -> entry.getValue).toMap)
+        }.getOrElse(write.caseInsensitiveOptions)
         val nativeConf =
           GlutenFormatFactory(shortName)
             .nativeConf(
-              write.caseInsensitiveOptions,
-              
WriteFilesExecTransformer.getCompressionCodec(write.caseInsensitiveOptions))
+              writeOptions,
+              WriteFilesExecTransformer.getCompressionCodec(writeOptions))
         packPBMessage(
           ConfigMap
             .newBuilder()
diff --git 
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
 
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
index a4577c8a5b..9de24103b0 100644
--- 
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
@@ -27,6 +27,10 @@ import org.apache.hadoop.fs.Path
 import org.apache.parquet.hadoop.ParquetFileReader
 import org.apache.parquet.hadoop.util.HadoopInputFile
 
+import java.io.File
+
+import scala.collection.JavaConverters._
+
 class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite with 
WriteUtils {
 
   override protected val resourcePath: String = "/tpch-data-parquet"
@@ -267,3 +271,53 @@ class VeloxParquetWriteSuite extends 
VeloxWholeStageTransformerSuite with WriteU
     }
   }
 }
+
+class VeloxParquetWriteHadoopConfSuite extends VeloxWholeStageTransformerSuite 
with WriteUtils {
+
+  override protected val resourcePath: String = ""
+  override protected val fileFormat: String = "parquet"
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")
+      .set(s"spark.hadoop.parquet.enable.dictionary", "false")
+  }
+
+  private val dictionaryEncodingNames = Set("PLAIN_DICTIONARY", 
"RLE_DICTIONARY")
+
+  private def parquetColumnEncodings(dir: File): Seq[Set[String]] = {
+    val parquetFiles = dir.list((_, name) => name.contains("parquet"))
+    assert(parquetFiles.nonEmpty)
+    parquetFiles.flatMap {
+      file =>
+        val path = new Path(dir.getAbsolutePath, file)
+        val in = HadoopInputFile.fromPath(path, 
spark.sessionState.newHadoopConf())
+        Utils.tryWithResource(ParquetFileReader.open(in)) {
+          reader =>
+            reader.getFooter.getBlocks.asScala.flatMap {
+              block => 
block.getColumns.asScala.map(_.getEncodings.asScala.map(_.name()).toSet)
+            }
+        }
+    }.toSeq
+  }
+
+  test("native writer should respect parquet dictionary config from 
spark.hadoop config") {
+    spark
+      .range(0, 20000, 1, 1)
+      .selectExpr("concat('gluten-parquet-dictionary-', CAST(id % 10 AS 
STRING)) AS payload")
+      .createOrReplaceTempView("parquet_dictionary_source")
+
+    withTempPath {
+      hadoopConfDir =>
+        checkNativeWrite(
+          s"""
+             |INSERT OVERWRITE DIRECTORY USING PARQUET
+             |OPTIONS ('path' '${hadoopConfDir.getCanonicalPath}')
+             |SELECT * FROM parquet_dictionary_source
+             |""".stripMargin)
+        val columnEncodings = parquetColumnEncodings(hadoopConfDir)
+        assert(columnEncodings.nonEmpty)
+        
assert(!columnEncodings.exists(_.exists(dictionaryEncodingNames.contains)))
+    }
+  }
+}
diff --git 
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 61864d01d9..a534f88081 100644
--- 
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ 
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.WholeStageCodegenExec
 import org.apache.spark.sql.execution.datasources._
@@ -77,11 +77,18 @@ class ParquetFileFormat extends FileFormat with 
DataSourceRegister with Logging
     if (sparkSession.sparkContext.getLocalProperty("isNativeApplicable") == 
"true") {
       // Pass compression to job conf so that the file extension can be aware 
of it.
       val conf = ContextUtil.getConfiguration(job)
-      val parquetOptions = new ParquetOptions(options, 
sparkSession.sessionState.conf)
+      val writeOptions = CaseInsensitiveMap(
+        sparkSession.sessionState
+          .newHadoopConfWithOptions(options)
+          .iterator()
+          .asScala
+          .map(entry => entry.getKey -> entry.getValue)
+          .toMap)
+      val parquetOptions = new ParquetOptions(writeOptions, 
sparkSession.sessionState.conf)
       conf.set(ParquetOutputFormat.COMPRESSION, 
parquetOptions.compressionCodecClassName)
       val nativeConf =
         GlutenFormatFactory(shortName())
-          .nativeConf(options, parquetOptions.compressionCodecClassName)
+          .nativeConf(writeOptions, parquetOptions.compressionCodecClassName)
 
       new OutputWriterFactory {
         override def getFileExtension(context: TaskAttemptContext): String = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to