This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.15.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 88d057f75bd8497b489531991a8c0570cb61a8bc
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed May 29 07:53:40 2024 -0700

    [HUDI-7809] Use Spark SerializableConfiguration to avoid NPE in Kryo serde (#11356)

    * [HUDI-7809] Use Spark SerializableConfiguration to avoid NPE in Kryo serde

    * Revert changes in HoodieBaseRelation
---
 .../test/java/org/apache/hudi/ColumnStatsIndexHelper.java |  7 +++----
 .../parquet/Spark30LegacyHoodieParquetFileFormat.scala    | 12 ++++++------
 .../parquet/Spark31LegacyHoodieParquetFileFormat.scala    | 12 ++++++------
 .../parquet/Spark32LegacyHoodieParquetFileFormat.scala    | 12 ++++++------
 .../parquet/Spark33LegacyHoodieParquetFileFormat.scala    | 12 ++++++------
 .../parquet/Spark34LegacyHoodieParquetFileFormat.scala    | 12 ++++++------
 .../parquet/Spark35LegacyHoodieParquetFileFormat.scala    | 12 ++++++------
 7 files changed, 39 insertions(+), 40 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
index 357200f5f0e..269a83bf7ac 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
@@ -21,9 +21,7 @@ import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
 import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.storage.HoodieStorage;
-import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
 import org.apache.hudi.util.JavaScalaConverters;
@@ -51,6 +49,7 @@ import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.types.StructType$;
 import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.util.SerializableConfiguration;
 
 import javax.annotation.Nonnull;
 
@@ -163,7 +162,7 @@ public class ColumnStatsIndexHelper {
         .map(StructField::name)
         .collect(Collectors.toList());
 
-    StorageConfiguration<?> storageConf = HadoopFSUtils.getStorageConfWithCopy(sc.hadoopConfiguration());
+    SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sc.hadoopConfiguration());
 
     int numParallelism = (baseFilesPaths.size() / 3 + 1);
     String previousJobDescription = sc.getLocalProperty("spark.job.description");
@@ -178,7 +177,7 @@ public class ColumnStatsIndexHelper {
               Iterable<String> iterable = () -> paths;
               return StreamSupport.stream(iterable.spliterator(), false)
                   .flatMap(path -> {
-                    HoodieStorage storage = new HoodieHadoopStorage(path, storageConf);
+                    HoodieStorage storage = new HoodieHadoopStorage(path, serializableConfiguration.value());
                     return utils.readColumnStatsFromMetadata(
                         storage,
                         new StoragePath(path),
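The change follows one pattern in all seven files: Hadoop's Configuration is
not safely serializable on its own, so it is wrapped in Spark's
org.apache.spark.util.SerializableConfiguration before crossing the
driver/executor boundary and unwrapped with .value on the other side. Below
is a minimal sketch of the closure-capture variant used in
ColumnStatsIndexHelper above, assuming a local SparkSession; the object name
and the fs.defaultFS lookup are illustrative, not taken from the commit:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.util.SerializableConfiguration

    object SerializableConfSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("serializable-conf-sketch")
          .master("local[*]")
          .getOrCreate()
        val sc = spark.sparkContext

        // Driver side: wrap the Hadoop conf before a task closure captures it.
        val serializableConfiguration =
          new SerializableConfiguration(sc.hadoopConfiguration)

        // Executor side: unwrap with .value to get a plain Configuration back.
        val results = sc.parallelize(Seq("a", "b"), 2)
          .map(_ => serializableConfiguration.value.get("fs.defaultFS", "<unset>"))
          .collect()

        results.foreach(println)
        spark.stop()
      }
    }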
diff --git a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30LegacyHoodieParquetFileFormat.scala
index bf6e222b763..59fde4af02f 100644
--- a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30LegacyHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30LegacyHoodieParquetFileFormat.scala
@@ -23,7 +23,6 @@ import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.util.InternalSchemaCache
 import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.common.util.collection.Pair
-import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger
 import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
@@ -49,6 +48,7 @@ import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedF
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
+import org.apache.spark.util.SerializableConfiguration
 
 import java.net.URI
 
@@ -108,8 +108,8 @@ class Spark30LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
       hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, prunedInternalSchemaStr)
     }
 
-    val broadcastedStorageConf =
-      sparkSession.sparkContext.broadcast(HadoopFSUtils.getStorageConfWithCopy(hadoopConf))
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
     // TODO: if you move this into the closure it reverts to the default values.
     // If true, enable using the custom RecordReader for parquet. This only works for
@@ -147,7 +147,7 @@ class Spark30LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
             Array.empty,
             null)
 
-      val sharedConf = broadcastedStorageConf.value.unwrap
+      val sharedConf = broadcastedHadoopConf.value.value
 
       // Fetch internal schema
       val internalSchemaStr = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
@@ -160,7 +160,7 @@ class Spark30LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
       val fileSchema = if (shouldUseInternalSchema) {
         val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
         val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
-        val storage = new HoodieHadoopStorage(tablePath, broadcastedStorageConf.value)
+        val storage = new HoodieHadoopStorage(tablePath, sharedConf)
         InternalSchemaCache.getInternalSchemaByVersionId(
           commitInstantTime, tablePath, storage, if (validCommits == null) "" else validCommits)
       } else {
@@ -223,7 +223,7 @@ class Spark30LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
       val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
 
       // Clone new conf
-      val hadoopAttemptConf = broadcastedStorageConf.value.unwrapCopy
+      val hadoopAttemptConf = new Configuration(broadcastedHadoopConf.value.value)
       val typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = if (shouldUseInternalSchema) {
         val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
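The six LegacyHoodieParquetFileFormat classes (Spark 3.0.x through 3.5.x,
each shown below) use the broadcast variant of the same pattern, and clone
the unwrapped conf before mutating it for a task attempt, since the broadcast
value is shared by every task on an executor. A spark-shell style sketch of
that variant, with variable names mirroring the diff and an illustrative
config key that is not part of the commit:

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.util.SerializableConfiguration

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val hadoopConf = spark.sparkContext.hadoopConfiguration

    // Driver side: broadcast the wrapped conf once.
    val broadcastedHadoopConf =
      spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

    // Task side: .value.value yields the shared, read-only Configuration ...
    val sharedConf = broadcastedHadoopConf.value.value
    // ... and a per-attempt copy is cloned before any task-local mutation,
    // so concurrent tasks on one executor do not interfere with each other.
    val hadoopAttemptConf = new Configuration(sharedConf)
    hadoopAttemptConf.set("example.task.setting", "per-attempt") // illustrative key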
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31LegacyHoodieParquetFileFormat.scala
index aa1b798241c..729ba95b644 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31LegacyHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31LegacyHoodieParquetFileFormat.scala
@@ -23,7 +23,6 @@ import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.util.InternalSchemaCache
 import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.common.util.collection.Pair
-import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger
 import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
@@ -49,6 +48,7 @@ import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedF
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
+import org.apache.spark.util.SerializableConfiguration
 
 import java.net.URI
 
@@ -108,8 +108,8 @@ class Spark31LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
       hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, prunedInternalSchemaStr)
     }
 
-    val broadcastedStorageConf =
-      sparkSession.sparkContext.broadcast(HadoopFSUtils.getStorageConfWithCopy(hadoopConf))
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
     // TODO: if you move this into the closure it reverts to the default values.
     // If true, enable using the custom RecordReader for parquet. This only works for
@@ -147,7 +147,7 @@ class Spark31LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
             Array.empty,
             null)
 
-      val sharedConf = broadcastedStorageConf.value.unwrap
+      val sharedConf = broadcastedHadoopConf.value.value
 
       // Fetch internal schema
       val internalSchemaStr = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
@@ -160,7 +160,7 @@ class Spark31LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
       val fileSchema = if (shouldUseInternalSchema) {
         val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
         val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
-        val storage = new HoodieHadoopStorage(tablePath, broadcastedStorageConf.value)
+        val storage = new HoodieHadoopStorage(tablePath, sharedConf)
         InternalSchemaCache.getInternalSchemaByVersionId(
           commitInstantTime, tablePath, storage, if (validCommits == null) "" else validCommits)
       } else {
@@ -227,7 +227,7 @@ class Spark31LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
       val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
 
       // Clone new conf
-      val hadoopAttemptConf = new Configuration(broadcastedStorageConf.value.unwrap)
+      val hadoopAttemptConf = new Configuration(broadcastedHadoopConf.value.value)
       val typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = if (shouldUseInternalSchema) {
         val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32LegacyHoodieParquetFileFormat.scala
index 44d420c7501..68188c3fbf0 100644
--- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32LegacyHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32LegacyHoodieParquetFileFormat.scala
@@ -23,7 +23,6 @@ import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.util.InternalSchemaCache
 import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.common.util.collection.Pair
-import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger
 import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
@@ -49,6 +48,7 @@ import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedF
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
+import org.apache.spark.util.SerializableConfiguration
 
 import java.net.URI
 
@@ -111,8 +111,8 @@ class Spark32LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
       hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, prunedInternalSchemaStr)
     }
 
-    val broadcastedStorageConf =
-      sparkSession.sparkContext.broadcast(HadoopFSUtils.getStorageConfWithCopy(hadoopConf))
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
     // TODO: if you move this into the closure it reverts to the default values.
     // If true, enable using the custom RecordReader for parquet. This only works for
@@ -146,7 +146,7 @@ class Spark32LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
       val filePath = new Path(new URI(file.filePath))
       val split = new FileSplit(filePath, file.start, file.length, Array.empty[String])
 
-      val sharedConf = broadcastedStorageConf.value.unwrap
+      val sharedConf = broadcastedHadoopConf.value.value
 
       // Fetch internal schema
       val internalSchemaStr = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
@@ -159,7 +159,7 @@ class Spark32LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
       val fileSchema = if (shouldUseInternalSchema) {
         val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
         val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
-        val storage = new HoodieHadoopStorage(tablePath, broadcastedStorageConf.value)
+        val storage = new HoodieHadoopStorage(tablePath, sharedConf)
         InternalSchemaCache.getInternalSchemaByVersionId(
           commitInstantTime, tablePath, storage, if (validCommits == null) "" else validCommits)
       } else {
@@ -228,7 +228,7 @@ class Spark32LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
       val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
 
       // Clone new conf
-      val hadoopAttemptConf = broadcastedStorageConf.value.unwrapCopy
+      val hadoopAttemptConf = new Configuration(broadcastedHadoopConf.value.value)
       val typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = if (shouldUseInternalSchema) {
         val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
         val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
index d39d12b3fe2..2e779100df3 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
@@ -25,7 +25,6 @@ import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.util.InternalSchemaCache
 import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.common.util.collection.Pair
-import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger
 import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
@@ -51,6 +50,7 @@ import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedF
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
+import org.apache.spark.util.SerializableConfiguration
 
 import java.net.URI
 import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
@@ -114,8 +114,8 @@ class Spark33LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
       hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, prunedInternalSchemaStr)
     }
 
-    val broadcastedStorageConf =
-      sparkSession.sparkContext.broadcast(HadoopFSUtils.getStorageConfWithCopy(hadoopConf))
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
     // TODO: if you move this into the closure it reverts to the default values.
     // If true, enable using the custom RecordReader for parquet. This only works for
@@ -148,7 +148,7 @@ class Spark33LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
       val filePath = new Path(new URI(file.filePath))
       val split = new FileSplit(filePath, file.start, file.length, Array.empty[String])
 
-      val sharedConf = broadcastedStorageConf.value.unwrap
+      val sharedConf = broadcastedHadoopConf.value.value
 
       // Fetch internal schema
       val internalSchemaStr = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
@@ -161,7 +161,7 @@ class Spark33LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
       val fileSchema = if (shouldUseInternalSchema) {
         val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
         val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
-        val storage = new HoodieHadoopStorage(tablePath, broadcastedStorageConf.value)
+        val storage = new HoodieHadoopStorage(tablePath, sharedConf)
         InternalSchemaCache.getInternalSchemaByVersionId(
           commitInstantTime, tablePath, storage, if (validCommits == null) "" else validCommits)
       } else {
@@ -230,7 +230,7 @@ class Spark33LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
       val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
 
       // Clone new conf
-      val hadoopAttemptConf = broadcastedStorageConf.value.unwrapCopy
+      val hadoopAttemptConf = new Configuration(broadcastedHadoopConf.value.value)
       val typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = if (shouldUseInternalSchema) {
         val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
         val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
index 8818cb5672f..995ef165fc4 100644
--- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
@@ -23,7 +23,6 @@ import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.util.InternalSchemaCache
 import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.common.util.collection.Pair
-import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger
 import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
@@ -49,6 +48,7 @@ import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedF
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
+import org.apache.spark.util.SerializableConfiguration
 
 import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
 
@@ -124,8 +124,8 @@ class Spark34LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
       hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, prunedInternalSchemaStr)
     }
 
-    val broadcastedStorageConf =
-      sparkSession.sparkContext.broadcast(HadoopFSUtils.getStorageConfWithCopy(hadoopConf))
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
     // TODO: if you move this into the closure it reverts to the default values.
     // If true, enable using the custom RecordReader for parquet. This only works for
@@ -160,7 +160,7 @@ class Spark34LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
       val filePath = file.filePath.toPath
      val split = new FileSplit(filePath, file.start, file.length, Array.empty[String])
 
-      val sharedConf = broadcastedStorageConf.value.unwrap
+      val sharedConf = broadcastedHadoopConf.value.value
 
      // Fetch internal schema
       val internalSchemaStr = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
@@ -173,7 +173,7 @@ class Spark34LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
       val fileSchema = if (shouldUseInternalSchema) {
         val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
         val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
-        val storage = new HoodieHadoopStorage(tablePath, broadcastedStorageConf.value)
+        val storage = new HoodieHadoopStorage(tablePath, sharedConf)
         InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, storage, if (validCommits == null) "" else validCommits)
       } else {
         null
@@ -241,7 +241,7 @@ class Spark34LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
       val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
 
       // Clone new conf
-      val hadoopAttemptConf = broadcastedStorageConf.value.unwrapCopy
+      val hadoopAttemptConf = new Configuration(broadcastedHadoopConf.value.value)
       val typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = if (shouldUseInternalSchema) {
         val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
         val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
diff --git a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala
index 6286a19f080..e1a3dc1427d 100644
--- a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala
@@ -23,7 +23,6 @@ import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.util.InternalSchemaCache
 import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.common.util.collection.Pair
-import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger
 import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
@@ -50,6 +49,7 @@ import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedF
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
+import org.apache.spark.util.SerializableConfiguration
 
 import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
 
@@ -125,8 +125,8 @@ class Spark35LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
       hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, prunedInternalSchemaStr)
     }
 
-    val broadcastedStorageConf =
-      sparkSession.sparkContext.broadcast(HadoopFSUtils.getStorageConfWithCopy(hadoopConf))
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
     // TODO: if you move this into the closure it reverts to the default values.
     // If true, enable using the custom RecordReader for parquet. This only works for
@@ -161,7 +161,7 @@ class Spark35LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
       val filePath = file.filePath.toPath
       val split = new FileSplit(filePath, file.start, file.length, Array.empty[String])
 
-      val sharedConf = broadcastedStorageConf.value.unwrap
+      val sharedConf = broadcastedHadoopConf.value.value
 
       // Fetch internal schema
       val internalSchemaStr = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
@@ -174,7 +174,7 @@ class Spark35LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
       val fileSchema = if (shouldUseInternalSchema) {
         val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
         val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
-        val storage = new HoodieHadoopStorage(tablePath, broadcastedStorageConf.value)
+        val storage = new HoodieHadoopStorage(tablePath, sharedConf)
         InternalSchemaCache.getInternalSchemaByVersionId(
           commitInstantTime, tablePath, storage, if (validCommits == null) "" else validCommits)
       } else {
@@ -243,7 +243,7 @@ class Spark35LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
       val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
 
       // Clone new conf
-      val hadoopAttemptConf = broadcastedStorageConf.value.unwrapCopy
+      val hadoopAttemptConf = new Configuration(broadcastedHadoopConf.value.value)
       val typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = if (shouldUseInternalSchema) {
         val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
         val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
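Why the wrapper avoids the NPE: Kryo can instantiate objects without running
the Java-serialization hooks that the replaced storage-configuration wrapper
presumably relied on to restore its inner Hadoop Configuration, so that field
could come back null after deserialization. Spark's own
SerializableConfiguration is handled by Spark's Kryo setup (to my
understanding it is registered with a Java-serialization-backed serializer),
so the inner Configuration survives the round trip. A standalone test-style
sanity check of that round trip, with an illustrative key, not code from the
commit:

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.KryoSerializer
    import org.apache.spark.util.SerializableConfiguration

    val hadoopConf = new Configuration()
    hadoopConf.set("example.key", "value") // illustrative key

    // Round-trip the wrapped conf through Spark's Kryo serializer.
    val kryo = new KryoSerializer(new SparkConf()).newInstance()
    val restored = kryo.deserialize[SerializableConfiguration](
      kryo.serialize(new SerializableConfiguration(hadoopConf)))

    // With the wrapper, the inner Configuration is restored intact;
    // reading the key back returns "value" instead of hitting a null conf.
    assert(restored.value.get("example.key") == "value")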
