This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/branch-0.x by this push:
new bd26797c8e6 [HUDI-7809] Use Spark SerializableConfiguration to avoid NPE in Kryo serde (#11356)
bd26797c8e6 is described below
commit bd26797c8e651694b31bdf614f55c9fc6a757f71
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed May 29 07:53:40 2024 -0700
[HUDI-7809] Use Spark SerializableConfiguration to avoid NPE in Kryo serde (#11356)
* [HUDI-7809] Use Spark SerializableConfiguration to avoid NPE in Kryo serde
* Revert changes in HoodieBaseRelation
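For context: the patch swaps Hudi's broadcast StorageConfiguration wrapper for Spark's SerializableConfiguration, which is designed to re-create the wrapped Hadoop Configuration on deserialization and so, per the title, avoids the NPE seen with Kryo serde. Below is a minimal, illustrative sketch of the wrap-on-driver / unwrap-in-task pattern, mirroring the ColumnStatsIndexHelper change further down; it is not part of the patch, and the class name and paths are invented for illustration (the Scala file formats additionally broadcast the wrapper before unwrapping it with .value.value).

    // Illustrative sketch only, not part of this commit.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.util.SerializableConfiguration;

    import java.util.Arrays;

    public class SerializableConfSketch {
      public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext("local[*]", "serializable-conf-sketch");

        // Driver side: wrap the (non-serializable) Hadoop Configuration once.
        SerializableConfiguration serializableConf =
            new SerializableConfiguration(jsc.hadoopConfiguration());

        // Task side: the wrapper is captured by the closure; value() returns a
        // usable Configuration after deserialization instead of a null field.
        jsc.parallelize(Arrays.asList("path/a", "path/b"), 2)
            .foreach(path -> {
              Configuration conf = serializableConf.value();
              conf.get("fs.defaultFS"); // read-only use of the rehydrated conf
            });

        jsc.stop();
      }
    }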
---
.../test/java/org/apache/hudi/ColumnStatsIndexHelper.java | 7 +++----
.../parquet/Spark30LegacyHoodieParquetFileFormat.scala | 12 ++++++------
.../parquet/Spark31LegacyHoodieParquetFileFormat.scala | 12 ++++++------
.../parquet/Spark32LegacyHoodieParquetFileFormat.scala | 12 ++++++------
.../parquet/Spark33LegacyHoodieParquetFileFormat.scala | 12 ++++++------
.../parquet/Spark34LegacyHoodieParquetFileFormat.scala | 12 ++++++------
.../parquet/Spark35LegacyHoodieParquetFileFormat.scala | 12 ++++++------
7 files changed, 39 insertions(+), 40 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
index 357200f5f0e..269a83bf7ac 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
@@ -21,9 +21,7 @@ import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.HoodieStorage;
-import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import org.apache.hudi.util.JavaScalaConverters;
@@ -51,6 +49,7 @@ import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.util.SerializableConfiguration;
import javax.annotation.Nonnull;
@@ -163,7 +162,7 @@ public class ColumnStatsIndexHelper {
.map(StructField::name)
.collect(Collectors.toList());
- StorageConfiguration<?> storageConf = HadoopFSUtils.getStorageConfWithCopy(sc.hadoopConfiguration());
+ SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sc.hadoopConfiguration());
int numParallelism = (baseFilesPaths.size() / 3 + 1);
String previousJobDescription = sc.getLocalProperty("spark.job.description");
@@ -178,7 +177,7 @@ public class ColumnStatsIndexHelper {
Iterable<String> iterable = () -> paths;
return StreamSupport.stream(iterable.spliterator(), false)
.flatMap(path -> {
- HoodieStorage storage = new HoodieHadoopStorage(path, storageConf);
+ HoodieStorage storage = new HoodieHadoopStorage(path, serializableConfiguration.value());
return utils.readColumnStatsFromMetadata(
storage,
new StoragePath(path),
diff --git a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30LegacyHoodieParquetFileFormat.scala
index bf6e222b763..59fde4af02f 100644
--- a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30LegacyHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30LegacyHoodieParquetFileFormat.scala
@@ -23,7 +23,6 @@ import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.util.InternalSchemaCache
import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
import org.apache.hudi.common.util.collection.Pair
-import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.action.InternalSchemaMerger
import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
@@ -49,6 +48,7 @@ import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedF
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
+import org.apache.spark.util.SerializableConfiguration
import java.net.URI
@@ -108,8 +108,8 @@ class Spark30LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, prunedInternalSchemaStr)
}
- val broadcastedStorageConf =
- sparkSession.sparkContext.broadcast(HadoopFSUtils.getStorageConfWithCopy(hadoopConf))
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
// TODO: if you move this into the closure it reverts to the default values.
// If true, enable using the custom RecordReader for parquet. This only works for
@@ -147,7 +147,7 @@ class Spark30LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
Array.empty,
null)
- val sharedConf = broadcastedStorageConf.value.unwrap
+ val sharedConf = broadcastedHadoopConf.value.value
// Fetch internal schema
val internalSchemaStr = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
@@ -160,7 +160,7 @@ class Spark30LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
val fileSchema = if (shouldUseInternalSchema) {
val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
- val storage = new HoodieHadoopStorage(tablePath, broadcastedStorageConf.value)
+ val storage = new HoodieHadoopStorage(tablePath, sharedConf)
InternalSchemaCache.getInternalSchemaByVersionId(
commitInstantTime, tablePath, storage, if (validCommits == null) "" else validCommits)
} else {
@@ -223,7 +223,7 @@ class Spark30LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
// Clone new conf
- val hadoopAttemptConf = broadcastedStorageConf.value.unwrapCopy
+ val hadoopAttemptConf = new Configuration(broadcastedHadoopConf.value.value)
val typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = if (shouldUseInternalSchema) {
val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31LegacyHoodieParquetFileFormat.scala
index aa1b798241c..729ba95b644 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31LegacyHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31LegacyHoodieParquetFileFormat.scala
@@ -23,7 +23,6 @@ import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.util.InternalSchemaCache
import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
import org.apache.hudi.common.util.collection.Pair
-import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.action.InternalSchemaMerger
import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
@@ -49,6 +48,7 @@ import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedF
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
+import org.apache.spark.util.SerializableConfiguration
import java.net.URI
@@ -108,8 +108,8 @@ class Spark31LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, prunedInternalSchemaStr)
}
- val broadcastedStorageConf =
- sparkSession.sparkContext.broadcast(HadoopFSUtils.getStorageConfWithCopy(hadoopConf))
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
// TODO: if you move this into the closure it reverts to the default values.
// If true, enable using the custom RecordReader for parquet. This only works for
@@ -147,7 +147,7 @@ class Spark31LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
Array.empty,
null)
- val sharedConf = broadcastedStorageConf.value.unwrap
+ val sharedConf = broadcastedHadoopConf.value.value
// Fetch internal schema
val internalSchemaStr = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
@@ -160,7 +160,7 @@ class Spark31LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
val fileSchema = if (shouldUseInternalSchema) {
val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
- val storage = new HoodieHadoopStorage(tablePath, broadcastedStorageConf.value)
+ val storage = new HoodieHadoopStorage(tablePath, sharedConf)
InternalSchemaCache.getInternalSchemaByVersionId(
commitInstantTime, tablePath, storage, if (validCommits == null) "" else validCommits)
} else {
@@ -227,7 +227,7 @@ class Spark31LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
// Clone new conf
- val hadoopAttemptConf = new Configuration(broadcastedStorageConf.value.unwrap)
+ val hadoopAttemptConf = new Configuration(broadcastedHadoopConf.value.value)
val typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = if (shouldUseInternalSchema) {
val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32LegacyHoodieParquetFileFormat.scala
index 44d420c7501..68188c3fbf0 100644
--- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32LegacyHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32LegacyHoodieParquetFileFormat.scala
@@ -23,7 +23,6 @@ import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.util.InternalSchemaCache
import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
import org.apache.hudi.common.util.collection.Pair
-import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.action.InternalSchemaMerger
import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
@@ -49,6 +48,7 @@ import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedF
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
+import org.apache.spark.util.SerializableConfiguration
import java.net.URI
@@ -111,8 +111,8 @@ class Spark32LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, prunedInternalSchemaStr)
}
- val broadcastedStorageConf =
- sparkSession.sparkContext.broadcast(HadoopFSUtils.getStorageConfWithCopy(hadoopConf))
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
// TODO: if you move this into the closure it reverts to the default values.
// If true, enable using the custom RecordReader for parquet. This only works for
@@ -146,7 +146,7 @@ class Spark32LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
val filePath = new Path(new URI(file.filePath))
val split = new FileSplit(filePath, file.start, file.length, Array.empty[String])
- val sharedConf = broadcastedStorageConf.value.unwrap
+ val sharedConf = broadcastedHadoopConf.value.value
// Fetch internal schema
val internalSchemaStr = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
@@ -159,7 +159,7 @@ class Spark32LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
val fileSchema = if (shouldUseInternalSchema) {
val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
- val storage = new HoodieHadoopStorage(tablePath, broadcastedStorageConf.value)
+ val storage = new HoodieHadoopStorage(tablePath, sharedConf)
InternalSchemaCache.getInternalSchemaByVersionId(
commitInstantTime, tablePath, storage, if (validCommits == null) "" else validCommits)
} else {
@@ -228,7 +228,7 @@ class Spark32LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
// Clone new conf
- val hadoopAttemptConf = broadcastedStorageConf.value.unwrapCopy
+ val hadoopAttemptConf = new Configuration(broadcastedHadoopConf.value.value)
val typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = if (shouldUseInternalSchema) {
val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
index d39d12b3fe2..2e779100df3 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
@@ -25,7 +25,6 @@ import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.util.InternalSchemaCache
import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
import org.apache.hudi.common.util.collection.Pair
-import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.action.InternalSchemaMerger
import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
@@ -51,6 +50,7 @@ import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedF
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
+import org.apache.spark.util.SerializableConfiguration
import java.net.URI
import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
@@ -114,8 +114,8 @@ class Spark33LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, prunedInternalSchemaStr)
}
- val broadcastedStorageConf =
- sparkSession.sparkContext.broadcast(HadoopFSUtils.getStorageConfWithCopy(hadoopConf))
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
// TODO: if you move this into the closure it reverts to the default values.
// If true, enable using the custom RecordReader for parquet. This only works for
@@ -148,7 +148,7 @@ class Spark33LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
val filePath = new Path(new URI(file.filePath))
val split = new FileSplit(filePath, file.start, file.length, Array.empty[String])
- val sharedConf = broadcastedStorageConf.value.unwrap
+ val sharedConf = broadcastedHadoopConf.value.value
// Fetch internal schema
val internalSchemaStr = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
@@ -161,7 +161,7 @@ class Spark33LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
val fileSchema = if (shouldUseInternalSchema) {
val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
- val storage = new HoodieHadoopStorage(tablePath, broadcastedStorageConf.value)
+ val storage = new HoodieHadoopStorage(tablePath, sharedConf)
InternalSchemaCache.getInternalSchemaByVersionId(
commitInstantTime, tablePath, storage, if (validCommits == null) "" else validCommits)
} else {
@@ -230,7 +230,7 @@ class Spark33LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
// Clone new conf
- val hadoopAttemptConf = broadcastedStorageConf.value.unwrapCopy
+ val hadoopAttemptConf = new Configuration(broadcastedHadoopConf.value.value)
val typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = if (shouldUseInternalSchema) {
val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
index 8818cb5672f..995ef165fc4 100644
--- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
@@ -23,7 +23,6 @@ import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.util.InternalSchemaCache
import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
import org.apache.hudi.common.util.collection.Pair
-import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.action.InternalSchemaMerger
import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
@@ -49,6 +48,7 @@ import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedF
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
+import org.apache.spark.util.SerializableConfiguration
import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
@@ -124,8 +124,8 @@ class Spark34LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, prunedInternalSchemaStr)
}
- val broadcastedStorageConf =
- sparkSession.sparkContext.broadcast(HadoopFSUtils.getStorageConfWithCopy(hadoopConf))
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
// TODO: if you move this into the closure it reverts to the default values.
// If true, enable using the custom RecordReader for parquet. This only works for
@@ -160,7 +160,7 @@ class Spark34LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
val filePath = file.filePath.toPath
val split = new FileSplit(filePath, file.start, file.length, Array.empty[String])
- val sharedConf = broadcastedStorageConf.value.unwrap
+ val sharedConf = broadcastedHadoopConf.value.value
// Fetch internal schema
val internalSchemaStr = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
@@ -173,7 +173,7 @@ class Spark34LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
val fileSchema = if (shouldUseInternalSchema) {
val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
- val storage = new HoodieHadoopStorage(tablePath, broadcastedStorageConf.value)
+ val storage = new HoodieHadoopStorage(tablePath, sharedConf)
InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, storage, if (validCommits == null) "" else validCommits)
} else {
null
@@ -241,7 +241,7 @@ class Spark34LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
// Clone new conf
- val hadoopAttemptConf = broadcastedStorageConf.value.unwrapCopy
+ val hadoopAttemptConf = new Configuration(broadcastedHadoopConf.value.value)
val typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = if (shouldUseInternalSchema) {
val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
diff --git a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala
index 6286a19f080..e1a3dc1427d 100644
--- a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala
@@ -23,7 +23,6 @@ import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.util.InternalSchemaCache
import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
import org.apache.hudi.common.util.collection.Pair
-import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.action.InternalSchemaMerger
import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
@@ -50,6 +49,7 @@ import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedF
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
+import org.apache.spark.util.SerializableConfiguration
import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
@@ -125,8 +125,8 @@ class Spark35LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, prunedInternalSchemaStr)
}
- val broadcastedStorageConf =
- sparkSession.sparkContext.broadcast(HadoopFSUtils.getStorageConfWithCopy(hadoopConf))
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
// TODO: if you move this into the closure it reverts to the default values.
// If true, enable using the custom RecordReader for parquet. This only works for
@@ -161,7 +161,7 @@ class Spark35LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
val filePath = file.filePath.toPath
val split = new FileSplit(filePath, file.start, file.length, Array.empty[String])
- val sharedConf = broadcastedStorageConf.value.unwrap
+ val sharedConf = broadcastedHadoopConf.value.value
// Fetch internal schema
val internalSchemaStr = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
@@ -174,7 +174,7 @@ class Spark35LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
val fileSchema = if (shouldUseInternalSchema) {
val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
- val storage = new HoodieHadoopStorage(tablePath, broadcastedStorageConf.value)
+ val storage = new HoodieHadoopStorage(tablePath, sharedConf)
InternalSchemaCache.getInternalSchemaByVersionId(
commitInstantTime, tablePath, storage, if (validCommits == null) "" else validCommits)
} else {
@@ -243,7 +243,7 @@ class Spark35LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
// Clone new conf
- val hadoopAttemptConf = broadcastedStorageConf.value.unwrapCopy
+ val hadoopAttemptConf = new Configuration(broadcastedHadoopConf.value.value)
val typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = if (shouldUseInternalSchema) {
val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)