This is an automated email from the ASF dual-hosted git repository.

mingliang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new d27959c1d2 [GLUTEN-9152][CORE] Avoid unnecessary serialization of hadoop conf (#9153)
d27959c1d2 is described below

commit d27959c1d2c36a511535976efd0fbc9dbcdd1d7e
Author: Mingliang Zhu <[email protected]>
AuthorDate: Tue Apr 8 18:02:39 2025 +0800

    [GLUTEN-9152][CORE] Avoid unnecessary serialization of hadoop conf (#9153)
---
 .../apache/gluten/backendsapi/clickhouse/CHBackend.scala   |  5 +++--
 .../org/apache/gluten/backendsapi/velox/VeloxBackend.scala | 10 +++-------
 .../org/apache/gluten/utils/ParquetMetadataUtils.scala     | 14 +++++---------
 .../org/apache/gluten/backendsapi/BackendSettingsApi.scala |  5 +++--
 .../apache/gluten/execution/BasicScanExecTransformer.scala |  6 +-----
 .../apache/gluten/execution/WholeStageTransformer.scala    |  8 ++------
 6 files changed, 17 insertions(+), 31 deletions(-)
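
Context for the diff below: the validation entry points changed here run on the driver during query planning, so wrapping the driver's Hadoop configuration in SerializableConfiguration bought nothing except the cost of preparing it for serialization on every scan validation. A minimal before/after sketch of the pattern, assuming a hypothetical validator (ScanValidationSketch and its methods are illustrative, not the actual Gluten API):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.spark.util.SerializableConfiguration

    // Hypothetical driver-side validator, for illustration only.
    object ScanValidationSketch {
      // Before: callers pre-wrapped the conf so it *could* survive task
      // serialization, even though validation never leaves the driver.
      def validateBefore(
          rootPaths: Seq[String],
          serializableHadoopConf: Option[SerializableConfiguration] = None): Boolean = {
        val conf = serializableHadoopConf.map(_.value).getOrElse(new Configuration())
        rootPaths.forall(p => new Path(p).getFileSystem(conf).exists(new Path(p)))
      }

      // After: accept the plain Configuration; same driver-side behavior,
      // minus the wrapper allocation and the implied serialization cost.
      def validateAfter(rootPaths: Seq[String], hadoopConf: Configuration): Boolean =
        rootPaths.forall(p => new Path(p).getFileSystem(hadoopConf).exists(new Path(p)))
    }

The real signatures in the hunks below make the same move: Option[SerializableConfiguration] = None becomes a required plain Configuration, and callers hand in sparkContext.hadoopConfiguration directly.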

diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index d71d41bf2d..dcdb5dcc5d 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -44,7 +44,8 @@ import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
-import org.apache.spark.util.SerializableConfiguration
+
+import org.apache.hadoop.conf.Configuration
 
 import java.util.Locale
 
@@ -177,7 +178,7 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
       fields: Array[StructField],
       rootPaths: Seq[String],
       properties: Map[String, String],
-      serializableHadoopConf: Option[SerializableConfiguration] = None): ValidationResult = {
+      hadoopConf: Configuration): ValidationResult = {
 
     // Validate if all types are supported.
     def hasComplexType: Boolean = {
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 23fec4bbdc..b194e2183e 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -48,8 +48,8 @@ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Pa
 import org.apache.spark.sql.hive.execution.HiveFileFormat
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
-import org.apache.spark.util.SerializableConfiguration
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import scala.util.control.Breaks.breakable
@@ -103,7 +103,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
       fields: Array[StructField],
       rootPaths: Seq[String],
       properties: Map[String, String],
-      serializableHadoopConf: Option[SerializableConfiguration] = None): ValidationResult = {
+      hadoopConf: Configuration): ValidationResult = {
 
     def validateScheme(): Option[String] = {
       val filteredRootPaths = distinctRootPaths(rootPaths)
@@ -187,11 +187,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
 
       val fileLimit = GlutenConfig.get.parquetEncryptionValidationFileLimit
       val encryptionResult =
-        ParquetMetadataUtils.validateEncryption(
-          format,
-          rootPaths,
-          serializableHadoopConf,
-          fileLimit)
+        ParquetMetadataUtils.validateEncryption(format, rootPaths, hadoopConf, fileLimit)
       if (encryptionResult.ok()) {
         None
       } else {
diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
index 48d0629268..eb917a4772 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
@@ -21,8 +21,6 @@ import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat.ParquetReadFormat
 
-import org.apache.spark.util.SerializableConfiguration
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}
 
@@ -38,29 +36,27 @@ object ParquetMetadataUtils {
    *   File format, e.g., `ParquetReadFormat`
    * @param rootPaths
    *   List of file paths to scan
-   * @param serializableHadoopConf
-   *   Optional Hadoop configuration
+   * @param hadoopConf
+   *   Hadoop configuration
    * @return
    *   [[ValidationResult]] validation success or failure
    */
   def validateEncryption(
       format: ReadFileFormat,
       rootPaths: Seq[String],
-      serializableHadoopConf: Option[SerializableConfiguration],
+      hadoopConf: Configuration,
       fileLimit: Int
   ): ValidationResult = {
     if (format != ParquetReadFormat || rootPaths.isEmpty) {
       return ValidationResult.succeeded
     }
 
-    val conf = serializableHadoopConf.map(_.value).getOrElse(new Configuration())
-
     rootPaths.foreach {
       rootPath =>
-        val fs = new Path(rootPath).getFileSystem(conf)
+        val fs = new Path(rootPath).getFileSystem(hadoopConf)
         try {
           val encryptionDetected =
-            checkForEncryptionWithLimit(fs, new Path(rootPath), conf, fileLimit = fileLimit)
+            checkForEncryptionWithLimit(fs, new Path(rootPath), hadoopConf, fileLimit = fileLimit)
           if (encryptionDetected) {
             return ValidationResult.failed("Encrypted Parquet file detected.")
           }
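
With the new validateEncryption signature above, a driver-side call looks like the sketch below. The SparkSession setup, rootPaths value, and fileLimit are illustrative inputs; only validateEncryption and ValidationResult.ok() come from the diff itself:

    import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat.ParquetReadFormat
    import org.apache.gluten.utils.ParquetMetadataUtils
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    // The driver's live Hadoop conf goes in directly; nothing is wrapped
    // or serialized, since the metadata probe happens during planning.
    val result = ParquetMetadataUtils.validateEncryption(
      format = ParquetReadFormat,
      rootPaths = Seq("/tmp/example-parquet"), // hypothetical input path
      hadoopConf = spark.sparkContext.hadoopConfiguration,
      fileLimit = 10
    )
    if (!result.ok()) {
      // An encrypted file was found within the first `fileLimit` files
      // probed, so the scan would fall back to vanilla Spark.
      println("Encrypted Parquet detected; falling back to vanilla Spark")
    }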
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 937d503c1f..c9a3fdbc6d 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -29,7 +29,8 @@ import org.apache.spark.sql.connector.read.Scan
 import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
 import org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopFsRelationCommand}
 import org.apache.spark.sql.types.StructField
-import org.apache.spark.util.SerializableConfiguration
+
+import org.apache.hadoop.conf.Configuration
 
 trait BackendSettingsApi {
 
@@ -41,7 +42,7 @@ trait BackendSettingsApi {
       fields: Array[StructField],
       rootPaths: Seq[String],
       properties: Map[String, String],
-      serializableHadoopConf: Option[SerializableConfiguration] = None): ValidationResult =
+      hadoopConf: Configuration): ValidationResult =
     ValidationResult.succeeded
 
   def getSubstraitReadFileFormatV1(fileFormat: FileFormat): LocalFilesNode.ReadFileFormat
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index b5d3c03fc6..056c35a527 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.hive.HiveTableScanExecTransformer
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
-import org.apache.spark.util.SerializableConfiguration
 
 import com.google.protobuf.StringValue
 import io.substrait.proto.NamedStruct
@@ -78,9 +77,6 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
           getProperties))
   }
 
-  val serializableHadoopConf: SerializableConfiguration = new SerializableConfiguration(
-    sparkContext.hadoopConfiguration)
-
   override protected def doValidateInternal(): ValidationResult = {
     var fields = schema.fields
 
@@ -100,7 +96,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
         fields,
         getRootFilePaths,
         getProperties,
-        Some(serializableHadoopConf))
+        sparkContext.hadoopConfiguration)
     if (!validationResult.ok()) {
       return validationResult
     }
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index b1b0f3ddfc..df29110426 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -42,7 +42,6 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
 import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.SerializableConfiguration
 
 import com.google.common.collect.Lists
 import org.apache.hadoop.fs.viewfs.ViewFileSystemUtils
@@ -236,9 +235,6 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
 
   val sparkConf: SparkConf = sparkContext.getConf
 
-  val serializableHadoopConf: SerializableConfiguration = new SerializableConfiguration(
-    sparkContext.hadoopConfiguration)
-
   val numaBindingInfo: GlutenNumaBindingInfo = GlutenConfig.get.numaBindingInfo
 
   @transient
@@ -400,7 +396,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
               val newPaths = ViewFileSystemUtils.convertViewfsToHdfs(
                 splitInfo.getPaths.asScala.toSeq,
                 viewfsToHdfsCache,
-                serializableHadoopConf.value)
+                sparkContext.hadoopConfiguration)
               splitInfo.setPaths(newPaths.asJava)
           }
       }
@@ -463,7 +459,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
           val newPaths = ViewFileSystemUtils.convertViewfsToHdfs(
             splitInfo.getPaths.asScala.toSeq,
             viewfsToHdfsCache,
-            serializableHadoopConf.value)
+            sparkContext.hadoopConfiguration)
           splitInfo.setPaths(newPaths.asJava)
       }
     }
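
A note on the two WholeStageTransformer hunks above: serializableHadoopConf.value becomes sparkContext.hadoopConfiguration inside the viewfs-to-HDFS path rewrite, which is sound because split infos are assembled on the driver, where the live configuration is available. A hedged sketch of that call shape, reusing the SparkSession from the earlier sketch (the input path is invented, and the cache is assumed to be a mutable Map[String, String] based on the surrounding code):

    import scala.collection.mutable

    import org.apache.hadoop.fs.viewfs.ViewFileSystemUtils

    // Driver-side rewrite: resolve viewfs:// mount points to their backing
    // hdfs:// paths before the split info is handed to native execution.
    val viewfsToHdfsCache = mutable.Map.empty[String, String]
    val newPaths = ViewFileSystemUtils.convertViewfsToHdfs(
      Seq("viewfs://cluster/warehouse/t1"), // hypothetical input path
      viewfsToHdfsCache,
      spark.sparkContext.hadoopConfiguration // plain Configuration suffices here
    )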


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
