This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/branch-0.x by this push:
     new bd26797c8e6 [HUDI-7809] Use Spark SerializableConfiguration to avoid NPE in Kryo serde (#11356)
bd26797c8e6 is described below

commit bd26797c8e651694b31bdf614f55c9fc6a757f71
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed May 29 07:53:40 2024 -0700

    [HUDI-7809] Use Spark SerializableConfiguration to avoid NPE in Kryo serde (#11356)
    
    * [HUDI-7809] Use Spark SerializableConfiguration to avoid NPE in Kryo serde
    
    * Revert changes in HoodieBaseRelation
---
 .../test/java/org/apache/hudi/ColumnStatsIndexHelper.java    |  7 +++----
 .../parquet/Spark30LegacyHoodieParquetFileFormat.scala       | 12 ++++++------
 .../parquet/Spark31LegacyHoodieParquetFileFormat.scala       | 12 ++++++------
 .../parquet/Spark32LegacyHoodieParquetFileFormat.scala       | 12 ++++++------
 .../parquet/Spark33LegacyHoodieParquetFileFormat.scala       | 12 ++++++------
 .../parquet/Spark34LegacyHoodieParquetFileFormat.scala       | 12 ++++++------
 .../parquet/Spark35LegacyHoodieParquetFileFormat.scala       | 12 ++++++------
 7 files changed, 39 insertions(+), 40 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
index 357200f5f0e..269a83bf7ac 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
@@ -21,9 +21,7 @@ import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
 import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.storage.HoodieStorage;
-import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
 import org.apache.hudi.util.JavaScalaConverters;
@@ -51,6 +49,7 @@ import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.types.StructType$;
 import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.util.SerializableConfiguration;
 
 import javax.annotation.Nonnull;
 
@@ -163,7 +162,7 @@ public class ColumnStatsIndexHelper {
         .map(StructField::name)
         .collect(Collectors.toList());
 
-    StorageConfiguration<?> storageConf = 
HadoopFSUtils.getStorageConfWithCopy(sc.hadoopConfiguration());
+    SerializableConfiguration serializableConfiguration = new 
SerializableConfiguration(sc.hadoopConfiguration());
     int numParallelism = (baseFilesPaths.size() / 3 + 1);
 
     String previousJobDescription = 
sc.getLocalProperty("spark.job.description");
@@ -178,7 +177,7 @@ public class ColumnStatsIndexHelper {
                 Iterable<String> iterable = () -> paths;
                 return StreamSupport.stream(iterable.spliterator(), false)
                     .flatMap(path -> {
-                      HoodieStorage storage = new HoodieHadoopStorage(path, 
storageConf);
+                      HoodieStorage storage = new HoodieHadoopStorage(path, 
serializableConfiguration.value());
                           return utils.readColumnStatsFromMetadata(
                                   storage,
                                   new StoragePath(path),
diff --git 
a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30LegacyHoodieParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30LegacyHoodieParquetFileFormat.scala
index bf6e222b763..59fde4af02f 100644
--- 
a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30LegacyHoodieParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30LegacyHoodieParquetFileFormat.scala
@@ -23,7 +23,6 @@ import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.util.InternalSchemaCache
 import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.common.util.collection.Pair
-import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger
 import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
@@ -49,6 +48,7 @@ import 
org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedF
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{AtomicType, DataType, StructField, 
StructType}
+import org.apache.spark.util.SerializableConfiguration
 
 import java.net.URI
 
@@ -108,8 +108,8 @@ class Spark30LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValu
       hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, 
prunedInternalSchemaStr)
     }
 
-    val broadcastedStorageConf =
-      
sparkSession.sparkContext.broadcast(HadoopFSUtils.getStorageConfWithCopy(hadoopConf))
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
 
     // TODO: if you move this into the closure it reverts to the default 
values.
     // If true, enable using the custom RecordReader for parquet. This only 
works for
@@ -147,7 +147,7 @@ class Spark30LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValu
           Array.empty,
           null)
 
-      val sharedConf = broadcastedStorageConf.value.unwrap
+      val sharedConf = broadcastedHadoopConf.value.value
 
       // Fetch internal schema
       val internalSchemaStr = 
sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
@@ -160,7 +160,7 @@ class Spark30LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValu
       val fileSchema = if (shouldUseInternalSchema) {
         val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
         val validCommits = 
sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
-        val storage = new HoodieHadoopStorage(tablePath, 
broadcastedStorageConf.value)
+        val storage = new HoodieHadoopStorage(tablePath, sharedConf)
         InternalSchemaCache.getInternalSchemaByVersionId(
           commitInstantTime, tablePath, storage, if (validCommits == null) "" 
else validCommits)
       } else {
@@ -223,7 +223,7 @@ class Spark30LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValu
       val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 
0), 0)
 
       // Clone new conf
-      val hadoopAttemptConf = broadcastedStorageConf.value.unwrapCopy
+      val hadoopAttemptConf = new 
Configuration(broadcastedHadoopConf.value.value)
 
       val typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = 
if (shouldUseInternalSchema) {
         val mergedInternalSchema = new InternalSchemaMerger(fileSchema, 
querySchemaOption.get(), true, true).mergeSchema()
diff --git 
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31LegacyHoodieParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31LegacyHoodieParquetFileFormat.scala
index aa1b798241c..729ba95b644 100644
--- 
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31LegacyHoodieParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31LegacyHoodieParquetFileFormat.scala
@@ -23,7 +23,6 @@ import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.util.InternalSchemaCache
 import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.common.util.collection.Pair
-import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger
 import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
@@ -49,6 +48,7 @@ import 
org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedF
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{AtomicType, DataType, StructField, 
StructType}
+import org.apache.spark.util.SerializableConfiguration
 
 import java.net.URI
 
@@ -108,8 +108,8 @@ class Spark31LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValu
       hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, 
prunedInternalSchemaStr)
     }
 
-    val broadcastedStorageConf =
-      
sparkSession.sparkContext.broadcast(HadoopFSUtils.getStorageConfWithCopy(hadoopConf))
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
 
     // TODO: if you move this into the closure it reverts to the default 
values.
     // If true, enable using the custom RecordReader for parquet. This only 
works for
@@ -147,7 +147,7 @@ class Spark31LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValu
           Array.empty,
           null)
 
-      val sharedConf = broadcastedStorageConf.value.unwrap
+      val sharedConf = broadcastedHadoopConf.value.value
 
       // Fetch internal schema
       val internalSchemaStr = 
sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
@@ -160,7 +160,7 @@ class Spark31LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValu
       val fileSchema = if (shouldUseInternalSchema) {
         val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
         val validCommits = 
sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
-        val storage = new HoodieHadoopStorage(tablePath, 
broadcastedStorageConf.value)
+        val storage = new HoodieHadoopStorage(tablePath, sharedConf)
         InternalSchemaCache.getInternalSchemaByVersionId(
           commitInstantTime, tablePath, storage, if (validCommits == null) "" 
else validCommits)
       } else {
@@ -227,7 +227,7 @@ class Spark31LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValu
       val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 
0), 0)
 
       // Clone new conf
