prashantwason commented on a change in pull request #3985:
URL: https://github.com/apache/hudi/pull/3985#discussion_r803313357
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
##########
@@ -225,6 +236,9 @@ class IncrementalRelation(val sqlContext: SQLContext,
}
}
+ if
(!requiredColumns.contains(HoodieRecord.COMMIT_TIME_METADATA_FIELD)) {
Review comment:
Add a code comment here stating that the COMMIT_TIME_METADATA_FIELD is
removed if it was not requested.
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
##########
@@ -157,27 +161,40 @@ class IncrementalRelation(val sqlContext: SQLContext,
} else {
log.info("Additional Filters to be applied to incremental source are :"
+ filters)
- var df: DataFrame =
sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
+ var prunedSchema = StructType(Seq())
+ if (!requiredColumns.contains(HoodieRecord.COMMIT_TIME_METADATA_FIELD)) {
+ prunedSchema =
prunedSchema.add(usedSchema(HoodieRecord.COMMIT_TIME_METADATA_FIELD))
+ }
+ requiredColumns.foreach(col => {
+ val field = usedSchema.find(_.name == col)
+ if (field.isDefined) {
+ prunedSchema = prunedSchema.add(field.get)
+ }
+ })
+ var df: DataFrame =
sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], prunedSchema)
if (metaBootstrapFileIdToFullPath.nonEmpty) {
df = sqlContext.sparkSession.read
.format("hudi")
- .schema(usedSchema)
+ .schema(prunedSchema)
.option(DataSourceReadOptions.READ_PATHS.key,
filteredMetaBootstrapFullPaths.mkString(","))
.load()
}
if (regularFileIdToFullPath.nonEmpty)
{
df = df.union(sqlContext.read.options(sOpts)
- .schema(usedSchema)
+ .schema(prunedSchema)
.parquet(filteredRegularFullPaths.toList: _*)
.filter(String.format("%s >= '%s'",
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
commitsToReturn.head.getTimestamp))
.filter(String.format("%s <= '%s'",
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
commitsToReturn.last.getTimestamp)))
}
+ if (!requiredColumns.contains(HoodieRecord.COMMIT_TIME_METADATA_FIELD)) {
Review comment:
After this change, the COMMIT_TIME_METADATA_FIELD will not be returned
in the results, right? I wonder whether anyone is relying on this column
and whether their usage will break. Why is removing this column required?
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
##########
@@ -168,7 +169,17 @@ class IncrementalRelation(val sqlContext: SQLContext,
} else {
log.info("Additional Filters to be applied to incremental source are
:" + filters.mkString("Array(", ", ", ")"))
- var df: DataFrame =
sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
+ var prunedSchema = StructType(Seq())
+ if
(!requiredColumns.contains(HoodieRecord.COMMIT_TIME_METADATA_FIELD)) {
Review comment:
Add a code comment here to explain why COMMIT_TIME_METADATA_FIELD is
required in prunedSchema even if it is not requested by the user and not
present in requiredColumns (because it is required to match the
incremental fetch).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]