linliu-code commented on code in PR #10167:
URL: https://github.com/apache/hudi/pull/10167#discussion_r1418067760


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -355,12 +364,50 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
 
     (baseFileReader, preMergeBaseFileReader, skeletonReader, 
bootstrapBaseReader)
   }
+}
+
+object HoodieFileGroupReaderBasedParquetFileFormat {
+  private val ROW_INDEX = "row_index"
+  private val ROW_INDEX_TEMPORARY_COLUMN_NAME = s"_tmp_metadata_$ROW_INDEX"
+  private val FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY = 
"__file_source_generated_metadata_col"
+  private val FILE_SOURCE_METADATA_COL_ATTR_KEY = "__file_source_metadata_col"
+  private val METADATA_COL_ATTR_KEY = "__metadata_col"
 
-  protected def getRecordKeyRelatedFilters(filters: Seq[Filter], 
recordKeyColumn: String): Seq[Filter] = {
+  def getRecordKeyRelatedFilters(filters: Seq[Filter], recordKeyColumn: 
String): Seq[Filter] = {
     filters.filter(f => f.references.exists(c => 
c.equalsIgnoreCase(recordKeyColumn)))
   }
 
-  protected def getLogFilesFromSlice(fileSlice: FileSlice): 
List[HoodieLogFile] = {
+  def getLogFilesFromSlice(fileSlice: FileSlice): List[HoodieLogFile] = {
     
fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toList
   }
+
+  def getFieldMetadata(name: String, internalName: String): Metadata = {
+    new MetadataBuilder()
+      .putString(METADATA_COL_ATTR_KEY, name)
+      .putBoolean(FILE_SOURCE_METADATA_COL_ATTR_KEY, value = true)
+      .putString(FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY, internalName)
+      .build()
+  }
+
+  def getAppliedRequiredSchema(requiredSchema: StructType,
+                               shouldUseRecordPosition: Boolean,
+                               recordPositionColumn: String): StructType = {
+    if (HoodieSparkUtils.gteqSpark3_5 && shouldUseRecordPosition) {
+      val metadata = getFieldMetadata(recordPositionColumn, 
ROW_INDEX_TEMPORARY_COLUMN_NAME)
+      val rowIndexField = StructField(recordPositionColumn, LongType, nullable 
= false, metadata)
+      StructType(requiredSchema.fields :+ rowIndexField)
+    } else {
+      requiredSchema
+    }
+  }
+
+  def getAppliedFilters(requiredFilters: Seq[Filter],
+                     recordKeyRelatedFilters: Seq[Filter],
+                     shouldUseRecordPosition: Boolean): Seq[Filter] = {
+    if (!HoodieSparkUtils.gteqSpark3_5 && shouldUseRecordPosition) {
+      requiredFilters
+    } else {
+      requiredFilters ++ recordKeyRelatedFilters
+    }
+  }

Review Comment:
   The intended logic: when (spark.version < 3.5 && use_record_position), use only requiredFilters;
   otherwise, use requiredFilters + recordKeyRelatedFilters.
   Writing it as `if ((!HoodieSparkUtils.gteqSpark3_5) && shouldUseRecordPosition)` is clearer.
   Since the unary `!` has higher precedence than `&&`, the parentheses are redundant, but they make the intent explicit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to