-      val hadoopAttemptConf = new 
Configuration(broadcastedStorageConf.value.unwrap)
+      val hadoopAttemptConf = new 
Configuration(broadcastedHadoopConf.value.value)
 
       val typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = 
if (shouldUseInternalSchema) {
         val mergedInternalSchema = new InternalSchemaMerger(fileSchema, 
querySchemaOption.get(), true, true).mergeSchema()
diff --git 
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32LegacyHoodieParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32LegacyHoodieParquetFileFormat.scala
index 44d420c7501..68188c3fbf0 100644
--- 
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32LegacyHoodieParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32LegacyHoodieParquetFileFormat.scala
@@ -23,7 +23,6 @@ import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.util.InternalSchemaCache
 import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.common.util.collection.Pair
-import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger
 import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
@@ -49,6 +48,7 @@ import 
org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedF
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{AtomicType, DataType, StructField, 
StructType}
+import org.apache.spark.util.SerializableConfiguration
 
 import java.net.URI
 
@@ -111,8 +111,8 @@ class Spark32LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValu
       hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, 
prunedInternalSchemaStr)
     }
 
-    val broadcastedStorageConf =
-      
sparkSession.sparkContext.broadcast(HadoopFSUtils.getStorageConfWithCopy(hadoopConf))
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
 
     // TODO: if you move this into the closure it reverts to the default 
values.
     // If true, enable using the custom RecordReader for parquet. This only 
