This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0d498d7b7e2 [HUDI-7047] Fix issues in new file format for incremental
queries (#10009)
0d498d7b7e2 is described below
commit 0d498d7b7e2bbc2dfcad97b22a853936eb5038c3
Author: Jon Vexler <[email protected]>
AuthorDate: Wed Nov 8 05:48:46 2023 -0500
[HUDI-7047] Fix issues in new file format for incremental queries (#10009)
- In HoodieIncrementalFileIndex, Seq.empty won't actually be returned
unless the rest of the method is placed in an else block.
- Disable the vectorized reader for incremental queries.
- testPrunePartitionForTimestampBasedKeyGenerator:
if the date is already in the output format, we don't need to try
to convert it.
Also add partition pruning to the query.
- TestIncrementalReadWithFullTableScan: there were schema issues
in the implementation that were fixed by correctly setting mandatory fields
and projecting to the output schema.
---
.../scala/org/apache/hudi/HoodieCDCFileIndex.scala | 5 +-
.../scala/org/apache/hudi/HoodieFileIndex.scala | 18 ++++-
.../hudi/HoodieHadoopFsRelationFactory.scala | 16 +++-
.../apache/hudi/HoodieIncrementalFileIndex.scala | 87 +++++++++++-----------
.../hudi/MergeOnReadIncrementalRelation.scala | 14 +++-
.../apache/hudi/SparkHoodieTableFileIndex.scala | 2 +-
.../datasources/HoodieMultipleBaseFileFormat.scala | 2 +-
...odieFileGroupReaderBasedParquetFileFormat.scala | 2 +-
.../parquet/NewHoodieParquetFileFormat.scala | 21 ++++--
.../apache/hudi/functional/TestMORDataSource.scala | 4 +-
10 files changed, 102 insertions(+), 69 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
index cb46b19b88b..0fe3dcd2b51 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
@@ -37,9 +37,10 @@ class HoodieCDCFileIndex (override val spark: SparkSession,
override val schemaSpec: Option[StructType],
override val options: Map[String, String],
@transient override val fileStatusCache:
FileStatusCache = NoopCache,
- override val includeLogFiles: Boolean)
+ override val includeLogFiles: Boolean,
+ override val shouldEmbedFileSlices: Boolean)
extends HoodieIncrementalFileIndex(
- spark, metaClient, schemaSpec, options, fileStatusCache, includeLogFiles
+ spark, metaClient, schemaSpec, options, fileStatusCache, includeLogFiles,
shouldEmbedFileSlices
) with FileIndex {
val cdcRelation: CDCRelation = CDCRelation.getCDCRelation(spark.sqlContext,
metaClient, options)
val cdcExtractor: HoodieCDCExtractor = cdcRelation.cdcExtractor
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index aaefd715fa3..2767b6982cd 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -157,13 +157,13 @@ case class HoodieFileIndex(spark: SparkSession,
val baseFileStatusesAndLogFileOnly: Seq[FileStatus] =
fileSlices.map(slice => {
if (slice.getBaseFile.isPresent) {
slice.getBaseFile.get().getFileStatus
- } else if (slice.getLogFiles.findAny().isPresent) {
+ } else if (includeLogFiles &&
slice.getLogFiles.findAny().isPresent) {
slice.getLogFiles.findAny().get().getFileStatus
} else {
null
}
}).filter(slice => slice != null)
- val c = fileSlices.filter(f => f.getLogFiles.findAny().isPresent
+ val c = fileSlices.filter(f => (includeLogFiles &&
f.getLogFiles.findAny().isPresent)
|| (f.getBaseFile.isPresent &&
f.getBaseFile.get().getBootstrapBaseFile.isPresent)).
foldLeft(Map[String, FileSlice]()) { (m, f) => m + (f.getFileId ->
f) }
if (c.nonEmpty) {
@@ -497,8 +497,18 @@ object HoodieFileIndex extends Logging {
partitionFilters.toArray.map {
_.transformDown {
case Literal(value, dataType) if
dataType.isInstanceOf[StringType] =>
- val converted =
outDateFormat.format(inDateFormat.parse(value.toString))
- Literal(UTF8String.fromString(converted), StringType)
+ try {
+ val converted =
outDateFormat.format(inDateFormat.parse(value.toString))
+ Literal(UTF8String.fromString(converted), StringType)
+ } catch {
+ case _: java.text.ParseException =>
+ try {
+ outDateFormat.parse(value.toString)
+ } catch {
+ case e: Exception => throw new
HoodieException("Partition filter for TimestampKeyGenerator cannot be converted
to format " + outDateFormat.toString, e)
+ }
+ Literal(UTF8String.fromString(value.toString), StringType)
+ }
}
}
} catch {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
index 7a3ea7483fd..50249d87d97 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
@@ -281,8 +281,10 @@ class
HoodieMergeOnReadIncrementalHadoopFsRelationFactory(override val sqlContex
isBootstrap: Boolean)
extends HoodieMergeOnReadSnapshotHadoopFsRelationFactory(sqlContext,
metaClient, options, schemaSpec, isBootstrap) {
+ override val mandatoryFields: Seq[String] =
Seq(HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++ mandatoryFieldsForMerging
+
override val fileIndex = new HoodieIncrementalFileIndex(
- sparkSession, metaClient, schemaSpec, options,
FileStatusCache.getOrCreate(sparkSession), true)
+ sparkSession, metaClient, schemaSpec, options,
FileStatusCache.getOrCreate(sparkSession), true, true)
override val fileGroupReaderBasedFileFormat = new
HoodieFileGroupReaderBasedParquetFileFormat(
tableState, HoodieTableSchema(tableStructSchema, tableAvroSchema.toString,
internalSchemaOpt),
@@ -306,6 +308,9 @@ class
HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(override val sqlContext:
override val
schemaSpec: Option[StructType],
isBootstrap: Boolean)
extends HoodieMergeOnReadSnapshotHadoopFsRelationFactory(sqlContext,
metaClient, options, schemaSpec, isBootstrap) {
+
+ override val mandatoryFields: Seq[String] = Seq.empty
+
override val fileIndex: HoodieFileIndex = HoodieFileIndex(
sparkSession,
metaClient,
@@ -335,8 +340,11 @@ class
HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(override val sqlContex
isBootstrap: Boolean)
extends HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(sqlContext,
metaClient, options, schemaSpec, isBootstrap) {
+ override val mandatoryFields: Seq[String] =
Seq(HoodieRecord.RECORD_KEY_METADATA_FIELD,
HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++
+ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
+
override val fileIndex = new HoodieIncrementalFileIndex(
- sparkSession, metaClient, schemaSpec, options,
FileStatusCache.getOrCreate(sparkSession), false)
+ sparkSession, metaClient, schemaSpec, options,
FileStatusCache.getOrCreate(sparkSession), false, isBootstrap)
override val fileGroupReaderBasedFileFormat = new
HoodieFileGroupReaderBasedParquetFileFormat(
tableState, HoodieTableSchema(tableStructSchema, tableAvroSchema.toString,
internalSchemaOpt),
@@ -361,7 +369,7 @@ class HoodieMergeOnReadCDCHadoopFsRelationFactory(override
val sqlContext: SQLCo
isBootstrap: Boolean)
extends HoodieMergeOnReadIncrementalHadoopFsRelationFactory(sqlContext,
metaClient, options, schemaSpec, isBootstrap) {
override val fileIndex = new HoodieCDCFileIndex(
- sparkSession, metaClient, schemaSpec, options,
FileStatusCache.getOrCreate(sparkSession), true)
+ sparkSession, metaClient, schemaSpec, options,
FileStatusCache.getOrCreate(sparkSession), true, true)
}
class HoodieCopyOnWriteCDCHadoopFsRelationFactory(override val sqlContext:
SQLContext,
@@ -371,7 +379,7 @@ class HoodieCopyOnWriteCDCHadoopFsRelationFactory(override
val sqlContext: SQLCo
isBootstrap: Boolean)
extends HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(sqlContext,
metaClient, options, schemaSpec, isBootstrap) {
override val fileIndex = new HoodieCDCFileIndex(
- sparkSession, metaClient, schemaSpec, options,
FileStatusCache.getOrCreate(sparkSession), false)
+ sparkSession, metaClient, schemaSpec, options,
FileStatusCache.getOrCreate(sparkSession), false, isBootstrap)
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala
index b977c51ab67..982808fdc42 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala
@@ -36,9 +36,10 @@ class HoodieIncrementalFileIndex(override val spark:
SparkSession,
override val schemaSpec: Option[StructType],
override val options: Map[String, String],
@transient override val fileStatusCache:
FileStatusCache = NoopCache,
- override val includeLogFiles: Boolean)
+ override val includeLogFiles: Boolean,
+ override val shouldEmbedFileSlices: Boolean)
extends HoodieFileIndex(
- spark, metaClient, schemaSpec, options, fileStatusCache, includeLogFiles
+ spark, metaClient, schemaSpec, options, fileStatusCache, includeLogFiles,
shouldEmbedFileSlices
) with FileIndex {
val mergeOnReadIncrementalRelation: MergeOnReadIncrementalRelation =
MergeOnReadIncrementalRelation(
spark.sqlContext, options, metaClient, schemaSpec, schemaSpec)
@@ -47,52 +48,52 @@ class HoodieIncrementalFileIndex(override val spark:
SparkSession,
val fileSlices =
mergeOnReadIncrementalRelation.listFileSplits(partitionFilters, dataFilters)
if (fileSlices.isEmpty) {
Seq.empty
- }
-
- val prunedPartitionsAndFilteredFileSlices = fileSlices.map {
- case (partitionValues, fileSlices) =>
- if (shouldEmbedFileSlices) {
- val baseFileStatusesAndLogFileOnly: Seq[FileStatus] =
fileSlices.map(slice => {
- if (slice.getBaseFile.isPresent) {
- slice.getBaseFile.get().getFileStatus
- } else if (slice.getLogFiles.findAny().isPresent) {
- slice.getLogFiles.findAny().get().getFileStatus
+ } else {
+ val prunedPartitionsAndFilteredFileSlices = fileSlices.map {
+ case (partitionValues, fileSlices) =>
+ if (shouldEmbedFileSlices) {
+ val baseFileStatusesAndLogFileOnly: Seq[FileStatus] =
fileSlices.map(slice => {
+ if (slice.getBaseFile.isPresent) {
+ slice.getBaseFile.get().getFileStatus
+ } else if (slice.getLogFiles.findAny().isPresent) {
+ slice.getLogFiles.findAny().get().getFileStatus
+ } else {
+ null
+ }
+ }).filter(slice => slice != null)
+ val c = fileSlices.filter(f => f.getLogFiles.findAny().isPresent
+ || (f.getBaseFile.isPresent &&
f.getBaseFile.get().getBootstrapBaseFile.isPresent)).
+ foldLeft(Map[String, FileSlice]()) { (m, f) => m + (f.getFileId
-> f) }
+ if (c.nonEmpty) {
+ PartitionDirectory(new
HoodiePartitionFileSliceMapping(partitionValues, c),
baseFileStatusesAndLogFileOnly)
} else {
- null
+ PartitionDirectory(partitionValues,
baseFileStatusesAndLogFileOnly)
}
- }).filter(slice => slice != null)
- val c = fileSlices.filter(f => f.getLogFiles.findAny().isPresent
- || (f.getBaseFile.isPresent &&
f.getBaseFile.get().getBootstrapBaseFile.isPresent)).
- foldLeft(Map[String, FileSlice]()) { (m, f) => m + (f.getFileId ->
f) }
- if (c.nonEmpty) {
- PartitionDirectory(new
HoodiePartitionFileSliceMapping(partitionValues, c),
baseFileStatusesAndLogFileOnly)
} else {
- PartitionDirectory(partitionValues, baseFileStatusesAndLogFileOnly)
+ val allCandidateFiles: Seq[FileStatus] = fileSlices.flatMap(fs => {
+ val baseFileStatusOpt =
getBaseFileStatus(Option.apply(fs.getBaseFile.orElse(null)))
+ val logFilesStatus = if (includeLogFiles) {
+
fs.getLogFiles.map[FileStatus](JFunction.toJavaFunction[HoodieLogFile,
FileStatus](lf => lf.getFileStatus))
+ } else {
+ java.util.stream.Stream.empty()
+ }
+ val files =
logFilesStatus.collect(Collectors.toList[FileStatus]).asScala
+ baseFileStatusOpt.foreach(f => files.append(f))
+ files
+ })
+ PartitionDirectory(partitionValues, allCandidateFiles)
}
- } else {
- val allCandidateFiles: Seq[FileStatus] = fileSlices.flatMap(fs => {
- val baseFileStatusOpt =
getBaseFileStatus(Option.apply(fs.getBaseFile.orElse(null)))
- val logFilesStatus = if (includeLogFiles) {
-
fs.getLogFiles.map[FileStatus](JFunction.toJavaFunction[HoodieLogFile,
FileStatus](lf => lf.getFileStatus))
- } else {
- java.util.stream.Stream.empty()
- }
- val files =
logFilesStatus.collect(Collectors.toList[FileStatus]).asScala
- baseFileStatusOpt.foreach(f => files.append(f))
- files
- })
- PartitionDirectory(partitionValues, allCandidateFiles)
- }
- }.toSeq
+ }.toSeq
- hasPushedDownPartitionPredicates = true
- if (shouldReadAsPartitionedTable()) {
- prunedPartitionsAndFilteredFileSlices
- } else if (shouldEmbedFileSlices) {
- assert(partitionSchema.isEmpty)
- prunedPartitionsAndFilteredFileSlices
- } else {
- Seq(PartitionDirectory(InternalRow.empty,
prunedPartitionsAndFilteredFileSlices.flatMap(_.files)))
+ hasPushedDownPartitionPredicates = true
+ if (shouldReadAsPartitionedTable()) {
+ prunedPartitionsAndFilteredFileSlices
+ } else if (shouldEmbedFileSlices) {
+ assert(partitionSchema.isEmpty)
+ prunedPartitionsAndFilteredFileSlices
+ } else {
+ Seq(PartitionDirectory(InternalRow.empty,
prunedPartitionsAndFilteredFileSlices.flatMap(_.files)))
+ }
}
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 44d937f22ad..e37a1737d27 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -131,9 +131,11 @@ case class MergeOnReadIncrementalRelation(override val
sqlContext: SQLContext,
val fsView = new HoodieTableFileSystemView(metaClient, timeline,
affectedFilesInCommits)
val modifiedPartitions = getWritePartitionPaths(commitsMetadata)
- modifiedPartitions.asScala.flatMap { relativePartitionPath =>
- fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath,
latestCommit).iterator().asScala
- }.toSeq
+
fileIndex.listMatchingPartitionPaths(HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient,
partitionFilters))
+ .map(p => p.path).filter(p => modifiedPartitions.contains(p))
+ .flatMap { relativePartitionPath =>
+ fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath,
latestCommit).iterator().asScala
+ }
}
filterFileSlices(fileSlices, globPattern)
}
@@ -148,7 +150,11 @@ case class MergeOnReadIncrementalRelation(override val
sqlContext: SQLContext,
}
def getRequiredFilters: Seq[Filter] = {
- incrementalSpanRecordFilters
+ if (includedCommits.isEmpty) {
+ Seq.empty
+ } else {
+ incrementalSpanRecordFilters
+ }
}
private def filterFileSlices(fileSlices: Seq[FileSlice], pathGlobPattern:
String): Seq[FileSlice] = {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index c9a69a5210e..828c8df41de 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -201,7 +201,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
* @param predicates The filter condition.
* @return The pruned partition paths.
*/
- protected def listMatchingPartitionPaths(predicates: Seq[Expression]):
Seq[PartitionPath] = {
+ def listMatchingPartitionPaths(predicates: Seq[Expression]):
Seq[PartitionPath] = {
val resolve = spark.sessionState.analyzer.resolver
val partitionColumnNames = getPartitionColumns
val partitionPruningPredicates = predicates.filter {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala
index 0e657600e0d..a8a3fa94c13 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala
@@ -86,7 +86,7 @@ class HoodieMultipleBaseFileFormat(tableState:
Broadcast[HoodieTableState],
if (!supportBatchCalled) {
supportBatchCalled = true
supportBatchResult =
- !isMOR && parquetFormat.supportBatch(sparkSession, schema) &&
orcFormat.supportBatch(sparkSession, schema)
+ !isMOR && !isIncremental && parquetFormat.supportBatch(sparkSession,
schema) && orcFormat.supportBatch(sparkSession, schema)
}
supportBatchResult
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 3fb80509b61..495569b2ce8 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -76,7 +76,7 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState:
HoodieTableState,
override def supportBatch(sparkSession: SparkSession, schema: StructType):
Boolean = {
if (!supportBatchCalled) {
supportBatchCalled = true
- supportBatchResult = !isMOR && super.supportBatch(sparkSession, schema)
+ supportBatchResult = !isMOR && !isIncremental &&
super.supportBatch(sparkSession, schema)
}
supportBatchResult
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
index af3cdf715e8..4fe921bc440 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
@@ -74,7 +74,7 @@ class NewHoodieParquetFileFormat(tableState:
Broadcast[HoodieTableState],
override def supportBatch(sparkSession: SparkSession, schema: StructType):
Boolean = {
if (!supportBatchCalled) {
supportBatchCalled = true
- supportBatchResult = !isMOR && super.supportBatch(sparkSession, schema)
+ supportBatchResult = !isIncremental && !isMOR &&
super.supportBatch(sparkSession, schema)
}
supportBatchResult
}
@@ -89,9 +89,7 @@ class NewHoodieParquetFileFormat(tableState:
Broadcast[HoodieTableState],
val outputSchema = StructType(requiredSchema.fields ++
partitionSchema.fields)
- val requiredSchemaWithMandatory = if (isIncremental) {
- StructType(dataSchema.toArray ++ partitionSchema.fields)
- } else if (!isMOR ||
MergeOnReadSnapshotRelation.isProjectionCompatible(tableState.value)) {
+ val requiredSchemaWithMandatory = if (!isMOR ||
MergeOnReadSnapshotRelation.isProjectionCompatible(tableState.value)) {
//add mandatory fields to required schema
val added: mutable.Buffer[StructField] = mutable.Buffer[StructField]()
for (field <- mandatoryFields) {
@@ -142,7 +140,7 @@ class NewHoodieParquetFileFormat(tableState:
Broadcast[HoodieTableState],
val hoodieBaseFile = fileSlice.getBaseFile.get()
val bootstrapFileOpt = hoodieBaseFile.getBootstrapBaseFile
val partitionValues = fileSliceMapping.getPartitionValues
- val logFiles = getLogFilesFromSlice(fileSlice)
+ val logFiles = if (isMOR) getLogFilesFromSlice(fileSlice) else
List.empty
if (requiredSchemaWithMandatory.isEmpty) {
val baseFile = createPartitionedFile(partitionValues,
hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen)
baseFileReader(baseFile)
@@ -169,9 +167,13 @@ class NewHoodieParquetFileFormat(tableState:
Broadcast[HoodieTableState],
//baseFileReader(baseFile)
}
}
+ case _ if isIncremental =>
+ projectSchema(baseFileReader(file),
StructType(requiredSchemaWithMandatory.fields ++ partitionSchema.fields),
outputSchema)
case _ => baseFileReader(file)
}
}
+ case _ if isIncremental =>
+ projectSchema(baseFileReader(file),
StructType(requiredSchemaWithMandatory.fields ++ partitionSchema.fields),
outputSchema)
case _ => baseFileReader(file)
}
}
@@ -190,8 +192,13 @@ class NewHoodieParquetFileFormat(tableState:
Broadcast[HoodieTableState],
PartitionedFile => Iterator[InternalRow]) = {
//file reader when you just read a hudi parquet file and don't do any
merging
- val baseFileReader = super.buildReaderWithPartitionValues(sparkSession,
dataSchema, partitionSchema, requiredSchema,
- filters ++ requiredFilters, options, new Configuration(hadoopConf))
+ val baseFileReader = if (isIncremental) {
+ super.buildReaderWithPartitionValues(sparkSession, dataSchema,
partitionSchema, requiredSchemaWithMandatory,
+ filters ++ requiredFilters, options, new Configuration(hadoopConf))
+ } else {
+ super.buildReaderWithPartitionValues(sparkSession, dataSchema,
partitionSchema, requiredSchema,
+ filters ++ requiredFilters, options, new Configuration(hadoopConf))
+ }
//file reader for reading a hudi base file that needs to be merged with
log files
val preMergeBaseFileReader = if (isMOR) {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 4625c9c7bb6..1cfd0e7fbba 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -1207,8 +1207,8 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commit2Time)
.option(DataSourceReadOptions.END_INSTANTTIME.key, commit3Time)
.load(basePath)
- assertEquals(incrementalQueryRes.where("partition = '2022-01-01'").count,
0)
- assertEquals(incrementalQueryRes.where("partition = '2022-01-02'").count,
20)
+ assertEquals(0, incrementalQueryRes.where("partition =
'2022-01-01'").count)
+ assertEquals(20, incrementalQueryRes.where("partition =
'2022-01-02'").count)
}
/**