This is an automated email from the ASF dual-hosted git repository.
mingliang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new d27959c1d2 [GLUTEN-9152][CORE] Avoid unnecessary serialization of hadoop conf (#9153)
d27959c1d2 is described below
commit d27959c1d2c36a511535976efd0fbc9dbcdd1d7e
Author: Mingliang Zhu <[email protected]>
AuthorDate: Tue Apr 8 18:02:39 2025 +0800
[GLUTEN-9152][CORE] Avoid unnecessary serialization of hadoop conf (#9153)
---
.../apache/gluten/backendsapi/clickhouse/CHBackend.scala | 5 +++--
.../org/apache/gluten/backendsapi/velox/VeloxBackend.scala | 10 +++-------
.../org/apache/gluten/utils/ParquetMetadataUtils.scala | 14 +++++---------
.../org/apache/gluten/backendsapi/BackendSettingsApi.scala | 5 +++--
.../apache/gluten/execution/BasicScanExecTransformer.scala | 6 +-----
.../apache/gluten/execution/WholeStageTransformer.scala | 8 ++------
6 files changed, 17 insertions(+), 31 deletions(-)
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index d71d41bf2d..dcdb5dcc5d 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -44,7 +44,8 @@ import
org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
-import org.apache.spark.util.SerializableConfiguration
+
+import org.apache.hadoop.conf.Configuration
import java.util.Locale
@@ -177,7 +178,7 @@ object CHBackendSettings extends BackendSettingsApi with
Logging {
fields: Array[StructField],
rootPaths: Seq[String],
properties: Map[String, String],
- serializableHadoopConf: Option[SerializableConfiguration] = None):
ValidationResult = {
+ hadoopConf: Configuration): ValidationResult = {
// Validate if all types are supported.
def hasComplexType: Boolean = {
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 23fec4bbdc..b194e2183e 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -48,8 +48,8 @@ import
org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Pa
import org.apache.spark.sql.hive.execution.HiveFileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import scala.util.control.Breaks.breakable
@@ -103,7 +103,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
fields: Array[StructField],
rootPaths: Seq[String],
properties: Map[String, String],
- serializableHadoopConf: Option[SerializableConfiguration] = None):
ValidationResult = {
+ hadoopConf: Configuration): ValidationResult = {
def validateScheme(): Option[String] = {
val filteredRootPaths = distinctRootPaths(rootPaths)
@@ -187,11 +187,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
val fileLimit = GlutenConfig.get.parquetEncryptionValidationFileLimit
val encryptionResult =
- ParquetMetadataUtils.validateEncryption(
- format,
- rootPaths,
- serializableHadoopConf,
- fileLimit)
+ ParquetMetadataUtils.validateEncryption(format, rootPaths, hadoopConf,
fileLimit)
if (encryptionResult.ok()) {
None
} else {
diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
index 48d0629268..eb917a4772 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
@@ -21,8 +21,6 @@ import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import
org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat.ParquetReadFormat
-import org.apache.spark.util.SerializableConfiguration
-
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path,
RemoteIterator}
@@ -38,29 +36,27 @@ object ParquetMetadataUtils {
* File format, e.g., `ParquetReadFormat`
* @param rootPaths
* List of file paths to scan
- * @param serializableHadoopConf
- * Optional Hadoop configuration
+ * @param hadoopConf
+ * Hadoop configuration
* @return
* [[ValidationResult]] validation success or failure
*/
def validateEncryption(
format: ReadFileFormat,
rootPaths: Seq[String],
- serializableHadoopConf: Option[SerializableConfiguration],
+ hadoopConf: Configuration,
fileLimit: Int
): ValidationResult = {
if (format != ParquetReadFormat || rootPaths.isEmpty) {
return ValidationResult.succeeded
}
- val conf = serializableHadoopConf.map(_.value).getOrElse(new
Configuration())
-
rootPaths.foreach {
rootPath =>
- val fs = new Path(rootPath).getFileSystem(conf)
+ val fs = new Path(rootPath).getFileSystem(hadoopConf)
try {
val encryptionDetected =
- checkForEncryptionWithLimit(fs, new Path(rootPath), conf,
fileLimit = fileLimit)
+ checkForEncryptionWithLimit(fs, new Path(rootPath), hadoopConf,
fileLimit = fileLimit)
if (encryptionDetected) {
return ValidationResult.failed("Encrypted Parquet file detected.")
}
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 937d503c1f..c9a3fdbc6d 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -29,7 +29,8 @@ import org.apache.spark.sql.connector.read.Scan
import
org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
import org.apache.spark.sql.execution.datasources.{FileFormat,
InsertIntoHadoopFsRelationCommand}
import org.apache.spark.sql.types.StructField
-import org.apache.spark.util.SerializableConfiguration
+
+import org.apache.hadoop.conf.Configuration
trait BackendSettingsApi {
@@ -41,7 +42,7 @@ trait BackendSettingsApi {
fields: Array[StructField],
rootPaths: Seq[String],
properties: Map[String, String],
- serializableHadoopConf: Option[SerializableConfiguration] = None):
ValidationResult =
+ hadoopConf: Configuration): ValidationResult =
ValidationResult.succeeded
def getSubstraitReadFileFormatV1(fileFormat: FileFormat):
LocalFilesNode.ReadFileFormat
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index b5d3c03fc6..056c35a527 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.hive.HiveTableScanExecTransformer
import org.apache.spark.sql.types.{StringType, StructField, StructType}
-import org.apache.spark.util.SerializableConfiguration
import com.google.protobuf.StringValue
import io.substrait.proto.NamedStruct
@@ -78,9 +77,6 @@ trait BasicScanExecTransformer extends LeafTransformSupport
with BaseDataSource
getProperties))
}
- val serializableHadoopConf: SerializableConfiguration = new
SerializableConfiguration(
- sparkContext.hadoopConfiguration)
-
override protected def doValidateInternal(): ValidationResult = {
var fields = schema.fields
@@ -100,7 +96,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport
with BaseDataSource
fields,
getRootFilePaths,
getProperties,
- Some(serializableHadoopConf))
+ sparkContext.hadoopConfiguration)
if (!validationResult.ok()) {
return validationResult
}
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index b1b0f3ddfc..df29110426 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -42,7 +42,6 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.SerializableConfiguration
import com.google.common.collect.Lists
import org.apache.hadoop.fs.viewfs.ViewFileSystemUtils
@@ -236,9 +235,6 @@ case class WholeStageTransformer(child: SparkPlan,
materializeInput: Boolean = f
val sparkConf: SparkConf = sparkContext.getConf
- val serializableHadoopConf: SerializableConfiguration = new
SerializableConfiguration(
- sparkContext.hadoopConfiguration)
-
val numaBindingInfo: GlutenNumaBindingInfo = GlutenConfig.get.numaBindingInfo
@transient
@@ -400,7 +396,7 @@ case class WholeStageTransformer(child: SparkPlan,
materializeInput: Boolean = f
val newPaths = ViewFileSystemUtils.convertViewfsToHdfs(
splitInfo.getPaths.asScala.toSeq,
viewfsToHdfsCache,
- serializableHadoopConf.value)
+ sparkContext.hadoopConfiguration)
splitInfo.setPaths(newPaths.asJava)
}
}
@@ -463,7 +459,7 @@ case class WholeStageTransformer(child: SparkPlan,
materializeInput: Boolean = f
val newPaths = ViewFileSystemUtils.convertViewfsToHdfs(
splitInfo.getPaths.asScala.toSeq,
viewfsToHdfsCache,
- serializableHadoopConf.value)
+ sparkContext.hadoopConfiguration)
splitInfo.setPaths(newPaths.asJava)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]