works for
@@ -146,7 +146,7 @@ class Spark32LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValu
       val filePath = new Path(new URI(file.filePath))
       val split = new FileSplit(filePath, file.start, file.length, 
Array.empty[String])
 
-      val sharedConf = broadcastedStorageConf.value.unwrap
+      val sharedConf = broadcastedHadoopConf.value.value
 
       // Fetch internal schema
       val internalSchemaStr = 
sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
@@ -159,7 +159,7 @@ class Spark32LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValu
       val fileSchema = if (shouldUseInternalSchema) {
         val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
         val validCommits = 
sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
-        val storage = new HoodieHadoopStorage(tablePath, 
broadcastedStorageConf.value)
+        val storage = new HoodieHadoopStorage(tablePath, sharedConf)
         InternalSchemaCache.getInternalSchemaByVersionId(
           commitInstantTime, tablePath, storage, if (validCommits == null) "" 
else validCommits)
       } else {
@@ -228,7 +228,7 @@ class Spark32LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValu
       val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 
0), 0)
 
       // Clone new conf
-      val hadoopAttemptConf = broadcastedStorageConf.value.unwrapCopy
+      val hadoopAttemptConf = new 
Configuration(broadcastedHadoopConf.value.value)
       val typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = 
if (shouldUseInternalSchema) {
         val mergedInternalSchema = new InternalSchemaMerger(fileSchema, 
querySchemaOption.get(), true, true).mergeSchema()
         val mergedSchema = 
SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
diff --git 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
index d39d12b3fe2..2e779100df3 100644
--- 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
@@ -25,7 +25,6 @@ import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.util.InternalSchemaCache
 import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.common.util.collection.Pair
-import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger
 import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
@@ -51,6 +50,7 @@ import 
org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedF
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{AtomicType, DataType, StructField, 
StructType}
+import org.apache.spark.util.SerializableConfiguration
 
 import java.net.URI
 import scala.collection.convert.ImplicitConversions.`collection 
AsScalaIterable`
@@ -114,8 +114,8 @@ class Spark33LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValu
       hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, 
prunedInternalSchemaStr)
     }
 
-    val broadcastedStorageConf =
-      
sparkSession.sparkContext.broadcast(HadoopFSUtils.getStorageConfWithCopy(hadoopConf))
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
 
     // TODO: if you move this into the closure it reverts to the default 
values.
     // If true, enable using the custom RecordReader for parquet. This only 
works for
@@ -148,7 +148,7 @@ class Spark33LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValu
       val filePath = new Path(new URI(file.filePath))
       val split = new FileSplit(filePath, file.start, file.length, 
Array.empty[String])
 
-      val sharedConf = broadcastedStorageConf.value.unwrap
+      val sharedConf = broadcastedHadoopConf.value.value
 
       // Fetch internal schema
       val internalSchemaStr = 
sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
@@ -161,7 +161,7 @@ class Spark33LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValu
       val fileSchema = if (shouldUseInternalSchema) {
         val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
         val validCommits = 
sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
-        val storage = new HoodieHadoopStorage(tablePath, 
broadcastedStorageConf.value)
+        val storage = new HoodieHadoopStorage(tablePath, sharedConf)
         InternalSchemaCache.getInternalSchemaByVersionId(
           commitInstantTime, tablePath, storage, if (validCommits == null) "" 
else validCommits)
       } else {
@@ -230,7 +230,7 @@ class Spark33LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValu
       val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 
0), 0)
 
       // Clone new conf
-      val hadoopAttemptConf = broadcastedStorageConf.value.unwrapCopy
+      val hadoopAttemptConf = new 
Configuration(broadcastedHadoopConf.value.value)
       val typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = 
if (shouldUseInternalSchema) {
         val mergedInternalSchema = new InternalSchemaMerger(fileSchema, 
querySchemaOption.get(), true, true).mergeSchema()
         val mergedSchema = 
SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
diff --git 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
index 8818cb5672f..995ef165fc4 100644
--- 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
@@ -23,7 +23,6 @@ import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.util.InternalSchemaCache
 import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.common.util.collection.Pair
-import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger
 import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
@@ -49,6 +48,7 @@ import 
org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedF
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{AtomicType, DataType, StructField, 
StructType}
+import org.apache.spark.util.SerializableConfiguration
 
 import scala.collection.convert.ImplicitConversions.`collection 
AsScalaIterable`
 
@@ -124,8 +124,8 @@ class Spark34LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValu
       hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, 
prunedInternalSchemaStr)
     }
 
