alexeykudinkin commented on a change in pull request #4888:
URL: https://github.com/apache/hudi/pull/4888#discussion_r834964133



##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -108,270 +136,284 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
     }
   }
 
-  private def logFileIterator(split: HoodieMergeOnReadFileSplit,
-                              config: Configuration): Iterator[InternalRow] =
-    new Iterator[InternalRow] with Closeable with SparkAdapterSupport {
-      private val tableAvroSchema = new 
Schema.Parser().parse(tableSchema.avroSchemaStr)
-      private val requiredAvroSchema = new 
Schema.Parser().parse(requiredSchema.avroSchemaStr)
-      private val requiredFieldPosition =
-        requiredSchema.structTypeSchema
-          .map(f => tableAvroSchema.getField(f.name).pos()).toList
-      private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
-      private val deserializer = 
sparkAdapter.createAvroDeserializer(requiredAvroSchema, 
requiredSchema.structTypeSchema)
-      private val unsafeProjection = 
UnsafeProjection.create(requiredSchema.structTypeSchema)
-      private var logScanner = HoodieMergeOnReadRDD.scanLog(split, 
tableAvroSchema, config)
-      private val logRecords = logScanner.getRecords
-      private val logRecordsKeyIterator = 
logRecords.keySet().iterator().asScala
-
-      private var recordToLoad: InternalRow = _
-
-      override def hasNext: Boolean = {
-        if (logRecordsKeyIterator.hasNext) {
-          val curAvrokey = logRecordsKeyIterator.next()
-          val curAvroRecord = 
logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema, payloadProps)
-          if (!curAvroRecord.isPresent) {
-            // delete record found, skipping
-            this.hasNext
-          } else {
-            val requiredAvroRecord = 
AvroConversionUtils.buildAvroRecordBySchema(curAvroRecord.get(), 
requiredAvroSchema,
-              requiredFieldPosition, recordBuilder)
-            val rowOpt = deserializer.deserialize(requiredAvroRecord)
-            recordToLoad = 
unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
-            true
+  /**
+   * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], iterates over all 
of the records stored in
+   * Delta Log files (represented as [[InternalRow]]s)
+   */
+  private class LogFileIterator(split: HoodieMergeOnReadFileSplit,
+                                config: Configuration)
+    extends Iterator[InternalRow] with Closeable with AvroDeserializerSupport {
+
+    protected override val requiredAvroSchema: Schema = new 
Schema.Parser().parse(requiredSchema.avroSchemaStr)
+    protected override val requiredStructTypeSchema: StructType = 
requiredSchema.structTypeSchema
+
+    protected val logFileReaderAvroSchema: Schema = new 
Schema.Parser().parse(tableSchema.avroSchemaStr)
+
+    protected val recordBuilder: GenericRecordBuilder = new 
GenericRecordBuilder(requiredAvroSchema)
+    protected var recordToLoad: InternalRow = _
+
+    // TODO validate whether we need to do UnsafeProjection
+    protected val unsafeProjection: UnsafeProjection = 
UnsafeProjection.create(requiredStructTypeSchema)
+
+    // NOTE: This maps _required_ schema fields onto the _full_ table schema, 
collecting their "ordinals"
+    //       w/in the record payload. This is required, to project records 
read from the Delta Log file
+    //       which always reads records in full schema (never projected, due 
to the fact that DL file might
+    //       be stored in non-columnar formats like Avro, HFile, etc)
+    private val requiredSchemaFieldOrdinals: List[Int] = 
collectFieldOrdinals(requiredAvroSchema, logFileReaderAvroSchema)
+
+    private var logScanner =
+      HoodieMergeOnReadRDD.scanLog(split.logFiles, getPartitionPath(split), 
logFileReaderAvroSchema, tableState,
+        maxCompactionMemoryInBytes, config)
+
+    private val logRecords = logScanner.getRecords.asScala
+
+    // NOTE: This iterator iterates over already projected (in required 
schema) records
+    // NOTE: This have to stay lazy to make sure it's initialized only at the 
point where it's
+    //       going to be used, since we modify `logRecords` before that and 
therefore can't do it any earlier
+    protected lazy val logRecordsIterator: Iterator[Option[GenericRecord]] =
+      logRecords.iterator.map {
+        case (_, record) =>
+          val avroRecordOpt = 
toScalaOption(record.getData.getInsertValue(logFileReaderAvroSchema, 
payloadProps))
+          avroRecordOpt.map {
+            avroRecord => projectAvroUnsafe(avroRecord, requiredAvroSchema, 
requiredSchemaFieldOrdinals, recordBuilder)
           }
-        } else {
-          false
-        }
       }
 
-      override def next(): InternalRow = {
-        recordToLoad
-      }
-
-      override def close(): Unit = {
-        if (logScanner != null) {
-          try {
-            logScanner.close()
-          } finally {
-            logScanner = null
-          }
-        }
-      }
-    }
+    protected def removeLogRecord(key: String): Option[HoodieRecord[_ <: 
HoodieRecordPayload[_]]] =
+      logRecords.remove(key)
 
-  private def skipMergeFileIterator(split: HoodieMergeOnReadFileSplit,
-                                    baseFileIterator: Iterator[InternalRow],
-                                    config: Configuration): 
Iterator[InternalRow] =
-    new Iterator[InternalRow] with Closeable with SparkAdapterSupport {
-      private val tableAvroSchema = new 
Schema.Parser().parse(tableSchema.avroSchemaStr)
-      private val requiredAvroSchema = new 
Schema.Parser().parse(requiredSchema.avroSchemaStr)
-      private val requiredFieldPosition =
-        requiredSchema.structTypeSchema
-          .map(f => tableAvroSchema.getField(f.name).pos()).toList
-      private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
-      private val deserializer = 
sparkAdapter.createAvroDeserializer(requiredAvroSchema, 
requiredSchema.structTypeSchema)
-      private val unsafeProjection = 
UnsafeProjection.create(requiredSchema.structTypeSchema)
-      private var logScanner = HoodieMergeOnReadRDD.scanLog(split, 
tableAvroSchema, config)
-      private val logRecords = logScanner.getRecords
-      private val logRecordsKeyIterator = 
logRecords.keySet().iterator().asScala
-
-      private var recordToLoad: InternalRow = _
-
-      @scala.annotation.tailrec
-      override def hasNext: Boolean = {
-        if (baseFileIterator.hasNext) {
-          val curRow = baseFileIterator.next()
-          recordToLoad = unsafeProjection(curRow)
-          true
+    override def hasNext: Boolean =
+      logRecordsIterator.hasNext && {
+        val avroRecordOpt = logRecordsIterator.next()
+        if (avroRecordOpt.isEmpty) {
+          // Record has been deleted, skipping
+          this.hasNext
         } else {
-          if (logRecordsKeyIterator.hasNext) {
-            val curAvrokey = logRecordsKeyIterator.next()
-            val curAvroRecord = 
logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema, payloadProps)
-            if (!curAvroRecord.isPresent) {
-              // delete record found, skipping
-              this.hasNext
-            } else {
-              val requiredAvroRecord = 
AvroConversionUtils.buildAvroRecordBySchema(curAvroRecord.get(), 
requiredAvroSchema,
-                requiredFieldPosition, recordBuilder)
-              val rowOpt = deserializer.deserialize(requiredAvroRecord)
-              recordToLoad = 
unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
-              true
-            }
-          } else {
-            false
-          }
+          recordToLoad = unsafeProjection(deserialize(avroRecordOpt.get))
+          true
         }
       }
 
-      override def next(): InternalRow = {
-        recordToLoad
-      }
+    override final def next(): InternalRow = recordToLoad
 
-      override def close(): Unit = {
-        if (logScanner != null) {
-          try {
-            logScanner.close()
-          } finally {
-            logScanner = null
-          }
+    override def close(): Unit =
+      if (logScanner != null) {
+        try {
+          logScanner.close()
+        } finally {
+          logScanner = null
         }
       }
+  }
+
+  /**
+   * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an 
iterator over all of the records stored in
+   * Base file as well as all of the Delta Log files simply returning 
concatenation of these streams, while not
+   * performing any combination/merging of the records w/ the same primary 
keys (ie producing duplicates potentially)
+   */
+  private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit,
+                                  baseFileIterator: Iterator[InternalRow],
+                                  config: Configuration)
+    extends LogFileIterator(split, config) {
+
+    override def hasNext: Boolean = {
+      if (baseFileIterator.hasNext) {
+        val curRow = baseFileIterator.next()
+        recordToLoad = unsafeProjection(curRow)
+        true
+      } else {
+        super[LogFileIterator].hasNext
+      }
     }
+  }
 
-  private def payloadCombineFileIterator(split: HoodieMergeOnReadFileSplit,
-                                         baseFileIterator: 
Iterator[InternalRow],
-                                         config: Configuration): 
Iterator[InternalRow] =
-    new Iterator[InternalRow] with Closeable with SparkAdapterSupport {
-      private val tableAvroSchema = new 
Schema.Parser().parse(tableSchema.avroSchemaStr)
-      private val requiredAvroSchema = new 
Schema.Parser().parse(requiredSchema.avroSchemaStr)
-      private val requiredFieldPosition =
-        requiredSchema.structTypeSchema
-          .map(f => tableAvroSchema.getField(f.name).pos()).toList
-      private val serializer = 
sparkAdapter.createAvroSerializer(tableSchema.structTypeSchema, tableAvroSchema,
-        resolveAvroSchemaNullability(tableAvroSchema))
-      private val requiredDeserializer = 
sparkAdapter.createAvroDeserializer(requiredAvroSchema, 
requiredSchema.structTypeSchema)
-      private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
-      private val unsafeProjection = 
UnsafeProjection.create(requiredSchema.structTypeSchema)
-      private var logScanner = HoodieMergeOnReadRDD.scanLog(split, 
tableAvroSchema, config)
-      private val logRecords = logScanner.getRecords
-      private val logRecordsKeyIterator = 
logRecords.keySet().iterator().asScala
-      private val keyToSkip = mutable.Set.empty[String]
-      private val recordKeyPosition = 
tableSchema.structTypeSchema.fieldIndex(recordKeyField)
-
-      private var recordToLoad: InternalRow = _
-
-      @scala.annotation.tailrec
-      override def hasNext: Boolean = {
-        if (baseFileIterator.hasNext) {
-          val curRow = baseFileIterator.next()
-          val curKey = curRow.getString(recordKeyPosition)
-          if (logRecords.containsKey(curKey)) {
-            // duplicate key found, merging
-            keyToSkip.add(curKey)
-            val mergedAvroRecord = mergeRowWithLog(curRow, curKey)
-            if (!mergedAvroRecord.isPresent) {
-              // deleted
-              this.hasNext
-            } else {
-              // load merged record as InternalRow with required schema
-              val requiredAvroRecord = 
AvroConversionUtils.buildAvroRecordBySchema(mergedAvroRecord.get(), 
requiredAvroSchema,
-                requiredFieldPosition, recordBuilder)
-              val rowOpt = requiredDeserializer.deserialize(requiredAvroRecord)
-              recordToLoad = 
unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
-              true
-            }
-          } else {
-            // No merge needed, load current row with required schema
-            recordToLoad = 
unsafeProjection(createInternalRowWithSchema(curRow, 
requiredSchema.structTypeSchema, requiredFieldPosition))
-            true
-          }
+  /**
+   * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an 
iterator over all of the records stored in
+   * a) Base file and all of the b) Delta Log files combining records with the 
same primary key from both of these
+   * streams
+   */
+  private class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit,
+                                          baseFileIterator: 
Iterator[InternalRow],
+                                          baseFileReaderSchema: 
HoodieTableSchema,
+                                          config: Configuration)
+    extends LogFileIterator(split, config) {
+
+    // NOTE: Record-merging iterator supports 2 modes of operation merging 
records bearing either
+    //        - Full table's schema
+    //        - Projected schema
+    //       As such, no particular schema could be assumed, and therefore we 
rely on the caller
+    //       to correspondingly set the scheme of the expected output of 
base-file reader
+    private val baseFileReaderAvroSchema = new 
Schema.Parser().parse(baseFileReaderSchema.avroSchemaStr)
+    private val requiredSchemaFieldOrdinals: List[Int] = 
collectFieldOrdinals(requiredAvroSchema, baseFileReaderAvroSchema)
+
+    private val serializer = 
sparkAdapter.createAvroSerializer(baseFileReaderSchema.structTypeSchema,
+      baseFileReaderAvroSchema, 
resolveAvroSchemaNullability(baseFileReaderAvroSchema))
+

Review comment:
       Frankly, there's not a lot of added value in this trait, given that its 
API will be practically identical to that of `SparkAdapterSupport`:
   
   ```
   def createAvroSerializer(st: StructType, schema: Schema)
   ```
   

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -108,270 +136,284 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
     }
   }
 
-  private def logFileIterator(split: HoodieMergeOnReadFileSplit,
-                              config: Configuration): Iterator[InternalRow] =
-    new Iterator[InternalRow] with Closeable with SparkAdapterSupport {
-      private val tableAvroSchema = new 
Schema.Parser().parse(tableSchema.avroSchemaStr)
-      private val requiredAvroSchema = new 
Schema.Parser().parse(requiredSchema.avroSchemaStr)
-      private val requiredFieldPosition =
-        requiredSchema.structTypeSchema
-          .map(f => tableAvroSchema.getField(f.name).pos()).toList
-      private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
-      private val deserializer = 
sparkAdapter.createAvroDeserializer(requiredAvroSchema, 
requiredSchema.structTypeSchema)
-      private val unsafeProjection = 
UnsafeProjection.create(requiredSchema.structTypeSchema)
-      private var logScanner = HoodieMergeOnReadRDD.scanLog(split, 
tableAvroSchema, config)
-      private val logRecords = logScanner.getRecords
-      private val logRecordsKeyIterator = 
logRecords.keySet().iterator().asScala
-
-      private var recordToLoad: InternalRow = _
-
-      override def hasNext: Boolean = {
-        if (logRecordsKeyIterator.hasNext) {
-          val curAvrokey = logRecordsKeyIterator.next()
-          val curAvroRecord = 
logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema, payloadProps)
-          if (!curAvroRecord.isPresent) {
-            // delete record found, skipping
-            this.hasNext
-          } else {
-            val requiredAvroRecord = 
AvroConversionUtils.buildAvroRecordBySchema(curAvroRecord.get(), 
requiredAvroSchema,
-              requiredFieldPosition, recordBuilder)
-            val rowOpt = deserializer.deserialize(requiredAvroRecord)
-            recordToLoad = 
unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
-            true
+  /**
+   * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], iterates over all 
of the records stored in
+   * Delta Log files (represented as [[InternalRow]]s)
+   */
+  private class LogFileIterator(split: HoodieMergeOnReadFileSplit,
+                                config: Configuration)
+    extends Iterator[InternalRow] with Closeable with AvroDeserializerSupport {
+
+    protected override val requiredAvroSchema: Schema = new 
Schema.Parser().parse(requiredSchema.avroSchemaStr)
+    protected override val requiredStructTypeSchema: StructType = 
requiredSchema.structTypeSchema
+
+    protected val logFileReaderAvroSchema: Schema = new 
Schema.Parser().parse(tableSchema.avroSchemaStr)
+
+    protected val recordBuilder: GenericRecordBuilder = new 
GenericRecordBuilder(requiredAvroSchema)
+    protected var recordToLoad: InternalRow = _
+
+    // TODO validate whether we need to do UnsafeProjection
+    protected val unsafeProjection: UnsafeProjection = 
UnsafeProjection.create(requiredStructTypeSchema)
+
+    // NOTE: This maps _required_ schema fields onto the _full_ table schema, 
collecting their "ordinals"
+    //       w/in the record payload. This is required, to project records 
read from the Delta Log file
+    //       which always reads records in full schema (never projected, due 
to the fact that DL file might
+    //       be stored in non-columnar formats like Avro, HFile, etc)
+    private val requiredSchemaFieldOrdinals: List[Int] = 
collectFieldOrdinals(requiredAvroSchema, logFileReaderAvroSchema)
+
+    private var logScanner =
+      HoodieMergeOnReadRDD.scanLog(split.logFiles, getPartitionPath(split), 
logFileReaderAvroSchema, tableState,
+        maxCompactionMemoryInBytes, config)
+
+    private val logRecords = logScanner.getRecords.asScala
+
+    // NOTE: This iterator iterates over already projected (in required 
schema) records
+    // NOTE: This have to stay lazy to make sure it's initialized only at the 
point where it's
+    //       going to be used, since we modify `logRecords` before that and 
therefore can't do it any earlier
+    protected lazy val logRecordsIterator: Iterator[Option[GenericRecord]] =
+      logRecords.iterator.map {
+        case (_, record) =>
+          val avroRecordOpt = 
toScalaOption(record.getData.getInsertValue(logFileReaderAvroSchema, 
payloadProps))
+          avroRecordOpt.map {
+            avroRecord => projectAvroUnsafe(avroRecord, requiredAvroSchema, 
requiredSchemaFieldOrdinals, recordBuilder)
           }
-        } else {
-          false
-        }
       }
 
-      override def next(): InternalRow = {
-        recordToLoad
-      }
-
-      override def close(): Unit = {
-        if (logScanner != null) {
-          try {
-            logScanner.close()
-          } finally {
-            logScanner = null
-          }
-        }
-      }
-    }
+    protected def removeLogRecord(key: String): Option[HoodieRecord[_ <: 
HoodieRecordPayload[_]]] =
+      logRecords.remove(key)
 
-  private def skipMergeFileIterator(split: HoodieMergeOnReadFileSplit,
-                                    baseFileIterator: Iterator[InternalRow],
-                                    config: Configuration): 
Iterator[InternalRow] =
-    new Iterator[InternalRow] with Closeable with SparkAdapterSupport {
-      private val tableAvroSchema = new 
Schema.Parser().parse(tableSchema.avroSchemaStr)
-      private val requiredAvroSchema = new 
Schema.Parser().parse(requiredSchema.avroSchemaStr)
-      private val requiredFieldPosition =
-        requiredSchema.structTypeSchema
-          .map(f => tableAvroSchema.getField(f.name).pos()).toList
-      private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
-      private val deserializer = 
sparkAdapter.createAvroDeserializer(requiredAvroSchema, 
requiredSchema.structTypeSchema)
-      private val unsafeProjection = 
UnsafeProjection.create(requiredSchema.structTypeSchema)
-      private var logScanner = HoodieMergeOnReadRDD.scanLog(split, 
tableAvroSchema, config)
-      private val logRecords = logScanner.getRecords
-      private val logRecordsKeyIterator = 
logRecords.keySet().iterator().asScala
-
-      private var recordToLoad: InternalRow = _
-
-      @scala.annotation.tailrec
-      override def hasNext: Boolean = {
-        if (baseFileIterator.hasNext) {
-          val curRow = baseFileIterator.next()
-          recordToLoad = unsafeProjection(curRow)
-          true
+    override def hasNext: Boolean =
+      logRecordsIterator.hasNext && {
+        val avroRecordOpt = logRecordsIterator.next()
+        if (avroRecordOpt.isEmpty) {
+          // Record has been deleted, skipping
+          this.hasNext
         } else {
-          if (logRecordsKeyIterator.hasNext) {
-            val curAvrokey = logRecordsKeyIterator.next()
-            val curAvroRecord = 
logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema, payloadProps)
-            if (!curAvroRecord.isPresent) {
-              // delete record found, skipping
-              this.hasNext
-            } else {
-              val requiredAvroRecord = 
AvroConversionUtils.buildAvroRecordBySchema(curAvroRecord.get(), 
requiredAvroSchema,
-                requiredFieldPosition, recordBuilder)
-              val rowOpt = deserializer.deserialize(requiredAvroRecord)
-              recordToLoad = 
unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
-              true
-            }
-          } else {
-            false
-          }
+          recordToLoad = unsafeProjection(deserialize(avroRecordOpt.get))
+          true
         }
       }
 
-      override def next(): InternalRow = {
-        recordToLoad
-      }
+    override final def next(): InternalRow = recordToLoad
 
-      override def close(): Unit = {
-        if (logScanner != null) {
-          try {
-            logScanner.close()
-          } finally {
-            logScanner = null
-          }
+    override def close(): Unit =
+      if (logScanner != null) {
+        try {
+          logScanner.close()
+        } finally {
+          logScanner = null
         }
       }
+  }
+
+  /**
+   * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an 
iterator over all of the records stored in
+   * Base file as well as all of the Delta Log files simply returning 
concatenation of these streams, while not
+   * performing any combination/merging of the records w/ the same primary 
keys (ie producing duplicates potentially)
+   */
+  private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit,
+                                  baseFileIterator: Iterator[InternalRow],
+                                  config: Configuration)
+    extends LogFileIterator(split, config) {
+
+    override def hasNext: Boolean = {
+      if (baseFileIterator.hasNext) {
+        val curRow = baseFileIterator.next()
+        recordToLoad = unsafeProjection(curRow)
+        true
+      } else {
+        super[LogFileIterator].hasNext
+      }
     }
+  }
 
-  private def payloadCombineFileIterator(split: HoodieMergeOnReadFileSplit,
-                                         baseFileIterator: 
Iterator[InternalRow],
-                                         config: Configuration): 
Iterator[InternalRow] =
-    new Iterator[InternalRow] with Closeable with SparkAdapterSupport {
-      private val tableAvroSchema = new 
Schema.Parser().parse(tableSchema.avroSchemaStr)
-      private val requiredAvroSchema = new 
Schema.Parser().parse(requiredSchema.avroSchemaStr)
-      private val requiredFieldPosition =
-        requiredSchema.structTypeSchema
-          .map(f => tableAvroSchema.getField(f.name).pos()).toList
-      private val serializer = 
sparkAdapter.createAvroSerializer(tableSchema.structTypeSchema, tableAvroSchema,
-        resolveAvroSchemaNullability(tableAvroSchema))
-      private val requiredDeserializer = 
sparkAdapter.createAvroDeserializer(requiredAvroSchema, 
requiredSchema.structTypeSchema)
-      private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
-      private val unsafeProjection = 
UnsafeProjection.create(requiredSchema.structTypeSchema)
-      private var logScanner = HoodieMergeOnReadRDD.scanLog(split, 
tableAvroSchema, config)
-      private val logRecords = logScanner.getRecords
-      private val logRecordsKeyIterator = 
logRecords.keySet().iterator().asScala
-      private val keyToSkip = mutable.Set.empty[String]
-      private val recordKeyPosition = 
tableSchema.structTypeSchema.fieldIndex(recordKeyField)
-
-      private var recordToLoad: InternalRow = _
-
-      @scala.annotation.tailrec
-      override def hasNext: Boolean = {
-        if (baseFileIterator.hasNext) {
-          val curRow = baseFileIterator.next()
-          val curKey = curRow.getString(recordKeyPosition)
-          if (logRecords.containsKey(curKey)) {
-            // duplicate key found, merging
-            keyToSkip.add(curKey)
-            val mergedAvroRecord = mergeRowWithLog(curRow, curKey)
-            if (!mergedAvroRecord.isPresent) {
-              // deleted
-              this.hasNext
-            } else {
-              // load merged record as InternalRow with required schema
-              val requiredAvroRecord = 
AvroConversionUtils.buildAvroRecordBySchema(mergedAvroRecord.get(), 
requiredAvroSchema,
-                requiredFieldPosition, recordBuilder)
-              val rowOpt = requiredDeserializer.deserialize(requiredAvroRecord)
-              recordToLoad = 
unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
-              true
-            }
-          } else {
-            // No merge needed, load current row with required schema
-            recordToLoad = 
unsafeProjection(createInternalRowWithSchema(curRow, 
requiredSchema.structTypeSchema, requiredFieldPosition))
-            true
-          }
+  /**
+   * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an 
iterator over all of the records stored in
+   * a) Base file and all of the b) Delta Log files combining records with the 
same primary key from both of these
+   * streams
+   */
+  private class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit,
+                                          baseFileIterator: 
Iterator[InternalRow],
+                                          baseFileReaderSchema: 
HoodieTableSchema,
+                                          config: Configuration)
+    extends LogFileIterator(split, config) {
+
+    // NOTE: Record-merging iterator supports 2 modes of operation merging 
records bearing either
+    //        - Full table's schema
+    //        - Projected schema
+    //       As such, no particular schema could be assumed, and therefore we 
rely on the caller
+    //       to correspondingly set the scheme of the expected output of 
base-file reader
+    private val baseFileReaderAvroSchema = new 
Schema.Parser().parse(baseFileReaderSchema.avroSchemaStr)
+    private val requiredSchemaFieldOrdinals: List[Int] = 
collectFieldOrdinals(requiredAvroSchema, baseFileReaderAvroSchema)
+
+    private val serializer = 
sparkAdapter.createAvroSerializer(baseFileReaderSchema.structTypeSchema,
+      baseFileReaderAvroSchema, 
resolveAvroSchemaNullability(baseFileReaderAvroSchema))
+
+    private val recordKeyOrdinal = 
baseFileReaderSchema.structTypeSchema.fieldIndex(tableState.recordKeyField)
+
+    override def hasNext: Boolean = {
+      if (baseFileIterator.hasNext) {
+        val curRowRecord = baseFileIterator.next()
+        val curKey = curRowRecord.getString(recordKeyOrdinal)
+        val updatedRecordOpt = removeLogRecord(curKey)
+        if (updatedRecordOpt.isEmpty) {
+          // No merge needed, load current row with required projected schema
+          recordToLoad = unsafeProjection(projectRowUnsafe(curRowRecord, 
requiredSchema.structTypeSchema, requiredSchemaFieldOrdinals))
+          true
         } else {
-          if (logRecordsKeyIterator.hasNext) {
-            val curKey = logRecordsKeyIterator.next()
-            if (keyToSkip.contains(curKey)) {
-              this.hasNext
-            } else {
-              val insertAvroRecord = 
logRecords.get(curKey).getData.getInsertValue(tableAvroSchema, payloadProps)
-              if (!insertAvroRecord.isPresent) {
-                // stand alone delete record, skipping
-                this.hasNext
-              } else {
-                val requiredAvroRecord = AvroConversionUtils
-                  .buildAvroRecordBySchema(
-                    insertAvroRecord.get(),
-                    requiredAvroSchema,
-                    requiredFieldPosition,
-                    recordBuilder
-                  )
-                val rowOpt = 
requiredDeserializer.deserialize(requiredAvroRecord)
-                recordToLoad = 
unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
-                true
-              }
-            }
+          val mergedAvroRecordOpt = merge(serialize(curRowRecord), 
updatedRecordOpt.get)
+          if (mergedAvroRecordOpt.isEmpty) {
+            // Record has been deleted, skipping
+            this.hasNext
           } else {
-            false
+            // NOTE: In occurrence of a merge we can't know the schema of the 
record being returned, b/c
+            //       record from the Delta Log will bear (full) Table schema, 
while record from the Base file
+            //       might already be read in projected one (as an 
optimization).
+            //       As such we can't use more performant 
[[projectAvroUnsafe]], and instead have to fallback
+            //       to [[projectAvro]]
+            val projectedAvroRecord = projectAvro(mergedAvroRecordOpt.get, 
requiredAvroSchema, recordBuilder)
+            recordToLoad = unsafeProjection(deserialize(projectedAvroRecord))
+            true
           }
         }
+      } else {
+        super[LogFileIterator].hasNext
       }
+    }
 
-      override def next(): InternalRow = recordToLoad
-
-      override def close(): Unit = {
-        if (logScanner != null) {
-          try {
-            logScanner.close()
-          } finally {
-            logScanner = null
-          }
-        }
-      }
+    private def serialize(curRowRecord: InternalRow): GenericRecord =
+      serializer.serialize(curRowRecord).asInstanceOf[GenericRecord]
 
-      private def mergeRowWithLog(curRow: InternalRow, curKey: String) : 
org.apache.hudi.common.util.Option[IndexedRecord] = {
-        val historyAvroRecord = 
serializer.serialize(curRow).asInstanceOf[GenericRecord]
-        val mergedRec = logRecords.get(curKey).getData
-          .combineAndGetUpdateValue(historyAvroRecord, tableAvroSchema, 
payloadProps)
-        if (mergedRec.isPresent && mergedRec.get().getSchema != 
tableAvroSchema) {
-          
org.apache.hudi.common.util.Option.of(HoodieAvroUtils.rewriteRecord(mergedRec.get().asInstanceOf[GenericRecord],
 tableAvroSchema).asInstanceOf[IndexedRecord])
-        } else {
-          mergedRec
-        }
-      }
+    private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_ 
<: HoodieRecordPayload[_]]): Option[IndexedRecord] = {
+      // NOTE: We have to pass in Avro Schema used to read from Delta Log file 
since we invoke combining API
+      //       on the record from the Delta Log
+      toScalaOption(newRecord.getData.combineAndGetUpdateValue(curAvroRecord, 
logFileReaderAvroSchema, payloadProps))
     }
+  }
 }
 
 private object HoodieMergeOnReadRDD {
+
   val CONFIG_INSTANTIATION_LOCK = new Object()
 
-  def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config: 
Configuration): HoodieMergedLogRecordScanner = {
-    val fs = FSUtils.getFs(split.tablePath, config)
-    val logFiles = split.logFiles.get
+  def scanLog(logFiles: List[HoodieLogFile],
+              partitionPath: Path,
+              logSchema: Schema,
+              tableState: HoodieTableState,
+              maxCompactionMemoryInBytes: Long,
+              hadoopConf: Configuration): HoodieMergedLogRecordScanner = {

Review comment:
       Good call!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to