yihua commented on code in PR #14003:
URL: https://github.com/apache/hudi/pull/14003#discussion_r2389270821
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV2.scala:
##########
@@ -286,16 +293,57 @@ class IncrementalRelationV2(val sqlContext: SQLContext,
}
}
- private def fullTableScanDataFrame(commitsToFilter: List[HoodieInstant]):
DataFrame = {
+ private def fullTableScanDataFrame(commitsToFilter: List[HoodieInstant],
+ broadcastTimeMap:
org.apache.spark.broadcast.Broadcast[Map[String, String]]): DataFrame = {
val commitTimesToFilter = commitsToFilter.map(_.requestedTime)
val hudiDF = sqlContext.read
.format("hudi_v1")
- .schema(usedSchema)
+ .schema(schema)
.load(basePath.toString)
.filter(col(HoodieRecord.COMMIT_TIME_METADATA_FIELD).isin(commitTimesToFilter:
_*))
- // schema enforcement does not happen in above spark.read with hudi. hence
selecting explicitly w/ right column order
- val fieldNames = usedSchema.fieldNames
- hudiDF.select(fieldNames.head, fieldNames.tail: _*)
+ val fieldNames = schema.fieldNames
+ val selectedDf = hudiDF.select(fieldNames.head, fieldNames.tail: _*)
+ val transformedRDD = addCompletionTimeColumn(selectedDf.rdd,
broadcastTimeMap)
+ sqlContext.createDataFrame(transformedRDD, schema)
Review Comment:
To add the completion time column support in different query types, it would
be good to push such logic of adding a `_hoodie_commit_completion_time` column
in the result `df` or records to the common file (group) reading layer:
1. When `_hoodie_commit_completion_time` appears in the requested columns
(e.g., `select _hoodie_commit_completion_time`), a filter, or a predicate,
automatically add `_hoodie_commit_time` and `_hoodie_commit_completion_time`
for filtering. `_hoodie_commit_completion_time` is transformed from
`_hoodie_commit_time` using completion time query view. This logic should live
in `FileGroupReaderSchemaHandler` which handles the schema for the file group
reader, or the place after base reader returns the records (which does not
leverage file group reader)
2. The file group reader and the base file reader should add the
`_hoodie_commit_completion_time` column before returning the records
3. For different relations, we need to check how the file group reader and the
base file reader are used, so that the correct filter and requested schema are
passed to them for the correct behavior.
4.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV2.scala:
##########
@@ -247,27 +251,30 @@ class IncrementalRelationV2(val sqlContext: SQLContext,
}
if (doFullTableScan) {
- fullTableScanDataFrame(commitsToReturn)
+ fullTableScanDataFrame(commitsToReturn, broadcastTimeMap)
} else {
if (metaBootstrapFileIdToFullPath.nonEmpty) {
- df = sqlContext.sparkSession.read
+ val bootstrapDF = sqlContext.sparkSession.read
.format("hudi_v1")
- .schema(usedSchema)
+ .schema(schema)
.option(DataSourceReadOptions.READ_PATHS.key,
filteredMetaBootstrapFullPaths.mkString(","))
// Setting time to the END_INSTANT_TIME, to avoid pathFilter
filter out files incorrectly.
.option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key(),
endInstantTime)
.load()
+ df = transformDataFrameWithCompletionTime(bootstrapDF,
broadcastTimeMap)
}
if (regularFileIdToFullPath.nonEmpty) {
try {
val commitTimesToReturn = commitsToReturn.map(_.requestedTime)
- df = df.union(sqlContext.read.options(sOpts)
- .schema(usedSchema).format(formatClassName)
+ val baseDf = sqlContext.read.options(sOpts)
+ .schema(schema).format(formatClassName)
// Setting time to the END_INSTANT_TIME, to avoid pathFilter
filter out files incorrectly.
.option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key(), endInstantTime)
.load(filteredRegularFullPaths.toList: _*)
-
.filter(col(HoodieRecord.COMMIT_TIME_METADATA_FIELD).isin(commitTimesToReturn:
_*)))
+
.filter(col(HoodieRecord.COMMIT_TIME_METADATA_FIELD).isin(commitTimesToReturn:
_*))
+ val df_with_completion_time =
transformDataFrameWithCompletionTime(baseDf, broadcastTimeMap)
Review Comment:
Would it be better to add the completion time column on L287 instead, by
applying the transformation to the final `df`?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV2.scala:
##########
@@ -286,16 +293,57 @@ class IncrementalRelationV2(val sqlContext: SQLContext,
}
}
- private def fullTableScanDataFrame(commitsToFilter: List[HoodieInstant]):
DataFrame = {
+ private def fullTableScanDataFrame(commitsToFilter: List[HoodieInstant],
+ broadcastTimeMap:
org.apache.spark.broadcast.Broadcast[Map[String, String]]): DataFrame = {
val commitTimesToFilter = commitsToFilter.map(_.requestedTime)
val hudiDF = sqlContext.read
.format("hudi_v1")
- .schema(usedSchema)
+ .schema(schema)
.load(basePath.toString)
.filter(col(HoodieRecord.COMMIT_TIME_METADATA_FIELD).isin(commitTimesToFilter:
_*))
- // schema enforcement does not happen in above spark.read with hudi. hence
selecting explicitly w/ right column order
- val fieldNames = usedSchema.fieldNames
- hudiDF.select(fieldNames.head, fieldNames.tail: _*)
+ val fieldNames = schema.fieldNames
+ val selectedDf = hudiDF.select(fieldNames.head, fieldNames.tail: _*)
+ val transformedRDD = addCompletionTimeColumn(selectedDf.rdd,
broadcastTimeMap)
+ sqlContext.createDataFrame(transformedRDD, schema)
+ }
+
+ private def buildCompletionTimeMapping(): Map[String, String] = {
+ commitsToReturn.map { instant =>
+ val requestedTime = instant.requestedTime()
+ val completionTime =
Option(instant.getCompletionTime).getOrElse(requestedTime)
Review Comment:
Return `null` if not present to avoid confusion.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV2.scala:
##########
@@ -286,16 +293,57 @@ class IncrementalRelationV2(val sqlContext: SQLContext,
}
}
- private def fullTableScanDataFrame(commitsToFilter: List[HoodieInstant]):
DataFrame = {
+ private def fullTableScanDataFrame(commitsToFilter: List[HoodieInstant],
+ broadcastTimeMap:
org.apache.spark.broadcast.Broadcast[Map[String, String]]): DataFrame = {
val commitTimesToFilter = commitsToFilter.map(_.requestedTime)
val hudiDF = sqlContext.read
.format("hudi_v1")
- .schema(usedSchema)
+ .schema(schema)
.load(basePath.toString)
.filter(col(HoodieRecord.COMMIT_TIME_METADATA_FIELD).isin(commitTimesToFilter:
_*))
- // schema enforcement does not happen in above spark.read with hudi. hence
selecting explicitly w/ right column order
- val fieldNames = usedSchema.fieldNames
- hudiDF.select(fieldNames.head, fieldNames.tail: _*)
+ val fieldNames = schema.fieldNames
+ val selectedDf = hudiDF.select(fieldNames.head, fieldNames.tail: _*)
+ val transformedRDD = addCompletionTimeColumn(selectedDf.rdd,
broadcastTimeMap)
+ sqlContext.createDataFrame(transformedRDD, schema)
Review Comment:
To add the completion time column support in different query types, it would
be good to push such logic of adding a `_hoodie_commit_completion_time` column
in the result `df` or records to the common file (group) reading layer:
1. When `_hoodie_commit_completion_time` appears in the requested columns
(e.g., `select _hoodie_commit_completion_time`), a filter, or a predicate,
automatically add `_hoodie_commit_time` and `_hoodie_commit_completion_time`
for filtering. `_hoodie_commit_completion_time` is transformed from
`_hoodie_commit_time` using completion time query view. This logic should live
in `FileGroupReaderSchemaHandler` which handles the schema for the file group
reader, or the place after base reader returns the records (which does not
leverage file group reader)
2. The file group reader and the base file reader should add the
`_hoodie_commit_completion_time` column before returning the records
3. For different relations, we need to check how the file group reader and the
base file reader are used, so that the correct filter and requested schema are
passed to them for the correct behavior.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]