-    val broadcastedStorageConf =
-      
sparkSession.sparkContext.broadcast(HadoopFSUtils.getStorageConfWithCopy(hadoopConf))
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
 
     // TODO: if you move this into the closure it reverts to the default 
values.
     // If true, enable using the custom RecordReader for parquet. This only 
works for
@@ -160,7 +160,7 @@ class Spark34LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValu
       val filePath = file.filePath.toPath
       val split = new FileSplit(filePath, file.start, file.length, 
Array.empty[String])
 
-      val sharedConf = broadcastedStorageConf.value.unwrap
+      val sharedConf = broadcastedHadoopConf.value.value
 
       // Fetch internal schema
       val internalSchemaStr = 
sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
@@ -173,7 +173,7 @@ class Spark34LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValu
       val fileSchema = if (shouldUseInternalSchema) {
         val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
         val validCommits = 
sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
-        val storage = new HoodieHadoopStorage(tablePath, 
broadcastedStorageConf.value)
+        val storage = new HoodieHadoopStorage(tablePath, sharedConf)
         InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, 
tablePath, storage, if (validCommits == null) "" else validCommits)
       } else {
         null
@@ -241,7 +241,7 @@ class Spark34LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValu
       val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 
0), 0)
 
       // Clone new conf
-      val hadoopAttemptConf = broadcastedStorageConf.value.unwrapCopy
+      val hadoopAttemptConf = new 
Configuration(broadcastedHadoopConf.value.value)
       val typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = 
if (shouldUseInternalSchema) {
         val mergedInternalSchema = new InternalSchemaMerger(fileSchema, 
querySchemaOption.get(), true, true).mergeSchema()
         val mergedSchema = 
SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
diff --git 
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala
index 6286a19f080..e1a3dc1427d 100644
--- 
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala
@@ -23,7 +23,6 @@ import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.util.InternalSchemaCache
 import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.common.util.collection.Pair
-import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger
 import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
@@ -50,6 +49,7 @@ import 
org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedF
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{AtomicType, DataType, StructField, 
StructType}
+import org.apache.spark.util.SerializableConfiguration
 
 import scala.collection.convert.ImplicitConversions.`collection 
AsScalaIterable`
 
@@ -125,8 +125,8 @@ class Spark35LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValu
       hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, 
prunedInternalSchemaStr)
     }
 
-    val broadcastedStorageConf =
-      
sparkSession.sparkContext.broadcast(HadoopFSUtils.getStorageConfWithCopy(hadoopConf))
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
 
     // TODO: if you move this into the closure it reverts to the default 
values.
     // If true, enable using the custom RecordReader for parquet. This only 
works for
@@ -161,7 +161,7 @@ class Spark35LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValu
       val filePath = file.filePath.toPath
       val split = new FileSplit(filePath, file.start, file.length, 
Array.empty[String])
 
-      val sharedConf = broadcastedStorageConf.value.unwrap
+      val sharedConf = broadcastedHadoopConf.value.value
 
       // Fetch internal schema
       val internalSchemaStr = 
sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
@@ -174,7 +174,7 @@ class Spark35LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValu
       val fileSchema = if (shouldUseInternalSchema) {
         val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
         val validCommits = 
sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
-        val storage = new HoodieHadoopStorage(tablePath, 
broadcastedStorageConf.value)
+        val storage = new HoodieHadoopStorage(tablePath, sharedConf)
         InternalSchemaCache.getInternalSchemaByVersionId(
           commitInstantTime, tablePath, storage, if (validCommits == null) "" 
else validCommits)
       } else {
@@ -243,7 +243,7 @@ class Spark35LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValu
       val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 
0), 0)
 
       // Clone new conf
-      val hadoopAttemptConf = broadcastedStorageConf.value.unwrapCopy
+      val hadoopAttemptConf = new 
Configuration(broadcastedHadoopConf.value.value)
       val typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = 
if (shouldUseInternalSchema) {
         val mergedInternalSchema = new InternalSchemaMerger(fileSchema, 
querySchemaOption.get(), true, true).mergeSchema()
         val mergedSchema = 
SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)

Reply via email to