danny0405 commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1372535469


##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java:
##########
@@ -160,9 +169,11 @@ public Map<String, Object> generateMetadataForRecord(
    * @param schema The Avro schema of the record.
    * @return A mapping containing the metadata.
    */
-  public Map<String, Object> generateMetadataForRecord(T record, Schema schema) {
+  public Map<String, Object> generateMetadataForRecord(T record, Schema schema, boolean isPartial) {
     Map<String, Object> meta = new HashMap<>();
     meta.put(INTERNAL_META_RECORD_KEY, getRecordKey(record, schema));
+    meta.put(INTERNAL_META_SCHEMA, schema);
+    meta.put(INTERNAL_META_IS_PARTIAL, isPartial);

Review Comment:
   I'm wondering whether we can represent the metadata as a POJO to make the 
interface more explicit and clear.
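
   A minimal sketch of what such a POJO could look like. The class name
   `RecordMetadata` and its accessors are hypothetical and not part of this PR,
   and the type parameter `S` only stands in for the Avro `Schema` so that the
   sketch compiles without Hudi or Avro on the classpath:

```java
import java.util.Objects;

// Hypothetical typed holder for the per-record metadata that the diff above
// stores in a Map<String, Object>. S stands in for the Avro Schema type.
final class RecordMetadata<S> {
  private final String recordKey;
  private final String partitionPath; // may be null when it is not known
  private final S schema;
  private final boolean partial;

  RecordMetadata(String recordKey, String partitionPath, S schema, boolean partial) {
    this.recordKey = Objects.requireNonNull(recordKey, "recordKey");
    this.partitionPath = partitionPath;
    this.schema = Objects.requireNonNull(schema, "schema");
    this.partial = partial;
  }

  String getRecordKey() {
    return recordKey;
  }

  String getPartitionPath() {
    return partitionPath;
  }

  S getSchema() {
    return schema;
  }

  boolean isPartial() {
    return partial;
  }
}
```

   With typed accessors, callers would no longer need casts such as
   `(Schema) metadataMap.get(...)` or a default for a missing flag.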



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java:
##########
@@ -94,16 +94,17 @@ public Comparable getOrderingValue(Option<InternalRow> rowOption,
 
   @Override
   public HoodieRecord<InternalRow> constructHoodieRecord(Option<InternalRow> rowOption,
-                                                         Map<String, Object> metadataMap,
-                                                         Schema schema) {
+                                                         Map<String, Object> metadataMap) {
     if (!rowOption.isPresent()) {
       return new HoodieEmptyRecord<>(
           new HoodieKey((String) metadataMap.get(INTERNAL_META_RECORD_KEY),
               (String) metadataMap.get(INTERNAL_META_PARTITION_PATH)),
           HoodieRecord.HoodieRecordType.SPARK);
     }
 
+    Schema schema = (Schema) metadataMap.get(INTERNAL_META_SCHEMA);
     InternalRow row = rowOption.get();
+    boolean isPartial = (boolean) metadataMap.getOrDefault(INTERNAL_META_IS_PARTIAL, false);
     return new HoodieSparkRecord(row, HoodieInternalRowUtils.getCachedSchema(schema));

Review Comment:
   `isPartial` is read from the metadata map but never used.



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -51,6 +61,28 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: PartitionedFile =>
                                      requiredSchema: Schema,
                                      conf: Configuration): ClosableIterator[InternalRow] = {
     val fileInfo = sparkAdapter.getSparkPartitionedFileUtils.createPartitionedFile(partitionValues, filePath, start, length)
-    new CloseableInternalRowIterator(baseFileReader.apply(fileInfo))
+    if (filePath.toString.contains(HoodieLogFile.DELTA_EXTENSION)) {

Review Comment:
   Use `FSUtils.isLogFile` instead.
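
   To illustrate why a substring check on the full path is fragile, here is a
   rough sketch. The class `LogFileCheck`, the example paths, and the regular
   expression are all assumptions made up for this comment; the pattern is a
   simplified stand-in and not Hudi's actual log file pattern:

```java
import java.util.regex.Pattern;

// Hypothetical helper contrasting a substring check on the whole path with a
// check on the file name only.
final class LogFileCheck {
  // Simplified, assumed naming scheme: "<name>.log.<version>" with an optional
  // "_<suffix>". This is NOT the pattern Hudi uses internally.
  private static final Pattern LOG_FILE_NAME = Pattern.compile("^.+\\.log\\.\\d+(_.*)?$");

  // Mirrors the check in the diff: true if ".log" appears anywhere in the path.
  static boolean containsLogExtension(String path) {
    return path.contains(".log");
  }

  // Looks only at the last path segment and requires the full naming scheme.
  static boolean isLogFileName(String path) {
    String name = path.substring(path.lastIndexOf('/') + 1);
    return LOG_FILE_NAME.matcher(name).matches();
  }
}
```

   Under these assumptions, a base file stored under a directory such as
   `/warehouse/app.logs/` passes the substring check even though it is not a
   log file, while the name-based check rejects it.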



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -151,7 +151,7 @@ public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
   public void initRecordIterators() {
     this.baseFileIterator = baseFilePath.isPresent()
         ? readerContext.getFileRecordIterator(
-            baseFilePath.get().getHadoopPath(), start, length, readerState.baseFileAvroSchema, readerState.baseFileAvroSchema, hadoopConf)
+        baseFilePath.get().getHadoopPath(), start, length, readerState.baseFileAvroSchema, readerState.baseFileAvroSchema, hadoopConf)

Review Comment:
   Unnecessary change? Only the indentation differs between the removed and added lines.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java:
##########
@@ -127,10 +124,12 @@ public boolean hasNext() throws IOException {
 
       String recordKey = readerContext.getRecordKey(baseRecord, baseFileSchema);
       Pair<Option<T>, Map<String, Object>> logRecordInfo = records.remove(recordKey);
+      Map<String, Object> metadata = readerContext.generateMetadataForRecord(
+          baseRecord, baseFileSchema, false);

Review Comment:
   Caution: constructing the metadata map for every record may cause a performance regression.
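
   One possible way to limit the cost, sketched under the assumption that the
   metadata is only needed when a matching log record exists (the hunk above
   does not confirm this). `LazyMetadataSketch`, `buildMetadata`, and
   `mergeAll` are hypothetical names, and plain strings stand in for records:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: allocate the per-record metadata map only for base
// records that actually have a matching log record to merge with.
final class LazyMetadataSketch {
  // Counts how often a metadata map is allocated, to make the saving visible.
  static int metadataBuilds = 0;

  static Map<String, Object> buildMetadata(String recordKey) {
    metadataBuilds++;
    Map<String, Object> meta = new HashMap<>();
    meta.put("recordKey", recordKey);
    meta.put("isPartial", false);
    return meta;
  }

  // Returns the number of base records that had a matching log record.
  static int mergeAll(List<String> baseKeys, Map<String, String> logRecords) {
    int merged = 0;
    for (String key : baseKeys) {
      String logRecord = logRecords.remove(key);
      if (logRecord == null) {
        continue; // nothing to merge, so no metadata map is allocated
      }
      Map<String, Object> meta = buildMetadata(key);
      if (meta.containsKey("recordKey")) {
        merged++; // a real merge of the two records would happen here
      }
    }
    return merged;
  }
}
```

   In this sketch, three base records with a single matching log record lead to
   one map allocation instead of three.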



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java:
##########
@@ -70,9 +71,11 @@ public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSc
       }
     }
     if (older.getOrderingValue(oldSchema, props).compareTo(newer.getOrderingValue(newSchema, props)) > 0) {
-      return Option.of(Pair.of(older, oldSchema));
+      return Option.of(SparkPartialMergingUtils.mergePartialRecords(
+          (HoodieSparkRecord) newer, newSchema, (HoodieSparkRecord) older, oldSchema, props));

Review Comment:
   The partial merge may not happen, so maybe give the utility a better name.


