yihua commented on code in PR #9276:
URL: https://github.com/apache/hudi/pull/9276#discussion_r1283625541
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala:
##########
@@ -247,11 +247,23 @@ object DefaultSource {
Option(schema)
}
+
+
+
Review Comment:
nit: remove empty lines
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala:
##########
@@ -262,16 +274,28 @@ object DefaultSource {
new IncrementalRelation(sqlContext, parameters, userSchema,
metaClient)
case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
- new MergeOnReadSnapshotRelation(sqlContext, parameters, metaClient,
globPaths, userSchema)
+ if (newHudiFileFormatUtils.isEmpty ||
newHudiFileFormatUtils.get.hasSchemaOnRead) {
Review Comment:
Instead of checking `newHudiFileFormatUtils.get.hasSchemaOnRead`, should the
`hoodie.schema.on.read.enable` config be checked when lazily creating
`newHudiFileFormatUtils`, so that only `newHudiFileFormatUtils.isEmpty` needs
to be checked here, for simplicity?
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala:
##########
@@ -77,6 +78,9 @@ trait HoodieCatalystPlansUtils {
*/
def unapplyMergeIntoTable(plan: LogicalPlan): Option[(LogicalPlan,
LogicalPlan, Expression)]
+
+ def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): LogicalPlan
Review Comment:
nit: add docs to explain what this projection does (e.g., resolving the
schema based on the resolver) and why it is needed for the new file format.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala:
##########
@@ -262,16 +274,28 @@ object DefaultSource {
new IncrementalRelation(sqlContext, parameters, userSchema,
metaClient)
case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
- new MergeOnReadSnapshotRelation(sqlContext, parameters, metaClient,
globPaths, userSchema)
+ if (newHudiFileFormatUtils.isEmpty ||
newHudiFileFormatUtils.get.hasSchemaOnRead) {
+ new MergeOnReadSnapshotRelation(sqlContext, parameters,
metaClient, globPaths, userSchema)
+ } else {
+ newHudiFileFormatUtils.get.getHadoopFsRelation(isMOR = true,
isBootstrap = false)
+ }
case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
new MergeOnReadIncrementalRelation(sqlContext, parameters,
metaClient, userSchema)
case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, true) =>
- new HoodieBootstrapMORRelation(sqlContext, userSchema, globPaths,
metaClient, parameters)
+ if (newHudiFileFormatUtils.isEmpty ||
newHudiFileFormatUtils.get.hasSchemaOnRead) {
Review Comment:
The same comment applies here and below.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -87,6 +87,14 @@ object DataSourceReadOptions {
s"payload implementation to merge (${REALTIME_PAYLOAD_COMBINE_OPT_VAL})
or skip merging altogether" +
s"${REALTIME_SKIP_MERGE_OPT_VAL}")
+ val LEGACY_HUDI_PARQUET_FILE_FORMAT: ConfigProperty[String] = ConfigProperty
Review Comment:
```suggestion
val USE_LEGACY_HUDI_PARQUET_FILE_FORMAT: ConfigProperty[String] =
ConfigProperty
```
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala:
##########
@@ -247,11 +247,23 @@ object DefaultSource {
Option(schema)
}
+
+
+
if (metaClient.getCommitsTimeline.filterCompletedInstants.countInstants()
== 0) {
new EmptyRelation(sqlContext, resolveSchema(metaClient, parameters,
Some(schema)))
} else if (isCdcQuery) {
CDCRelation.getCDCRelation(sqlContext, metaClient, parameters)
} else {
+ lazy val newHudiFileFormatUtils = if
(!parameters.getOrElse(LEGACY_HUDI_PARQUET_FILE_FORMAT.key,
+ LEGACY_HUDI_PARQUET_FILE_FORMAT.defaultValue).toBoolean && (globPaths
== null || globPaths.isEmpty)
+ && parameters.getOrElse(REALTIME_MERGE.key(),
REALTIME_MERGE.defaultValue())
+ .equalsIgnoreCase(REALTIME_PAYLOAD_COMBINE_OPT_VAL)) {
Review Comment:
Is there any issue with the `REALTIME_SKIP_MERGE_OPT_VAL` merge type?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -144,22 +146,41 @@ case class HoodieFileIndex(spark: SparkSession,
// Prune the partition path by the partition filters
// NOTE: Non-partitioned tables are assumed to consist from a single
partition
// encompassing the whole table
- val prunedPartitions = listMatchingPartitionPaths(partitionFilters)
+ val prunedPartitions = if (shouldBroadcast) {
+
listMatchingPartitionPaths(convertFilterForTimestampKeyGenerator(metaClient,
partitionFilters))
+ } else {
+ listMatchingPartitionPaths(partitionFilters)
+ }
val listedPartitions = getInputFileSlices(prunedPartitions:
_*).asScala.toSeq.map {
case (partition, fileSlices) =>
- val baseFileStatuses: Seq[FileStatus] = getBaseFileStatus(fileSlices
+ var baseFileStatuses: Seq[FileStatus] = getBaseFileStatus(fileSlices
.asScala
- .map(fs => fs.getBaseFile.orElse(null))
- .filter(_ != null))
-
+ .map(fs => fs.getBaseFile.orElse(null))
+ .filter(_ != null))
+ if (shouldBroadcast) {
+ baseFileStatuses = baseFileStatuses ++ fileSlices.asScala
Review Comment:
Rename this variable, since it also contains `FileStatus` instances from file
slices that have only log files?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapMORRelation.scala:
##########
@@ -58,10 +59,13 @@ case class HoodieBootstrapMORRelation(override val
sqlContext: SQLContext,
override val optParams: Map[String,
String],
private val prunedDataSchema:
Option[StructType] = None)
extends BaseHoodieBootstrapRelation(sqlContext, userSchema, globPaths,
metaClient,
- optParams, prunedDataSchema) {
+ optParams, prunedDataSchema) with SparkAdapterSupport {
override type Relation = HoodieBootstrapMORRelation
+ protected val mergeType: String =
optParams.getOrElse(DataSourceReadOptions.REALTIME_MERGE.key,
+ DataSourceReadOptions.REALTIME_MERGE.defaultValue)
+
Review Comment:
Are these changes still needed?
##########
hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister:
##########
@@ -17,4 +17,4 @@
org.apache.hudi.DefaultSource
-org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat
\ No newline at end of file
+org.apache.spark.sql.execution.datasources.parquet.LegacyHoodieParquetFileFormat
Review Comment:
When switching to the new file format with the config, should the
`NewHoodieParquetFileFormat` be registered too?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -144,22 +146,41 @@ case class HoodieFileIndex(spark: SparkSession,
// Prune the partition path by the partition filters
// NOTE: Non-partitioned tables are assumed to consist from a single
partition
// encompassing the whole table
- val prunedPartitions = listMatchingPartitionPaths(partitionFilters)
+ val prunedPartitions = if (shouldBroadcast) {
+
listMatchingPartitionPaths(convertFilterForTimestampKeyGenerator(metaClient,
partitionFilters))
+ } else {
+ listMatchingPartitionPaths(partitionFilters)
+ }
val listedPartitions = getInputFileSlices(prunedPartitions:
_*).asScala.toSeq.map {
case (partition, fileSlices) =>
- val baseFileStatuses: Seq[FileStatus] = getBaseFileStatus(fileSlices
+ var baseFileStatuses: Seq[FileStatus] = getBaseFileStatus(fileSlices
.asScala
- .map(fs => fs.getBaseFile.orElse(null))
- .filter(_ != null))
-
+ .map(fs => fs.getBaseFile.orElse(null))
+ .filter(_ != null))
+ if (shouldBroadcast) {
+ baseFileStatuses = baseFileStatuses ++ fileSlices.asScala
+ .filter(f => f.getLogFiles.findAny().isPresent &&
!f.getBaseFile.isPresent)
+ .map(f => f.getLogFiles.findAny().get().getFileStatus)
+ }
// Filter in candidate files based on the col-stats index lookup
val candidateFiles = baseFileStatuses.filter(fs =>
// NOTE: This predicate is true when {@code Option} is empty
candidateFilesNamesOpt.forall(_.contains(fs.getPath.getName)))
totalFileSize += baseFileStatuses.size
candidateFileSize += candidateFiles.size
- PartitionDirectory(InternalRow.fromSeq(partition.values),
candidateFiles)
+ if (shouldBroadcast) {
+ val c = fileSlices.asScala.filter(f =>
f.getLogFiles.findAny().isPresent
+ || (f.getBaseFile.isPresent &&
f.getBaseFile.get().getBootstrapBaseFile.isPresent)).
+ foldLeft(Map[String, FileSlice]()) { (m, f) => m + (f.getFileId ->
f) }
+ if (c.nonEmpty) {
+ PartitionDirectory(new
PartitionFileSliceMapping(InternalRow.fromSeq(partition.values),
spark.sparkContext.broadcast(c)), candidateFiles)
+ } else {
+ PartitionDirectory(InternalRow.fromSeq(partition.values),
candidateFiles)
+ }
+ } else {
+ PartitionDirectory(InternalRow.fromSeq(partition.values),
candidateFiles)
+ }
Review Comment:
Could you move the logic to a single if branch when broadcast is enabled, so
it's easier to read?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]