This is an automated email from the ASF dual-hosted git repository.

satish pushed a commit to branch release-0.12.2
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 11221fda2b34924fed736d6b2686af03d2999a11
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Wed Nov 30 13:31:33 2022 -0800

    Rebased MOR iterators onto a `CachingIterator` (to be idempotent) (#7334)
    
    Addressing an invalid semantic of MOR iterators not being actually 
idempotent: ie, calling `hasNext` multiple times was actually leading to 
iterator advancing, therefore potentially skipping the elements for ex in cases 
like:
    
    ```
    // [[isEmpty]] will invoke [[hasNext]] to check whether Iterator has any 
elements
    if (iter.isEmpty) {
      // ...
    } else {
      // Here [[map]] will invoke [[hasNext]] again, therefore skipping the 
elements
      iter.map { /* ... */ }
    }
    ```
---
 .../scala/org/apache/hudi/HoodieSparkUtils.scala   |   2 +-
 .../org/apache/hudi/HoodieMergeOnReadRDD.scala     | 176 +----------
 .../src/main/scala/org/apache/hudi/Iterators.scala | 332 +++++++++++++++++++++
 .../org/apache/hudi/util/CachingIterator.scala     |  44 +++
 4 files changed, 384 insertions(+), 170 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index df5e2777cbe..d02e6e6155a 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -96,7 +96,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with 
SparkVersionsSupport {
 
     // NOTE: We're accessing toRdd here directly to avoid [[InternalRow]] to 
[[Row]] conversion
     //       Additionally, we have to explicitly wrap around resulting [[RDD]] 
into the one
-    //       injecting [[SQLConf]], which by default isn't propgated by Spark 
to the executor(s).
+    //       injecting [[SQLConf]], which by default isn't propagated by Spark 
to the executor(s).
     //       [[SQLConf]] is required by [[AvroSerializer]]
     injectSQLConf(df.queryExecution.toRdd.mapPartitions { rows =>
       if (rows.isEmpty) {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index 512c97806f3..c58ce809c3d 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -19,19 +19,18 @@
 package org.apache.hudi
 
 import org.apache.avro.Schema
-import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, 
IndexedRecord}
+import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.JobConf
-import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, 
generateUnsafeProjection, projectReader}
-import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption}
+import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, projectReader}
+import org.apache.hudi.HoodieConversionUtils.{toJavaOption}
 import 
org.apache.hudi.HoodieMergeOnReadRDD.SafeAvroProjection.collectFieldOrdinals
-import org.apache.hudi.HoodieMergeOnReadRDD._
 import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
 import org.apache.hudi.common.engine.HoodieLocalEngineContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
-import org.apache.hudi.common.model.{HoodieLogFile, HoodieRecord, 
HoodieRecordPayload, OverwriteWithLatestAvroPayload}
+import org.apache.hudi.common.model.{HoodieLogFile, 
OverwriteWithLatestAvroPayload}
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.config.HoodiePayloadConfig
@@ -121,15 +120,15 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
         projectedReader(dataFileOnlySplit.dataFile.get)
 
       case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
-        new LogFileIterator(logFileOnlySplit, getConfig)
+        new LogFileIterator(logFileOnlySplit, tableSchema, requiredSchema, 
tableState, getConfig)
 
       case split if 
mergeType.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) =>
         val reader = fileReaders.requiredSchemaReaderSkipMerging
-        new SkipMergeIterator(split, reader, getConfig)
+        new SkipMergeIterator(split, reader, tableSchema, requiredSchema, 
tableState, getConfig)
 
       case split if 
mergeType.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) =>
         val reader = pickBaseFileReader
-        new RecordMergingFileIterator(split, reader, getConfig)
+        new RecordMergingFileIterator(split, reader, tableSchema, 
requiredSchema, tableState, getConfig)
 
       case _ => throw new HoodieException(s"Unable to select an Iterator to 
read the Hoodie MOR File Split for " +
         s"file path: ${mergeOnReadPartition.split.dataFile.get.filePath}" +
@@ -173,167 +172,6 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
       new Configuration(conf)
     }
   }
