yihua commented on code in PR #9276:
URL: https://github.com/apache/hudi/pull/9276#discussion_r1283625541


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala:
##########
@@ -247,11 +247,23 @@ object DefaultSource {
       Option(schema)
     }
 
+
+
+

Review Comment:
   nit: remove empty lines



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala:
##########
@@ -262,16 +274,28 @@ object DefaultSource {
           new IncrementalRelation(sqlContext, parameters, userSchema, 
metaClient)
 
         case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
-          new MergeOnReadSnapshotRelation(sqlContext, parameters, metaClient, 
globPaths, userSchema)
+          if (newHudiFileFormatUtils.isEmpty || 
newHudiFileFormatUtils.get.hasSchemaOnRead) {

Review Comment:
   Instead of checking `newHudiFileFormatUtils.get.hasSchemaOnRead`, should 
`hoodie.schema.on.read.enable` config be checked when lazy creating 
`newHudiFileFormatUtils` and only check `newHudiFileFormatUtils.isEmpty` here 
for simplicity?



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala:
##########
@@ -77,6 +78,9 @@ trait HoodieCatalystPlansUtils {
    */
   def unapplyMergeIntoTable(plan: LogicalPlan): Option[(LogicalPlan, 
LogicalPlan, Expression)]
 
+
+  def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): LogicalPlan

Review Comment:
   nit: add docs to explain what this projection does, e.g. resolving the 
schema based on the resolver, and why this is needed for the new file format.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala:
##########
@@ -262,16 +274,28 @@ object DefaultSource {
           new IncrementalRelation(sqlContext, parameters, userSchema, 
metaClient)
 
         case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
-          new MergeOnReadSnapshotRelation(sqlContext, parameters, metaClient, 
globPaths, userSchema)
+          if (newHudiFileFormatUtils.isEmpty || 
newHudiFileFormatUtils.get.hasSchemaOnRead) {
+            new MergeOnReadSnapshotRelation(sqlContext, parameters, 
metaClient, globPaths, userSchema)
+          } else {
+            newHudiFileFormatUtils.get.getHadoopFsRelation(isMOR = true, 
isBootstrap = false)
+          }
 
         case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
           new MergeOnReadIncrementalRelation(sqlContext, parameters, 
metaClient, userSchema)
 
         case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, true) =>
-          new HoodieBootstrapMORRelation(sqlContext, userSchema, globPaths, 
metaClient, parameters)
+          if (newHudiFileFormatUtils.isEmpty || 
newHudiFileFormatUtils.get.hasSchemaOnRead) {

Review Comment:
   Similar here and below.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -87,6 +87,14 @@ object DataSourceReadOptions {
       s"payload implementation to merge (${REALTIME_PAYLOAD_COMBINE_OPT_VAL}) 
or skip merging altogether" +
       s"${REALTIME_SKIP_MERGE_OPT_VAL}")
 
+  val LEGACY_HUDI_PARQUET_FILE_FORMAT: ConfigProperty[String] = ConfigProperty

Review Comment:
   ```suggestion
     val USE_LEGACY_HUDI_PARQUET_FILE_FORMAT: ConfigProperty[String] = 
ConfigProperty
   ```



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala:
##########
@@ -247,11 +247,23 @@ object DefaultSource {
       Option(schema)
     }
 
+
+
+
     if (metaClient.getCommitsTimeline.filterCompletedInstants.countInstants() 
== 0) {
       new EmptyRelation(sqlContext, resolveSchema(metaClient, parameters, 
Some(schema)))
     } else if (isCdcQuery) {
       CDCRelation.getCDCRelation(sqlContext, metaClient, parameters)
     } else {
+      lazy val newHudiFileFormatUtils = if 
(!parameters.getOrElse(LEGACY_HUDI_PARQUET_FILE_FORMAT.key,
+        LEGACY_HUDI_PARQUET_FILE_FORMAT.defaultValue).toBoolean && (globPaths 
== null || globPaths.isEmpty)
+        && parameters.getOrElse(REALTIME_MERGE.key(), 
REALTIME_MERGE.defaultValue())
+        .equalsIgnoreCase(REALTIME_PAYLOAD_COMBINE_OPT_VAL)) {

Review Comment:
   Is there any issue with `REALTIME_SKIP_MERGE_OPT_VAL` merge type?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -144,22 +146,41 @@ case class HoodieFileIndex(spark: SparkSession,
     // Prune the partition path by the partition filters
     // NOTE: Non-partitioned tables are assumed to consist from a single 
partition
     //       encompassing the whole table
-    val prunedPartitions = listMatchingPartitionPaths(partitionFilters)
+    val prunedPartitions = if (shouldBroadcast) {
+      
listMatchingPartitionPaths(convertFilterForTimestampKeyGenerator(metaClient, 
partitionFilters))
+    } else {
+      listMatchingPartitionPaths(partitionFilters)
+    }
     val listedPartitions = getInputFileSlices(prunedPartitions: 
_*).asScala.toSeq.map {
       case (partition, fileSlices) =>
-        val baseFileStatuses: Seq[FileStatus] = getBaseFileStatus(fileSlices
+        var baseFileStatuses: Seq[FileStatus] = getBaseFileStatus(fileSlices
           .asScala
-          .map(fs => fs.getBaseFile.orElse(null))
-          .filter(_ != null))
-
+            .map(fs => fs.getBaseFile.orElse(null))
+            .filter(_ != null))
+        if (shouldBroadcast) {
+          baseFileStatuses = baseFileStatuses ++ fileSlices.asScala

Review Comment:
   Rename this variable since this contains `FileStatus` instance from file 
slices with only log files?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapMORRelation.scala:
##########
@@ -58,10 +59,13 @@ case class HoodieBootstrapMORRelation(override val 
sqlContext: SQLContext,
                                       override val optParams: Map[String, 
String],
                                       private val prunedDataSchema: 
Option[StructType] = None)
   extends BaseHoodieBootstrapRelation(sqlContext, userSchema, globPaths, 
metaClient,
-    optParams, prunedDataSchema) {
+    optParams, prunedDataSchema) with SparkAdapterSupport {
 
   override type Relation = HoodieBootstrapMORRelation
 
+  protected val mergeType: String = 
optParams.getOrElse(DataSourceReadOptions.REALTIME_MERGE.key,
+    DataSourceReadOptions.REALTIME_MERGE.defaultValue)
+

Review Comment:
   Are these changes still needed?



##########
hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister:
##########
@@ -17,4 +17,4 @@
 
 
 org.apache.hudi.DefaultSource
-org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat
\ No newline at end of file
+org.apache.spark.sql.execution.datasources.parquet.LegacyHoodieParquetFileFormat

Review Comment:
   When switching to the new file format with the config, should the 
`NewHoodieParquetFileFormat` be registered too?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -144,22 +146,41 @@ case class HoodieFileIndex(spark: SparkSession,
     // Prune the partition path by the partition filters
     // NOTE: Non-partitioned tables are assumed to consist from a single 
partition
     //       encompassing the whole table
-    val prunedPartitions = listMatchingPartitionPaths(partitionFilters)
+    val prunedPartitions = if (shouldBroadcast) {
+      
listMatchingPartitionPaths(convertFilterForTimestampKeyGenerator(metaClient, 
partitionFilters))
+    } else {
+      listMatchingPartitionPaths(partitionFilters)
+    }
     val listedPartitions = getInputFileSlices(prunedPartitions: 
_*).asScala.toSeq.map {
       case (partition, fileSlices) =>
-        val baseFileStatuses: Seq[FileStatus] = getBaseFileStatus(fileSlices
+        var baseFileStatuses: Seq[FileStatus] = getBaseFileStatus(fileSlices
           .asScala
-          .map(fs => fs.getBaseFile.orElse(null))
-          .filter(_ != null))
-
+            .map(fs => fs.getBaseFile.orElse(null))
+            .filter(_ != null))
+        if (shouldBroadcast) {
+          baseFileStatuses = baseFileStatuses ++ fileSlices.asScala
+            .filter(f => f.getLogFiles.findAny().isPresent && 
!f.getBaseFile.isPresent)
+            .map(f => f.getLogFiles.findAny().get().getFileStatus)
+        }
         // Filter in candidate files based on the col-stats index lookup
         val candidateFiles = baseFileStatuses.filter(fs =>
           // NOTE: This predicate is true when {@code Option} is empty
           candidateFilesNamesOpt.forall(_.contains(fs.getPath.getName)))
 
         totalFileSize += baseFileStatuses.size
         candidateFileSize += candidateFiles.size
-        PartitionDirectory(InternalRow.fromSeq(partition.values), 
candidateFiles)
+        if (shouldBroadcast) {
+          val c = fileSlices.asScala.filter(f => 
f.getLogFiles.findAny().isPresent
+            || (f.getBaseFile.isPresent && 
f.getBaseFile.get().getBootstrapBaseFile.isPresent)).
+            foldLeft(Map[String, FileSlice]()) { (m, f) => m + (f.getFileId -> 
f) }
+          if (c.nonEmpty) {
+            PartitionDirectory(new 
PartitionFileSliceMapping(InternalRow.fromSeq(partition.values), 
spark.sparkContext.broadcast(c)), candidateFiles)
+          } else {
+            PartitionDirectory(InternalRow.fromSeq(partition.values), 
candidateFiles)
+          }
+        } else {
+          PartitionDirectory(InternalRow.fromSeq(partition.values), 
candidateFiles)
+        }

Review Comment:
   Could you move the logic to a single if branch when broadcast is enabled, so 
it's easier to read?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to