beyond1920 commented on code in PR #9593:
URL: https://github.com/apache/hudi/pull/9593#discussion_r1312679199


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala:
##########
@@ -266,13 +266,14 @@ class RecordMergingFileIterator(logFiles: 
List[HoodieLogFile],
   private def serialize(curRowRecord: InternalRow): GenericRecord =
     serializer.serialize(curRowRecord).asInstanceOf[GenericRecord]
 
-  private def merge(curRow: InternalRow, newRecord: HoodieRecord[_]): 
Option[InternalRow] = {
+  private def merge(curRow: InternalRow, newRecord: HoodieRecord[_]): 
scala.Option[InternalRow] = {

Review Comment:
   Don't forget to call this `merge` method even if `updatedRecordOpt.isEmpty` 
is `true`.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -346,7 +346,8 @@ public void write(HoodieRecord<T> oldRecord) {
       // writing the first record. So make a copy of the record to be merged
       HoodieRecord<T> newRecord = keyToNewRecords.get(key).newInstance();
       try {
-        Option<Pair<HoodieRecord, Schema>> mergeResult = 
recordMerger.merge(oldRecord, oldSchema, newRecord, newSchema, props);
+        Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger.merge(

Review Comment:
   We should also call the `merge` API for records that exist in 
`keyToNewRecords` but not in the base file, before flushing them to disk.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala:
##########
@@ -116,16 +116,16 @@ class LogFileIterator(logFiles: List[HoodieLogFile],
 
   // NOTE: This have to stay lazy to make sure it's initialized only at the 
point where it's
   //       going to be used, since we modify `logRecords` before that and 
therefore can't do it any earlier
-  private lazy val logRecordsIterator: Iterator[Option[HoodieRecord[_]]] =
+  private lazy val logRecordsIterator: Iterator[scala.Option[HoodieRecord[_]]] 
=

Review Comment:
   Why introduce the `scala` prefix?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala:
##########
@@ -116,16 +116,16 @@ class LogFileIterator(logFiles: List[HoodieLogFile],
 
   // NOTE: This have to stay lazy to make sure it's initialized only at the 
point where it's
   //       going to be used, since we modify `logRecords` before that and 
therefore can't do it any earlier
-  private lazy val logRecordsIterator: Iterator[Option[HoodieRecord[_]]] =
+  private lazy val logRecordsIterator: Iterator[scala.Option[HoodieRecord[_]]] 
=
     logRecords.iterator.map {
-      case (_, record: HoodieSparkRecord) => Option(record)
-      case (_, _: HoodieEmptyRecord[_]) => Option.empty
+      case (_, record: HoodieSparkRecord) => scala.Option(record)
+      case (_, _: HoodieEmptyRecord[_]) => scala.Option.empty
       case (_, record) =>
         toScalaOption(record.toIndexedRecord(logFileReaderAvroSchema, 
payloadProps))
 
     }
 
-  protected def removeLogRecord(key: String): Option[HoodieRecord[_]] = 
logRecords.remove(key)
+  protected def removeLogRecord(key: String): scala.Option[HoodieRecord[_]] = 
logRecords.remove(key)
 
   protected def doHasNext: Boolean = hasNextInternal
 

Review Comment:
   We should also call the merger API here to check whether the record needs 
to be dropped before it is loaded into the query result.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to