-
-  /**
-   * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], iterates over all 
of the records stored in
-   * Delta Log files (represented as [[InternalRow]]s)
-   */
-  private class LogFileIterator(split: HoodieMergeOnReadFileSplit,
-                                config: Configuration)
-    extends Iterator[InternalRow] with Closeable with AvroDeserializerSupport {
-
-    protected override val requiredAvroSchema: Schema = new 
Schema.Parser().parse(requiredSchema.avroSchemaStr)
-    protected override val requiredStructTypeSchema: StructType = 
requiredSchema.structTypeSchema
-
-    protected val logFileReaderAvroSchema: Schema = new 
Schema.Parser().parse(tableSchema.avroSchemaStr)
-
-    protected var recordToLoad: InternalRow = _
-
-    private val requiredSchemaSafeAvroProjection = 
SafeAvroProjection.create(logFileReaderAvroSchema, requiredAvroSchema)
-
-    private var logScanner = {
-      val internalSchema = 
tableSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema)
-      HoodieMergeOnReadRDD.scanLog(split.logFiles, getPartitionPath(split), 
logFileReaderAvroSchema, tableState,
-        maxCompactionMemoryInBytes, config, internalSchema)
-    }
-
-    private val logRecords = logScanner.getRecords.asScala
-
-    // NOTE: This have to stay lazy to make sure it's initialized only at the 
point where it's
-    //       going to be used, since we modify `logRecords` before that and 
therefore can't do it any earlier
-    protected lazy val logRecordsIterator: Iterator[Option[GenericRecord]] =
-      logRecords.iterator.map {
-        case (_, record) =>
-          toScalaOption(record.getData.getInsertValue(logFileReaderAvroSchema, 
payloadProps))
-            .map(_.asInstanceOf[GenericRecord])
-      }
-
-    protected def removeLogRecord(key: String): Option[HoodieRecord[_ <: 
HoodieRecordPayload[_]]] =
-      logRecords.remove(key)
-
-    override def hasNext: Boolean = hasNextInternal
-
-    // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to 
make sure
-    //       that recursion is unfolded into a loop to avoid stack overflows 
while
-    //       handling records
-    @tailrec private def hasNextInternal: Boolean = {
-      logRecordsIterator.hasNext && {
-        val avroRecordOpt = logRecordsIterator.next()
-        if (avroRecordOpt.isEmpty) {
-          // Record has been deleted, skipping
-          this.hasNextInternal
-        } else {
-          val projectedAvroRecord = 
requiredSchemaSafeAvroProjection(avroRecordOpt.get)
-          recordToLoad = deserialize(projectedAvroRecord)
-          true
-        }
-      }
-    }
-
-    override final def next(): InternalRow = recordToLoad
-
-    override def close(): Unit =
-      if (logScanner != null) {
-        try {
-          logScanner.close()
-        } finally {
-          logScanner = null
-        }
-      }
-  }
-
-  /**
-   * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an 
iterator over all of the records stored in
-   * Base file as well as all of the Delta Log files simply returning 
concatenation of these streams, while not
-   * performing any combination/merging of the records w/ the same primary 
keys (ie producing duplicates potentially)
-   */
-  private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit,
-                                  baseFileReader: BaseFileReader,
-                                  config: Configuration)
-    extends LogFileIterator(split, config) {
-
-    private val requiredSchemaUnsafeProjection = 
generateUnsafeProjection(baseFileReader.schema, requiredStructTypeSchema)
-
-    private val baseFileIterator = baseFileReader(split.dataFile.get)
-
-    override def hasNext: Boolean = {
-      if (baseFileIterator.hasNext) {
-        // No merge is required, simply load current row and project into 
required schema
-        recordToLoad = requiredSchemaUnsafeProjection(baseFileIterator.next())
-        true
-      } else {
-        super[LogFileIterator].hasNext
-      }
-    }
-  }
-
-  /**
-   * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an 
iterator over all of the records stored in
-   * a) Base file and all of the b) Delta Log files combining records with the 
same primary key from both of these
-   * streams
-   */
-  private class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit,
-                                          baseFileReader: BaseFileReader,
-                                          config: Configuration)
-    extends LogFileIterator(split, config) {
-
-    // NOTE: Record-merging iterator supports 2 modes of operation merging 
records bearing either
-    //        - Full table's schema
-    //        - Projected schema
-    //       As such, no particular schema could be assumed, and therefore we 
rely on the caller
-    //       to correspondingly set the scheme of the expected output of 
base-file reader
-    private val baseFileReaderAvroSchema = 
sparkAdapter.getAvroSchemaConverters.toAvroType(baseFileReader.schema, nullable 
= false, "record")
-
-    private val serializer = 
sparkAdapter.createAvroSerializer(baseFileReader.schema, 
baseFileReaderAvroSchema, nullable = false)
-
-    private val reusableRecordBuilder: GenericRecordBuilder = new 
GenericRecordBuilder(requiredAvroSchema)
-
-    private val recordKeyOrdinal = 
baseFileReader.schema.fieldIndex(tableState.recordKeyField)
-
-    private val requiredSchemaUnsafeProjection = 
generateUnsafeProjection(baseFileReader.schema, requiredStructTypeSchema)
-
-    private val baseFileIterator = baseFileReader(split.dataFile.get)
-
-    override def hasNext: Boolean = hasNextInternal
-
-    // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to 
make sure
-    //       that recursion is unfolded into a loop to avoid stack overflows 
while
-    //       handling records
-    @tailrec private def hasNextInternal: Boolean = {
-      if (baseFileIterator.hasNext) {
-        val curRow = baseFileIterator.next()
-        val curKey = curRow.getString(recordKeyOrdinal)
-        val updatedRecordOpt = removeLogRecord(curKey)
-        if (updatedRecordOpt.isEmpty) {
-          // No merge is required, simply load current row and project into 
required schema
-          recordToLoad = requiredSchemaUnsafeProjection(curRow)
-          true
-        } else {
-          val mergedAvroRecordOpt = merge(serialize(curRow), 
updatedRecordOpt.get)
-          if (mergedAvroRecordOpt.isEmpty) {
-            // Record has been deleted, skipping
-            this.hasNextInternal
-          } else {
-            val projectedAvroRecord = 
projectAvroUnsafe(mergedAvroRecordOpt.get.asInstanceOf[GenericRecord],
-              requiredAvroSchema, reusableRecordBuilder)
-            recordToLoad = deserialize(projectedAvroRecord)
-            true
-          }
-        }
-      } else {
-        super[LogFileIterator].hasNext
-      }
-    }
-
-    private def serialize(curRowRecord: InternalRow): GenericRecord =
-      serializer.serialize(curRowRecord).asInstanceOf[GenericRecord]
-
-    private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_ 
<: HoodieRecordPayload[_]]): Option[IndexedRecord] = {
-      // NOTE: We have to pass in Avro Schema used to read from Delta Log file 
since we invoke combining API
-      //       on the record from the Delta Log
-      toScalaOption(newRecord.getData.combineAndGetUpdateValue(curAvroRecord, 
logFileReaderAvroSchema, payloadProps))
-    }
-  }
 }
 
 private object HoodieMergeOnReadRDD {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
new file mode 100644
index 00000000000..bb43459af14
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, 
generateUnsafeProjection}
+import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption}
+import org.apache.hudi.common.model.{HoodieLogFile, HoodieRecord, 
HoodieRecordPayload}
+import org.apache.hudi.config.HoodiePayloadConfig
+import 
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+import org.apache.hudi.LogFileIterator._
+import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
+import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
+import org.apache.hudi.internal.schema.InternalSchema
+import org.apache.hudi.metadata.{HoodieBackedTableMetadata, 
HoodieTableMetadata}
+import 
org.apache.hudi.metadata.HoodieTableMetadata.getDataTableBasePathFromMetadataTable
+import org.apache.avro.Schema
+import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, 
IndexedRecord}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hudi.HoodieMergeOnReadRDD.SafeAvroProjection
+import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.util.CachingIterator
+import org.apache.spark.sql.avro.HoodieAvroDeserializer
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StructType
+
+import java.io.Closeable
+import java.util.Properties
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+
+trait AvroDeserializerSupport extends SparkAdapterSupport {
+  protected val avroSchema: Schema
+  protected val structTypeSchema: StructType
+
+  private lazy val deserializer: HoodieAvroDeserializer =
+    sparkAdapter.createAvroDeserializer(avroSchema, structTypeSchema)
+
+  protected def deserialize(avroRecord: GenericRecord): InternalRow = {
+    checkState(avroRecord.getSchema.getFields.size() == 
structTypeSchema.fields.length)
+    deserializer.deserialize(avroRecord).get.asInstanceOf[InternalRow]
+  }
+}
+
+/**
+ * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], iterates over all 
of the records stored in
+ * Delta Log files (represented as [[InternalRow]]s)
+ */
+class LogFileIterator(split: HoodieMergeOnReadFileSplit,
+                      tableSchema: HoodieTableSchema,
+                      requiredSchema: HoodieTableSchema,
+                      tableState: HoodieTableState,
+                      config: Configuration)
+  extends CachingIterator[InternalRow] with Closeable with 
AvroDeserializerSupport {
+
+  protected val maxCompactionMemoryInBytes: Long = 
getMaxCompactionMemoryInBytes(new JobConf(config))
+
+  protected val payloadProps = tableState.preCombineFieldOpt
+    .map { preCombineField =>
+      HoodiePayloadConfig.newBuilder
+        .withPayloadOrderingField(preCombineField)
+        .build
+        .getProps
+    }.getOrElse(new Properties())
+
+  protected override val avroSchema: Schema = new 
Schema.Parser().parse(requiredSchema.avroSchemaStr)
+  protected override val structTypeSchema: StructType = 
requiredSchema.structTypeSchema
+
+  protected val logFileReaderAvroSchema: Schema = new 
Schema.Parser().parse(tableSchema.avroSchemaStr)
+
+  private val requiredSchemaSafeAvroProjection = 
SafeAvroProjection.create(logFileReaderAvroSchema, avroSchema)
+
+  // TODO: now logScanner with internalSchema support column project, we may 
no need projectAvroUnsafe
+  private var logScanner = {
+    val internalSchema = 
tableSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema)
+    scanLog(split.logFiles, getPartitionPath(split), logFileReaderAvroSchema, 
tableState,
+      maxCompactionMemoryInBytes, config, internalSchema)
+  }
+
+  private val logRecords = logScanner.getRecords.asScala
+
+  def logRecordsIterator(): Iterator[(String, HoodieRecord[_ <: 
HoodieRecordPayload[_ <: HoodieRecordPayload[_ <: AnyRef]]])] = {
+    logRecords.iterator.asInstanceOf[Iterator[(String, HoodieRecord[_ <: 
HoodieRecordPayload[_ <: HoodieRecordPayload[_ <: AnyRef]]])]]
+  }
+
+  // NOTE: This have to stay lazy to make sure it's initialized only at the 
point where it's
+  //       going to be used, since we modify `logRecords` before that and 
therefore can't do it any earlier
+  protected lazy val genericRecordsIterator: Iterator[Option[GenericRecord]] =
+  logRecords.iterator.map {
+    case (_, record) =>
+      toScalaOption(record.getData.getInsertValue(logFileReaderAvroSchema, 
payloadProps))
+        .map(_.asInstanceOf[GenericRecord])
+  }
+
+  protected def removeLogRecord(key: String): Option[HoodieRecord[_ <: 
HoodieRecordPayload[_]]] =
+    logRecords.remove(key)
+
+  protected def doHasNext: Boolean = hasNextInternal
+
+  // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to 
make sure
+  //       that recursion is unfolded into a loop to avoid stack overflows 
while
+  //       handling records
+  @tailrec private def hasNextInternal: Boolean = {
+    genericRecordsIterator.hasNext && {
+      val avroRecordOpt = genericRecordsIterator.next()
+      if (avroRecordOpt.isEmpty) {
+        // Record has been deleted, skipping
+        this.hasNextInternal
+      } else {
+        val projectedAvroRecord = 
requiredSchemaSafeAvroProjection(avroRecordOpt.get)
+        nextRecord = deserialize(projectedAvroRecord)
+        true
+      }
+    }
+  }
+
+  override def close(): Unit =
+    if (logScanner != null) {
+      try {
+        logScanner.close()
+      } finally {
+        logScanner = null
+      }
+    }
+}
+
+/**
+ * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an 
iterator over all of the records stored in
+ * Base file as well as all of the Delta Log files simply returning 
concatenation of these streams, while not
+ * performing any combination/merging of the records w/ the same primary keys 
(ie producing duplicates potentially)
+ */
+private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit,
+                                baseFileReader: BaseFileReader,
+                                dataSchema: HoodieTableSchema,
+                                requiredSchema: HoodieTableSchema,
+                                tableState: HoodieTableState,
+                                config: Configuration)
+  extends LogFileIterator(split, dataSchema, requiredSchema, tableState, 
config) {
+
+  private val requiredSchemaUnsafeProjection = 
generateUnsafeProjection(baseFileReader.schema, structTypeSchema)
+
+  private val baseFileIterator = baseFileReader(split.dataFile.get)
+
+  override def doHasNext: Boolean = {
+    if (baseFileIterator.hasNext) {
+      // No merge is required, simply load current row and project into 
required schema
+      nextRecord = requiredSchemaUnsafeProjection(baseFileIterator.next())
+      true
+    } else {
+      super[LogFileIterator].doHasNext
+    }
+  }
+}
+
+/**
+ * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an 
iterator over all of the records stored in
+ * a) Base file and all of the b) Delta Log files combining records with the 
same primary key from both of these
+ * streams
+ */
+class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit,
+                                baseFileReader: BaseFileReader,
+                                dataSchema: HoodieTableSchema,
+                                requiredSchema: HoodieTableSchema,
+                                tableState: HoodieTableState,
+                                config: Configuration)
+  extends LogFileIterator(split, dataSchema, requiredSchema, tableState, 
config) {
+
+  // NOTE: Record-merging iterator supports 2 modes of operation merging 
records bearing either
+  //        - Full table's schema
+  //        - Projected schema
+  //       As such, no particular schema could be assumed, and therefore we 
rely on the caller
+  //       to correspondingly set the scheme of the expected output of 
base-file reader
+  private val baseFileReaderAvroSchema = 
sparkAdapter.getAvroSchemaConverters.toAvroType(baseFileReader.schema, nullable 
= false, "record")
+
+  private val serializer = 
sparkAdapter.createAvroSerializer(baseFileReader.schema, 
baseFileReaderAvroSchema, nullable = false)
+
+  private val reusableRecordBuilder: GenericRecordBuilder = new 
GenericRecordBuilder(avroSchema)
+
+  private val recordKeyOrdinal = 
baseFileReader.schema.fieldIndex(tableState.recordKeyField)
+
+  private val requiredSchemaUnsafeProjection = 
generateUnsafeProjection(baseFileReader.schema, structTypeSchema)
+
+  private val baseFileIterator = baseFileReader(split.dataFile.get)
+
+  override def doHasNext: Boolean = hasNextInternal
+
+  // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to 
make sure
+  //       that recursion is unfolded into a loop to avoid stack overflows 
while
+  //       handling records
+  @tailrec private def hasNextInternal: Boolean = {
+    if (baseFileIterator.hasNext) {
+      val curRow = baseFileIterator.next()
+      val curKey = curRow.getString(recordKeyOrdinal)
+      val updatedRecordOpt = removeLogRecord(curKey)
+      if (updatedRecordOpt.isEmpty) {
+        // No merge is required, simply load current row and project into 
required schema
+        nextRecord = requiredSchemaUnsafeProjection(curRow)
+        true
+      } else {
+        val mergedAvroRecordOpt = merge(serialize(curRow), 
updatedRecordOpt.get)
+        if (mergedAvroRecordOpt.isEmpty) {
+          // Record has been deleted, skipping
+          this.hasNextInternal
+        } else {
+          val projectedAvroRecord = 
projectAvroUnsafe(mergedAvroRecordOpt.get.asInstanceOf[GenericRecord],
+            avroSchema, reusableRecordBuilder)
+          nextRecord = deserialize(projectedAvroRecord)
+          true
+        }
+      }
+    } else {
+      super[LogFileIterator].doHasNext
+    }
+  }
+
+  private def serialize(curRowRecord: InternalRow): GenericRecord =
+    serializer.serialize(curRowRecord).asInstanceOf[GenericRecord]
+
+  private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_ <: 
HoodieRecordPayload[_]]): Option[IndexedRecord] = {
+    // NOTE: We have to pass in Avro Schema used to read from Delta Log file 
since we invoke combining API
+    //       on the record from the Delta Log
+    toScalaOption(newRecord.getData.combineAndGetUpdateValue(curAvroRecord, 
logFileReaderAvroSchema, payloadProps))
+  }
+}
+
+object LogFileIterator {
+
+  val CONFIG_INSTANTIATION_LOCK = new Object()
+
+  def scanLog(logFiles: List[HoodieLogFile],
+              partitionPath: Path,
+              logSchema: Schema,
+              tableState: HoodieTableState,
+              maxCompactionMemoryInBytes: Long,
+              hadoopConf: Configuration,
+              internalSchema: InternalSchema = 
InternalSchema.getEmptyInternalSchema): HoodieMergedLogRecordScanner = {
+    val tablePath = tableState.tablePath
+    val fs = FSUtils.getFs(tablePath, hadoopConf)
+
+    if (HoodieTableMetadata.isMetadataTable(tablePath)) {
+      val metadataConfig = HoodieMetadataConfig.newBuilder()
+        
.fromProperties(tableState.metadataConfig.getProps).enable(true).build()
+      val dataTableBasePath = getDataTableBasePathFromMetadataTable(tablePath)
+      val metadataTable = new HoodieBackedTableMetadata(
+        new HoodieLocalEngineContext(hadoopConf), metadataConfig,
+        dataTableBasePath,
+        hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, 
HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
+
+      // We have to force full-scan for the MT log record reader, to make sure
+      // we can iterate over all of the partitions, since by default some of 
the partitions (Column Stats,
+      // Bloom Filter) are in "point-lookup" mode
+      val forceFullScan = true
+
+      // NOTE: In case of Metadata Table partition path equates to partition 
name (since there's just one level
+      //       of indirection among MT partitions)
+      val relativePartitionPath = getRelativePartitionPath(new 
Path(tablePath), partitionPath)
+      metadataTable.getLogRecordScanner(logFiles.asJava, 
relativePartitionPath, toJavaOption(Some(forceFullScan)))
+        .getLeft
+    } else {
+      val logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(tablePath)
+        .withLogFilePaths(logFiles.map(logFile => 
logFile.getPath.toString).asJava)
+        .withReaderSchema(logSchema)
+        .withLatestInstantTime(tableState.latestCommitTimestamp)
+        .withReadBlocksLazily(
+          
Try(hadoopConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
+            
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean)
+            .getOrElse(false))
+        .withReverseReader(false)
+        .withInternalSchema(internalSchema)
+        .withBufferSize(
+          
hadoopConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
+            HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
+        .withMaxMemorySizeInBytes(maxCompactionMemoryInBytes)
+        .withSpillableMapBasePath(
+          hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
+            HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
+        .withDiskMapType(
+          hadoopConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key,
+            HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue))
+        .withBitCaskDiskMapCompressionEnabled(
+          
hadoopConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
+            
HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
+
+      if (logFiles.nonEmpty) {
+        logRecordScannerBuilder.withPartition(
+          getRelativePartitionPath(new Path(tableState.tablePath), 
logFiles.head.getPath.getParent))
+      }
+
+      logRecordScannerBuilder.build()
+    }
+  }
+
+  def projectAvroUnsafe(record: GenericRecord, projectedSchema: Schema, 
reusableRecordBuilder: GenericRecordBuilder): GenericRecord = {
+    val fields = projectedSchema.getFields.asScala
+    fields.foreach(field => reusableRecordBuilder.set(field, 
record.get(field.name())))
+    reusableRecordBuilder.build()
+  }
+
+  def getPartitionPath(split: HoodieMergeOnReadFileSplit): Path = {
+    // Determine partition path as an immediate parent folder of either
+    //    - The base file
+    //    - Some log file
+    split.dataFile.map(baseFile => new Path(baseFile.filePath))
+      .getOrElse(split.logFiles.head.getPath)
+      .getParent
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/CachingIterator.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/CachingIterator.scala
new file mode 100644
index 00000000000..8c8fb1023a7
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/CachingIterator.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util
+
+/**
+ * Extension of the [[Iterator]] allowing for caching of the underlying record 
produced
+ * during iteration to provide for the idempotency of the [[hasNext]] 
invocation:
+ * meaning, that invoking [[hasNext]] multiple times consequently (w/o 
invoking [[next]]
+ * in between) will only make iterator step over a single element
+ *
+ * NOTE: [[hasNext]] and [[next]] are purposefully marked as final, requiring 
iteration
+ *       semantic to be implemented t/h overriding of a single [[doHasNext]] 
method
+ */
+trait CachingIterator[T >: Null] extends Iterator[T] {
+
+  protected var nextRecord: T = _
+
+  protected def doHasNext: Boolean
+
+  override final def hasNext: Boolean = nextRecord != null || doHasNext
+
+  override final def next: T = {
+    val record = nextRecord
+    nextRecord = null
+    record
+  }
+
+}

Reply via email to