This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 54700538ab2 [HUDI-8011] allow schema.on.read with positional merging (#11671)
54700538ab2 is described below
commit 54700538ab2fb7c0041b461fd16048713a09db22
Author: Jon Vexler <[email protected]>
AuthorDate: Tue Jul 23 21:34:24 2024 -0400
[HUDI-8011] allow schema.on.read with positional merging (#11671)
* allow schema.on.read with positional merging
* use full name so builds
---------
Co-authored-by: Jonathan Vexler <=>
---
.../SparkFileFormatInternalRowReaderContext.scala | 3 ++-
.../datasources/parquet/SparkParquetReader.scala | 14 ++++++----
.../read/HoodieFileGroupReaderSchemaHandler.java | 18 +++++++++++--
.../read/HoodiePositionBasedSchemaHandler.java | 27 ++++++++++++++++++-
.../hudi/internal/schema/InternalSchema.java | 30 ++++++++++-----------
...odieFileGroupReaderBasedParquetFileFormat.scala | 6 ++---
.../parquet/SparkParquetReaderBase.scala | 31 +++++++++++++---------
.../parquet/TestSparkParquetReaderFormat.scala | 3 ++-
.../TestSpark35RecordPositionMetadataColumn.scala | 3 ++-
.../datasources/parquet/Spark24ParquetReader.scala | 4 +++
.../Spark3ParquetSchemaEvolutionUtils.scala | 25 +++++++----------
.../datasources/parquet/Spark30ParquetReader.scala | 18 ++++++++-----
.../Spark30ParquetSchemaEvolutionUtils.scala | 8 ++++--
.../datasources/parquet/Spark31ParquetReader.scala | 20 +++++++++-----
.../Spark31ParquetSchemaEvolutionUtils.scala | 8 ++++--
.../datasources/parquet/Spark32ParquetReader.scala | 18 ++++++++-----
.../Spark32PlusParquetSchemaEvolutionUtils.scala | 10 ++++---
.../datasources/parquet/Spark33ParquetReader.scala | 17 +++++++-----
.../datasources/parquet/Spark34ParquetReader.scala | 18 ++++++++-----
.../datasources/parquet/Spark35ParquetReader.scala | 28 +++++++++++--------
20 files changed, 200 insertions(+), 109 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index 074e41b6fef..89e4b7b1040 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -103,7 +103,8 @@ class
SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea
.createPartitionedFile(InternalRow.empty, filePath, start, length)
val (readSchema, readFilters) = getSchemaAndFiltersForRead(structType,
hasRowIndexField)
new CloseableInternalRowIterator(parquetFileReader.read(fileInfo,
- readSchema, StructType(Seq.empty), readFilters,
storage.getConf.asInstanceOf[StorageConfiguration[Configuration]]))
+ readSchema, StructType(Seq.empty),
getSchemaHandler.getInternalSchemaOpt,
+ readFilters,
storage.getConf.asInstanceOf[StorageConfiguration[Configuration]]))
}
}
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReader.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReader.scala
index d1f6826a2e1..8188f1d0a19 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReader.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReader.scala
@@ -19,6 +19,8 @@
package org.apache.spark.sql.execution.datasources.parquet
+import org.apache.hudi.common.util
+import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.storage.StorageConfiguration
import org.apache.hadoop.conf.Configuration
@@ -31,16 +33,18 @@ trait SparkParquetReader extends Serializable {
/**
* Read an individual parquet file
*
- * @param file parquet file to read
- * @param requiredSchema desired output schema of the data
- * @param partitionSchema schema of the partition columns. Partition values
will be appended to the end of every row
- * @param filters filters for data skipping. Not guaranteed to be
used; the spark plan will also apply the filters.
- * @param storageConf the hadoop conf
+ * @param file parquet file to read
+ * @param requiredSchema desired output schema of the data
+ * @param partitionSchema schema of the partition columns. Partition
values will be appended to the end of every row
+ * @param internalSchemaOpt option of internal schema for schema.on.read
+ * @param filters filters for data skipping. Not guaranteed to be
used; the spark plan will also apply the filters.
+ * @param storageConf the hadoop conf
* @return iterator of rows read from the file output type says
[[InternalRow]] but could be [[ColumnarBatch]]
*/
def read(file: PartitionedFile,
requiredSchema: StructType,
partitionSchema: StructType,
+ internalSchemaOpt: util.Option[InternalSchema],
filters: Seq[Filter],
storageConf: StorageConfiguration[Configuration]):
Iterator[InternalRow]
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
index 8a962fdc2a7..46d2fdd0c5e 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
@@ -58,6 +58,7 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
protected final InternalSchema internalSchema;
+ protected final Option<InternalSchema> internalSchemaOpt;
protected final HoodieTableConfig hoodieTableConfig;
@@ -84,6 +85,7 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
this.hoodieTableConfig = hoodieTableConfig;
this.requiredSchema = prepareRequiredSchema();
this.internalSchema = pruneInternalSchema(requiredSchema,
internalSchemaOpt);
+ this.internalSchemaOpt = getInternalSchemaOpt(internalSchemaOpt);
readerContext.setNeedsBootstrapMerge(this.needsBootstrapMerge);
}
@@ -103,6 +105,10 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
return this.internalSchema;
}
+ public Option<InternalSchema> getInternalSchemaOpt() {
+ return this.internalSchemaOpt;
+ }
+
public Option<UnaryOperator<T>> getOutputConverter() {
if (!requestedSchema.equals(requiredSchema)) {
return Option.of(readerContext.projectRecord(requiredSchema,
requestedSchema));
@@ -110,7 +116,7 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
return Option.empty();
}
- private static InternalSchema pruneInternalSchema(Schema requiredSchema,
Option<InternalSchema> internalSchemaOption) {
+ private InternalSchema pruneInternalSchema(Schema requiredSchema,
Option<InternalSchema> internalSchemaOption) {
if (!internalSchemaOption.isPresent()) {
return InternalSchema.getEmptyInternalSchema();
}
@@ -119,7 +125,15 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
return InternalSchema.getEmptyInternalSchema();
}
- return
AvroInternalSchemaConverter.pruneAvroSchemaToInternalSchema(requiredSchema,
notPruned);
+ return doPruneInternalSchema(requiredSchema, notPruned);
+ }
+
+ protected Option<InternalSchema> getInternalSchemaOpt(Option<InternalSchema>
internalSchemaOpt) {
+ return internalSchemaOpt;
+ }
+
+ protected InternalSchema doPruneInternalSchema(Schema requiredSchema,
InternalSchema internalSchema) {
+ return
AvroInternalSchemaConverter.pruneAvroSchemaToInternalSchema(requiredSchema,
internalSchema);
}
private Schema generateRequiredSchema() {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java
index 87c4266e350..2336fb188be 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java
@@ -24,6 +24,9 @@ import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.action.TableChanges;
+import org.apache.hudi.internal.schema.utils.SchemaChangeUtils;
import org.apache.avro.Schema;
@@ -31,6 +34,7 @@ import java.util.Collections;
import java.util.List;
import static
org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchemaDedupNested;
+import static
org.apache.hudi.common.table.read.HoodiePositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME;
/**
* This class is responsible for handling the schema for the file group reader
that supports positional merge.
@@ -52,6 +56,27 @@ public class HoodiePositionBasedSchemaHandler<T> extends
HoodieFileGroupReaderSc
: preMergeSchema;
}
+ @Override
+ protected Option<InternalSchema> getInternalSchemaOpt(Option<InternalSchema>
internalSchemaOpt) {
+ return
internalSchemaOpt.map(HoodiePositionBasedSchemaHandler::addPositionalMergeCol);
+ }
+
+ @Override
+ protected InternalSchema doPruneInternalSchema(Schema requiredSchema,
InternalSchema internalSchema) {
+ if (!(readerContext.getShouldMergeUseRecordPosition() &&
readerContext.getHasLogFiles())) {
+ return super.doPruneInternalSchema(requiredSchema, internalSchema);
+ }
+
+ InternalSchema withRowIndex = addPositionalMergeCol(internalSchema);
+ return super.doPruneInternalSchema(requiredSchema, withRowIndex);
+ }
+
+ private static InternalSchema addPositionalMergeCol(InternalSchema
internalSchema) {
+ TableChanges.ColumnAddChange addChange =
TableChanges.ColumnAddChange.get(internalSchema);
+ addChange.addColumns("", ROW_INDEX_TEMPORARY_COLUMN_NAME,
Types.LongType.get(), null);
+ return SchemaChangeUtils.applyTableChanges2Schema(internalSchema,
addChange);
+ }
+
@Override
public Pair<List<Schema.Field>,List<Schema.Field>>
getBootstrapRequiredFields() {
Pair<List<Schema.Field>,List<Schema.Field>> dataAndMetaCols =
super.getBootstrapRequiredFields();
@@ -69,7 +94,7 @@ public class HoodiePositionBasedSchemaHandler<T> extends
HoodieFileGroupReaderSc
}
private static Schema.Field getPositionalMergeField() {
- return new
Schema.Field(HoodiePositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME,
+ return new Schema.Field(ROW_INDEX_TEMPORARY_COLUMN_NAME,
Schema.create(Schema.Type.LONG), "", -1L);
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/InternalSchema.java
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/InternalSchema.java
index d1d36fcbc91..5df9cd6f22e 100644
---
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/InternalSchema.java
+++
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/InternalSchema.java
@@ -71,15 +71,13 @@ public class InternalSchema implements Serializable {
this.maxColumnId = maxColumnId;
this.versionId = versionId;
this.record = recordType;
- buildIdToName();
+ getIdToName();
}
public InternalSchema(long versionId, RecordType recordType) {
this.versionId = versionId;
this.record = recordType;
- this.idToName = recordType.fields().isEmpty()
- ? Collections.emptyMap()
- : InternalSchemaBuilder.getBuilder().buildIdToName(record);
+ this.idToName = buildIdToName(record);
this.nameToId = recordType.fields().isEmpty()
? Collections.emptyMap()
:
idToName.entrySet().stream().collect(Collectors.toMap(Map.Entry::getValue,
Map.Entry::getKey));
@@ -90,9 +88,15 @@ public class InternalSchema implements Serializable {
return record;
}
- private Map<Integer, String> buildIdToName() {
+ private static Map<Integer, String> buildIdToName(RecordType record) {
+ return record.fields().isEmpty()
+ ? Collections.emptyMap()
+ : InternalSchemaBuilder.getBuilder().buildIdToName(record);
+ }
+
+ private Map<Integer, String> getIdToName() {
if (idToName == null) {
- idToName = InternalSchemaBuilder.getBuilder().buildIdToName(record);
+ idToName = buildIdToName(record);
}
return idToName;
}
@@ -168,10 +172,7 @@ public class InternalSchema implements Serializable {
* @return full name of field corresponding to id
*/
public String findFullName(int id) {
- if (idToName == null) {
- buildIdToName();
- }
- String result = idToName.get(id);
+ String result = getIdToName().get(id);
return result == null ? "" : result;
}
@@ -210,10 +211,7 @@ public class InternalSchema implements Serializable {
* Returns all field ids
*/
public Set<Integer> getAllIds() {
- if (idToName == null) {
- buildIdToName();
- }
- return idToName.keySet();
+ return getIdToName().keySet();
}
/**
@@ -255,9 +253,9 @@ public class InternalSchema implements Serializable {
if (caseSensitive) {
// In case we do a case-sensitive check we just need to validate whether
// schema contains field-name as it is
- return idToName.containsValue(colName);
+ return getIdToName().containsValue(colName);
} else {
- return idToName.values()
+ return getIdToName().values()
.stream()
.map(fieldName -> fieldName.toLowerCase(Locale.ROOT))
.collect(Collectors.toSet())
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 31404c3e786..9c9a4cca7be 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -29,7 +29,6 @@ import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.table.read.HoodieFileGroupReader
import org.apache.hudi.internal.schema.InternalSchema
-import org.apache.hudi.internal.schema.utils.SerDeHelper
import org.apache.hudi.storage.StorageConfiguration
import org.apache.hudi.storage.hadoop.{HadoopStorageConfiguration,
HoodieHadoopStorage}
import
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
@@ -179,7 +178,7 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
fileSliceMapping.getPartitionValues)
}
- case _ => parquetFileReader.value.read(file, requiredSchema,
partitionSchema, filters, storageConf)
+ case _ => parquetFileReader.value.read(file, requiredSchema,
partitionSchema, internalSchemaOpt, filters, storageConf)
}
// CDC queries.
case hoodiePartitionCDCFileGroupSliceMapping:
HoodiePartitionCDCFileGroupMapping =>
@@ -189,14 +188,13 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
fileGroupSplit, cdcFileReader.asInstanceOf[PartitionedFile =>
Iterator[InternalRow]],
storageConf, fileIndexProps, requiredSchema)
- case _ => parquetFileReader.value.read(file, requiredSchema,
partitionSchema, filters, storageConf)
+ case _ => parquetFileReader.value.read(file, requiredSchema,
partitionSchema, internalSchemaOpt, filters, storageConf)
}
}
}
private def setSchemaEvolutionConfigs(conf: StorageConfiguration[_]): Unit =
{
if (internalSchemaOpt.isPresent) {
- conf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA,
SerDeHelper.toJson(internalSchemaOpt.get()))
conf.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH,
tableState.tablePath)
conf.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST,
validCommits)
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReaderBase.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReaderBase.scala
index ddfa4ba6837..56738595c26 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReaderBase.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReaderBase.scala
@@ -19,8 +19,12 @@
package org.apache.spark.sql.execution.datasources.parquet
+import org.apache.hudi.common.util
+import org.apache.hudi.internal.schema.InternalSchema
+
import org.apache.hadoop.conf.Configuration
import org.apache.hudi.storage.StorageConfiguration
+
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.internal.SQLConf
@@ -43,41 +47,44 @@ abstract class
SparkParquetReaderBase(enableVectorizedReader: Boolean,
/**
* Read an individual parquet file
*
- * @param file parquet file to read
- * @param requiredSchema desired output schema of the data
- * @param partitionSchema schema of the partition columns. Partition values
will be appended to the end of every row
- * @param filters filters for data skipping. Not guaranteed to be
used; the spark plan will also apply the filters.
- * @param storageConf the hadoop conf
+ * @param file parquet file to read
+ * @param requiredSchema desired output schema of the data
+ * @param partitionSchema schema of the partition columns. Partition
values will be appended to the end of every row
+ * @param internalSchemaOpt option of internal schema for schema.on.read
+ * @param filters filters for data skipping. Not guaranteed to be
used; the spark plan will also apply the filters.
+ * @param storageConf the hadoop conf
* @return iterator of rows read from the file output type says
[[InternalRow]] but could be [[ColumnarBatch]]
*/
final def read(file: PartitionedFile,
requiredSchema: StructType,
partitionSchema: StructType,
+ internalSchemaOpt: util.Option[InternalSchema],
filters: Seq[Filter],
storageConf: StorageConfiguration[Configuration]):
Iterator[InternalRow] = {
val conf = storageConf.unwrapCopy()
conf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
requiredSchema.json)
conf.set(ParquetWriteSupport.SPARK_ROW_SCHEMA, requiredSchema.json)
ParquetWriteSupport.setSchema(requiredSchema, conf)
- doRead(file, requiredSchema, partitionSchema, filters, conf)
+ doRead(file, requiredSchema, partitionSchema, internalSchemaOpt, filters,
conf)
}
/**
* Implemented for each spark version
*
- * @param file parquet file to read
- * @param requiredSchema desired output schema of the data
- * @param partitionSchema schema of the partition columns. Partition values
will be appended to the end of every row
- * @param filters filters for data skipping. Not guaranteed to be
used; the spark plan will also apply the filters.
- * @param sharedConf the hadoop conf
+ * @param file parquet file to read
+ * @param requiredSchema desired output schema of the data
+ * @param partitionSchema schema of the partition columns. Partition
values will be appended to the end of every row
+ * @param internalSchemaOpt option of internal schema for schema.on.read
+ * @param filters filters for data skipping. Not guaranteed to be
used; the spark plan will also apply the filters.
+ * @param sharedConf the hadoop conf
* @return iterator of rows read from the file output type says
[[InternalRow]] but could be [[ColumnarBatch]]
*/
protected def doRead(file: PartitionedFile,
requiredSchema: StructType,
partitionSchema: StructType,
+ internalSchemaOpt: util.Option[InternalSchema],
filters: Seq[Filter],
sharedConf: Configuration): Iterator[InternalRow]
-
}
trait SparkParquetReaderBuilder {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestSparkParquetReaderFormat.scala
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestSparkParquetReaderFormat.scala
index be409e50534..e3c1f3cdefc 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestSparkParquetReaderFormat.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestSparkParquetReaderFormat.scala
@@ -21,6 +21,7 @@ package org.apache.spark.sql.execution.datasources.parquet
import org.apache.hadoop.conf.Configuration
import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.common.util
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
@@ -50,7 +51,7 @@ class TestSparkParquetReaderFormat extends ParquetFileFormat
with SparkAdapterSu
(file: PartitionedFile) => {
//code inside the lambda will run on the executor
- reader.read(file, requiredSchema, partitionSchema, filters,
broadcastedStorageConf.value)
+ reader.read(file, requiredSchema, partitionSchema, util.Option.empty(),
filters, broadcastedStorageConf.value)
}
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
index 4b6196761ad..59dc89c5a21 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
@@ -22,6 +22,7 @@ package org.apache.hudi.common.table.read
import org.apache.hudi.SparkAdapterSupport.sparkAdapter
import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.util
import org.apache.hudi.common.testutils.HoodieTestTable
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hadoop.fs.HadoopFSUtils
@@ -112,7 +113,7 @@ class TestSpark35RecordPositionMetadataColumn extends
SparkClientFunctionalTestH
0,
allBaseFiles.get(0).getLength)
val iterator = new CloseableInternalRowIterator(reader.read(fileInfo,
requiredSchema,
- StructType(Seq.empty), Seq.empty, new
HadoopStorageConfiguration(hadoopConf)))
+ StructType(Seq.empty), util.Option.empty(), Seq.empty, new
HadoopStorageConfiguration(hadoopConf)))
var rowIndices: Set[Long] = Set()
while (iterator.hasNext) {
val row = iterator.next()
diff --git
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24ParquetReader.scala
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24ParquetReader.scala
index 42808f337b7..a0ef97a2266 100644
---
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24ParquetReader.scala
+++
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24ParquetReader.scala
@@ -18,6 +18,9 @@
package org.apache.spark.sql.execution.datasources.parquet
+import org.apache.hudi.common.util
+import org.apache.hudi.internal.schema.InternalSchema
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.input.FileSplit
@@ -81,6 +84,7 @@ class Spark24ParquetReader(enableVectorizedReader: Boolean,
protected def doRead(file: PartitionedFile,
requiredSchema: StructType,
partitionSchema: StructType,
+ internalSchemaOpt:
org.apache.hudi.common.util.Option[InternalSchema],
filters: Seq[Filter],
sharedConf: Configuration): Iterator[InternalRow] = {
assert(file.partitionValues.numFields == partitionSchema.size)
diff --git
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParquetSchemaEvolutionUtils.scala
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParquetSchemaEvolutionUtils.scala
index d2b2bfad3cf..4996a668af5 100644
---
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParquetSchemaEvolutionUtils.scala
+++
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParquetSchemaEvolutionUtils.scala
@@ -25,11 +25,10 @@ import
org.apache.hudi.client.utils.SparkInternalSchemaConverter
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.util
import org.apache.hudi.common.util.InternalSchemaCache
-import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
import org.apache.hudi.common.util.collection.Pair
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.action.InternalSchemaMerger
-import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage
import org.apache.parquet.hadoop.metadata.FileMetaData
import org.apache.spark.sql.HoodieSchemaUtils
@@ -45,13 +44,12 @@ import
scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
abstract class Spark3ParquetSchemaEvolutionUtils(sharedConf: Configuration,
filePath: Path,
requiredSchema: StructType,
- partitionSchema: StructType)
extends SparkAdapterSupport{
+ partitionSchema: StructType,
+ internalSchemaOpt:
util.Option[InternalSchema]) extends SparkAdapterSupport{
// Fetch internal schema
- private lazy val internalSchemaStr: String =
sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
+ private lazy val querySchemaOption: util.Option[InternalSchema] =
pruneInternalSchema(internalSchemaOpt, requiredSchema)
- private lazy val querySchemaOption: util.Option[InternalSchema] =
pruneInternalSchema(internalSchemaStr, requiredSchema)
-
- var shouldUseInternalSchema: Boolean = !isNullOrEmpty(internalSchemaStr) &&
querySchemaOption.isPresent
+ var shouldUseInternalSchema: Boolean = querySchemaOption.isPresent
private lazy val schemaUtils: HoodieSchemaUtils = sparkAdapter.getSchemaUtils
@@ -183,16 +181,11 @@ abstract class
Spark3ParquetSchemaEvolutionUtils(sharedConf: Configuration,
}
object Spark3ParquetSchemaEvolutionUtils {
- def pruneInternalSchema(internalSchemaStr: String, requiredSchema:
StructType): util.Option[InternalSchema] = {
- if (!isNullOrEmpty(internalSchemaStr) ) {
- val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
- if (querySchemaOption.isPresent && requiredSchema.nonEmpty) {
-
util.Option.of(SparkInternalSchemaConverter.convertAndPruneStructTypeToInternalSchema(requiredSchema,
querySchemaOption.get()))
- } else {
- querySchemaOption
- }
+ def pruneInternalSchema(internalSchemaOpt: util.Option[InternalSchema],
requiredSchema: StructType): util.Option[InternalSchema] = {
+ if (internalSchemaOpt.isPresent && requiredSchema.nonEmpty) {
+
util.Option.of(SparkInternalSchemaConverter.convertAndPruneStructTypeToInternalSchema(requiredSchema,
internalSchemaOpt.get()))
} else {
- util.Option.empty()
+ internalSchemaOpt
}
}
}
diff --git
a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetReader.scala
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetReader.scala
index a6e9a3c5fe0..22042c964bc 100644
---
a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetReader.scala
+++
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetReader.scala
@@ -19,6 +19,9 @@
package org.apache.spark.sql.execution.datasources.parquet
+import org.apache.hudi.common.util
+import org.apache.hudi.internal.schema.InternalSchema
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
@@ -70,16 +73,18 @@ class Spark30ParquetReader(enableVectorizedReader: Boolean,
* Read an individual parquet file
* Code from ParquetFileFormat#buildReaderWithPartitionValues from Spark
v3.0.3 adapted here
*
- * @param file parquet file to read
- * @param requiredSchema desired output schema of the data
- * @param partitionSchema schema of the partition columns. Partition values
will be appended to the end of every row
- * @param filters filters for data skipping. Not guaranteed to be
used; the spark plan will also apply the filters.
- * @param sharedConf the hadoop conf
+ * @param file parquet file to read
+ * @param requiredSchema desired output schema of the data
+ * @param partitionSchema schema of the partition columns. Partition
values will be appended to the end of every row
+ * @param internalSchemaOpt option of internal schema for schema.on.read
+ * @param filters filters for data skipping. Not guaranteed to be
used; the spark plan will also apply the filters.
+ * @param sharedConf the hadoop conf
* @return iterator of rows read from the file output type says
[[InternalRow]] but could be [[ColumnarBatch]]
*/
protected def doRead(file: PartitionedFile,
requiredSchema: StructType,
partitionSchema: StructType,
+ internalSchemaOpt:
org.apache.hudi.common.util.Option[InternalSchema],
filters: Seq[Filter],
sharedConf: Configuration): Iterator[InternalRow] = {
assert(file.partitionValues.numFields == partitionSchema.size)
@@ -94,7 +99,8 @@ class Spark30ParquetReader(enableVectorizedReader: Boolean,
Array.empty,
null)
- val schemaEvolutionUtils = new
Spark30ParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema,
partitionSchema)
+ val schemaEvolutionUtils = new
Spark30ParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema,
+ partitionSchema, internalSchemaOpt)
lazy val footerFileMetaData =
ParquetFileReader.readFooter(sharedConf, filePath,
SKIP_ROW_GROUPS).getFileMetaData
diff --git
a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetSchemaEvolutionUtils.scala
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetSchemaEvolutionUtils.scala
index 28acb6e0474..2fccf6a7a72 100644
---
a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetSchemaEvolutionUtils.scala
+++
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetSchemaEvolutionUtils.scala
@@ -19,6 +19,9 @@
package org.apache.spark.sql.execution.datasources.parquet
+import org.apache.hudi.common.util
+import org.apache.hudi.internal.schema.InternalSchema
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import
org.apache.spark.sql.execution.datasources.Spark3ParquetSchemaEvolutionUtils
@@ -29,8 +32,9 @@ import java.time.ZoneId
class Spark30ParquetSchemaEvolutionUtils(sharedConf: Configuration,
filePath: Path,
requiredSchema: StructType,
- partitionSchema: StructType) extends
- Spark3ParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema,
partitionSchema) {
+ partitionSchema: StructType,
+ internalSchemaOpt:
util.Option[InternalSchema]) extends
+ Spark3ParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema,
partitionSchema, internalSchemaOpt) {
def buildVectorizedReader(convertTz: ZoneId,
datetimeRebaseMode: String,
diff --git
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31ParquetReader.scala
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31ParquetReader.scala
index a3e179fa184..bd86f57eefb 100644
---
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31ParquetReader.scala
+++
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31ParquetReader.scala
@@ -19,6 +19,9 @@
package org.apache.spark.sql.execution.datasources.parquet
+import org.apache.hudi.common.util
+import org.apache.hudi.internal.schema.InternalSchema
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
@@ -71,16 +74,18 @@ class Spark31ParquetReader(enableVectorizedReader: Boolean,
* Read an individual parquet file
* Code from ParquetFileFormat#buildReaderWithPartitionValues from Spark
v3.1.3 adapted here
*
- * @param file parquet file to read
- * @param requiredSchema desired output schema of the data
- * @param partitionSchema schema of the partition columns. Partition values
will be appended to the end of every row
- * @param filters filters for data skipping. Not guaranteed to be
used; the spark plan will also apply the filters.
- * @param sharedConf the hadoop conf
+ * @param file parquet file to read
+ * @param requiredSchema desired output schema of the data
+ * @param partitionSchema schema of the partition columns. Partition
values will be appended to the end of every row
+ * @param internalSchemaOpt option of internal schema for schema.on.read
+ * @param filters filters for data skipping. Not guaranteed to be
used; the spark plan will also apply the filters.
+ * @param sharedConf the hadoop conf
* @return iterator of rows read from the file output type says
[[InternalRow]] but could be [[ColumnarBatch]]
*/
protected def doRead(file: PartitionedFile,
- requiredSchema: StructType,
+ requiredSchema: StructType,
partitionSchema: StructType,
+ internalSchemaOpt:
org.apache.hudi.common.util.Option[InternalSchema],
filters: Seq[Filter],
sharedConf: Configuration): Iterator[InternalRow] = {
assert(file.partitionValues.numFields == partitionSchema.size)
@@ -95,7 +100,8 @@ class Spark31ParquetReader(enableVectorizedReader: Boolean,
Array.empty,
null)
- val schemaEvolutionUtils = new
Spark31ParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema,
partitionSchema)
+ val schemaEvolutionUtils = new
Spark31ParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema,
+ partitionSchema, internalSchemaOpt)
lazy val footerFileMetaData =
ParquetFileReader.readFooter(sharedConf, filePath,
SKIP_ROW_GROUPS).getFileMetaData
diff --git
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31ParquetSchemaEvolutionUtils.scala
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31ParquetSchemaEvolutionUtils.scala
index d31efde4d29..a81d57924d7 100644
---
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31ParquetSchemaEvolutionUtils.scala
+++
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31ParquetSchemaEvolutionUtils.scala
@@ -19,6 +19,9 @@
package org.apache.spark.sql.execution.datasources.parquet
+import org.apache.hudi.common.util
+import org.apache.hudi.internal.schema.InternalSchema
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import
org.apache.spark.sql.execution.datasources.Spark3ParquetSchemaEvolutionUtils
@@ -29,8 +32,9 @@ import java.time.ZoneId
class Spark31ParquetSchemaEvolutionUtils(sharedConf: Configuration,
filePath: Path,
requiredSchema: StructType,
- partitionSchema: StructType) extends
- Spark3ParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema,
partitionSchema) {
+ partitionSchema: StructType,
+ internalSchemaOpt:
util.Option[InternalSchema]) extends
+ Spark3ParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema,
partitionSchema, internalSchemaOpt) {
def buildVectorizedReader(convertTz: ZoneId,
datetimeRebaseMode: String,
diff --git
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32ParquetReader.scala
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32ParquetReader.scala
index e2189e5cbdf..6b723702244 100644
---
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32ParquetReader.scala
+++
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32ParquetReader.scala
@@ -19,6 +19,9 @@
package org.apache.spark.sql.execution.datasources.parquet
+import org.apache.hudi.common.util
+import org.apache.hudi.internal.schema.InternalSchema
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.FileSplit
@@ -74,16 +77,18 @@ class Spark32ParquetReader(enableVectorizedReader: Boolean,
* Read an individual parquet file
* Code from ParquetFileFormat#buildReaderWithPartitionValues from Spark
v3.2.4 adapted here
*
- * @param file parquet file to read
- * @param requiredSchema desired output schema of the data
- * @param partitionSchema schema of the partition columns. Partition values
will be appended to the end of every row
- * @param filters filters for data skipping. Not guaranteed to be
used; the spark plan will also apply the filters.
- * @param sharedConf the hadoop conf
+ * @param file parquet file to read
+ * @param requiredSchema desired output schema of the data
+ * @param partitionSchema schema of the partition columns. Partition
values will be appended to the end of every row
+ * @param internalSchemaOpt option of internal schema for schema.on.read
+ * @param filters filters for data skipping. Not guaranteed to be
used; the spark plan will also apply the filters.
+ * @param sharedConf the hadoop conf
* @return iterator of rows read from the file output type says
[[InternalRow]] but could be [[ColumnarBatch]]
*/
protected def doRead(file: PartitionedFile,
requiredSchema: StructType,
partitionSchema: StructType,
+ internalSchemaOpt:
org.apache.hudi.common.util.Option[InternalSchema],
filters: Seq[Filter],
sharedConf: Configuration): Iterator[InternalRow] = {
assert(file.partitionValues.numFields == partitionSchema.size)
@@ -91,7 +96,8 @@ class Spark32ParquetReader(enableVectorizedReader: Boolean,
val filePath = new Path(new URI(file.filePath))
val split = new FileSplit(filePath, file.start, file.length,
Array.empty[String])
- val schemaEvolutionUtils = new
Spark32PlusParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema,
partitionSchema)
+ val schemaEvolutionUtils = new
Spark32PlusParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema,
+ partitionSchema, internalSchemaOpt)
lazy val footerFileMetaData =
ParquetFooterReader.readFooter(sharedConf, filePath,
SKIP_ROW_GROUPS).getFileMetaData
diff --git
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusParquetSchemaEvolutionUtils.scala
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusParquetSchemaEvolutionUtils.scala
index 05523bcd9f0..946c4a99d7d 100644
---
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusParquetSchemaEvolutionUtils.scala
+++
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusParquetSchemaEvolutionUtils.scala
@@ -18,10 +18,11 @@
package org.apache.spark.sql.execution.datasources.parquet
+import org.apache.hudi.common.util
+import org.apache.hudi.internal.schema.InternalSchema
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.spark.TaskContext
-import org.apache.spark.sql.catalyst.util.RebaseDateTime
import
org.apache.spark.sql.execution.datasources.Spark3ParquetSchemaEvolutionUtils
import org.apache.spark.sql.types.StructType
@@ -30,8 +31,9 @@ import java.time.ZoneId
class Spark32PlusParquetSchemaEvolutionUtils(sharedConf: Configuration,
filePath: Path,
requiredSchema: StructType,
- partitionSchema: StructType)
extends
- Spark3ParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema,
partitionSchema) {
+ partitionSchema: StructType,
+ internalSchemaOpt:
util.Option[InternalSchema]) extends
+ Spark3ParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema,
partitionSchema, internalSchemaOpt) {
def buildVectorizedReader(convertTz: ZoneId,
datetimeRebaseMode: String,
diff --git
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala
index 8d882d7945d..8a7961fc978 100644
---
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala
+++
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala
@@ -19,6 +19,8 @@
package org.apache.spark.sql.execution.datasources.parquet
+import org.apache.hudi.common.util
+import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -74,16 +76,18 @@ class Spark33ParquetReader(enableVectorizedReader: Boolean,
* Read an individual parquet file
* Code from ParquetFileFormat#buildReaderWithPartitionValues from Spark
v3.3.4 adapted here
*
- * @param file parquet file to read
- * @param requiredSchema desired output schema of the data
- * @param partitionSchema schema of the partition columns. Partition values
will be appended to the end of every row
- * @param filters filters for data skipping. Not guaranteed to be
used; the spark plan will also apply the filters.
- * @param sharedConf the hadoop conf
+ * @param file parquet file to read
+ * @param requiredSchema desired output schema of the data
+ * @param partitionSchema schema of the partition columns. Partition
values will be appended to the end of every row
+ * @param internalSchemaOpt option of internal schema for schema.on.read
+ * @param filters filters for data skipping. Not guaranteed to be
used; the spark plan will also apply the filters.
+ * @param sharedConf the hadoop conf
* @return iterator of rows read from the file output type says
[[InternalRow]] but could be [[ColumnarBatch]]
*/
protected def doRead(file: PartitionedFile,
requiredSchema: StructType,
partitionSchema: StructType,
+ internalSchemaOpt:
org.apache.hudi.common.util.Option[InternalSchema],
filters: Seq[Filter],
sharedConf: Configuration): Iterator[InternalRow] = {
assert(file.partitionValues.numFields == partitionSchema.size)
@@ -91,7 +95,8 @@ class Spark33ParquetReader(enableVectorizedReader: Boolean,
val filePath = new Path(new URI(file.filePath))
val split = new FileSplit(filePath, file.start, file.length,
Array.empty[String])
- val schemaEvolutionUtils = new
Spark32PlusParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema,
partitionSchema)
+ val schemaEvolutionUtils = new
Spark32PlusParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema,
+ partitionSchema, internalSchemaOpt)
lazy val footerFileMetaData =
ParquetFooterReader.readFooter(sharedConf, filePath,
SKIP_ROW_GROUPS).getFileMetaData
diff --git
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34ParquetReader.scala
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34ParquetReader.scala
index 77e90b82849..dadcb61173a 100644
---
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34ParquetReader.scala
+++
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34ParquetReader.scala
@@ -19,6 +19,9 @@
package org.apache.spark.sql.execution.datasources.parquet
+import org.apache.hudi.common.util
+import org.apache.hudi.internal.schema.InternalSchema
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.FileSplit
import org.apache.hadoop.mapreduce._
@@ -70,16 +73,18 @@ class Spark34ParquetReader(enableVectorizedReader: Boolean,
* Read an individual parquet file
* Code from ParquetFileFormat#buildReaderWithPartitionValues from Spark
v3.4.2 adapted here
*
- * @param file parquet file to read
- * @param requiredSchema desired output schema of the data
- * @param partitionSchema schema of the partition columns. Partition values
will be appended to the end of every row
- * @param filters filters for data skipping. Not guaranteed to be
used; the spark plan will also apply the filters.
- * @param sharedConf the hadoop conf
+ * @param file parquet file to read
+ * @param requiredSchema desired output schema of the data
+ * @param partitionSchema schema of the partition columns. Partition
values will be appended to the end of every row
+ * @param internalSchemaOpt option of internal schema for schema.on.read
+ * @param filters filters for data skipping. Not guaranteed to be
used; the spark plan will also apply the filters.
+ * @param sharedConf the hadoop conf
* @return iterator of rows read from the file output type says
[[InternalRow]] but could be [[ColumnarBatch]]
*/
protected def doRead(file: PartitionedFile,
requiredSchema: StructType,
partitionSchema: StructType,
+ internalSchemaOpt:
org.apache.hudi.common.util.Option[InternalSchema],
filters: Seq[Filter],
sharedConf: Configuration): Iterator[InternalRow] = {
assert(file.partitionValues.numFields == partitionSchema.size)
@@ -87,7 +92,8 @@ class Spark34ParquetReader(enableVectorizedReader: Boolean,
val filePath = file.toPath
val split = new FileSplit(filePath, file.start, file.length,
Array.empty[String])
- val schemaEvolutionUtils = new
Spark32PlusParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema,
partitionSchema)
+ val schemaEvolutionUtils = new
Spark32PlusParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema,
+ partitionSchema, internalSchemaOpt)
lazy val footerFileMetaData =
ParquetFooterReader.readFooter(sharedConf, filePath,
SKIP_ROW_GROUPS).getFileMetaData
diff --git
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35ParquetReader.scala
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35ParquetReader.scala
index 39c2906c0e5..476ade4fb89 100644
---
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35ParquetReader.scala
+++
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35ParquetReader.scala
@@ -19,6 +19,9 @@
package org.apache.spark.sql.execution.datasources.parquet
+import org.apache.hudi.common.util
+import org.apache.hudi.internal.schema.InternalSchema
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.FileSplit
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
@@ -70,24 +73,27 @@ class Spark35ParquetReader(enableVectorizedReader: Boolean,
* Read an individual parquet file
* Code from ParquetFileFormat#buildReaderWithPartitionValues from Spark
v3.5.1 adapted here
*
- * @param file parquet file to read
- * @param requiredSchema desired output schema of the data
- * @param partitionSchema schema of the partition columns. Partition values
will be appended to the end of every row
- * @param filters filters for data skipping. Not guaranteed to be
used; the spark plan will also apply the filters.
- * @param sharedConf the hadoop conf
+ * @param file parquet file to read
+ * @param requiredSchema desired output schema of the data
+ * @param partitionSchema schema of the partition columns. Partition
values will be appended to the end of every row
+ * @param internalSchemaOpt option of internal schema for schema.on.read
+ * @param filters filters for data skipping. Not guaranteed to be
used; the spark plan will also apply the filters.
+ * @param sharedConf the hadoop conf
* @return iterator of rows read from the file output type says
[[InternalRow]] but could be [[ColumnarBatch]]
*/
- protected def doRead(file: PartitionedFile,
- requiredSchema: StructType,
- partitionSchema: StructType,
- filters: Seq[Filter],
- sharedConf: Configuration): Iterator[InternalRow] = {
+ override protected def doRead(file: PartitionedFile,
+ requiredSchema: StructType,
+ partitionSchema: StructType,
+ internalSchemaOpt:
org.apache.hudi.common.util.Option[InternalSchema],
+ filters: scala.Seq[Filter],
+ sharedConf: Configuration):
Iterator[InternalRow] = {
assert(file.partitionValues.numFields == partitionSchema.size)
val filePath = file.toPath
val split = new FileSplit(filePath, file.start, file.length,
Array.empty[String])
- val schemaEvolutionUtils = new
Spark32PlusParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema,
partitionSchema)
+ val schemaEvolutionUtils = new
Spark32PlusParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema,
+ partitionSchema, internalSchemaOpt)
val fileFooter = if (enableVectorizedReader) {
// When there are vectorized reads, we can avoid reading the footer
twice by reading