This is an automated email from the ASF dual-hosted git repository.

akudinkin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d7a7285b29 Rebased MOR iterators onto a `CachingIterator` (to be 
idempotent) (#7334)
d7a7285b29 is described below

commit d7a7285b2994453708c060bd7fb5138710ae65bf
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Wed Nov 30 13:31:33 2022 -0800

    Rebased MOR iterators onto a `CachingIterator` (to be idempotent) (#7334)
    
    Addressing an invalid semantic of MOR iterators not being actually 
idempotent: i.e., calling `hasNext` multiple times was actually leading to 
the iterator advancing, therefore potentially skipping elements, for example in 
cases like:
    
    ```
    // [[isEmpty]] will invoke [[hasNext]] to check whether Iterator has any 
elements
    if (iter.isEmpty) {
      // ...
    } else {
      // Here [[map]] will invoke [[hasNext]] again, therefore skipping the 
elements
      iter.map { /* ... */ }
    }
    ```
---
 .../scala/org/apache/hudi/HoodieSparkUtils.scala   |  2 +-
 .../{LogFileIterator.scala => Iterators.scala}     | 29 ++++++--------
 .../org/apache/hudi/util/CachingIterator.scala     | 44 ++++++++++++++++++++++
 3 files changed, 56 insertions(+), 19 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 404d8d9309..bd7d3647b2 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -93,7 +93,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with 
SparkVersionsSupport {
 
     // NOTE: We're accessing toRdd here directly to avoid [[InternalRow]] to 
[[Row]] conversion
     //       Additionally, we have to explicitly wrap around resulting [[RDD]] 
into the one
-    //       injecting [[SQLConf]], which by default isn't propgated by Spark 
to the executor(s).
+    //       injecting [[SQLConf]], which by default isn't propagated by Spark 
to the executor(s).
     //       [[SQLConf]] is required by [[AvroSerializer]]
     injectSQLConf(df.queryExecution.toRdd.mapPartitions { rows =>
       if (rows.isEmpty) {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
similarity index 95%
rename from 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
rename to 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
index 07a0ce7f23..68b25fafe0 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
@@ -34,20 +34,17 @@ import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.metadata.{HoodieBackedTableMetadata, 
HoodieTableMetadata}
 import 
org.apache.hudi.metadata.HoodieTableMetadata.getDataTableBasePathFromMetadataTable
-
 import org.apache.avro.Schema
 import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, 
IndexedRecord}
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.JobConf
-
+import org.apache.hudi.util.CachingIterator
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types.StructType
 
 import java.io.Closeable
 import java.util.Properties
-
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 import scala.util.Try
@@ -61,7 +58,7 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit,
                       requiredSchema: HoodieTableSchema,
                       tableState: HoodieTableState,
                       config: Configuration)
-  extends Iterator[InternalRow] with Closeable with AvroDeserializerSupport {
+  extends CachingIterator[InternalRow] with Closeable with 
AvroDeserializerSupport {
 
   protected val maxCompactionMemoryInBytes: Long = 
getMaxCompactionMemoryInBytes(new JobConf(config))
 
@@ -78,8 +75,6 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit,
 
   protected val logFileReaderAvroSchema: Schema = new 
Schema.Parser().parse(tableSchema.avroSchemaStr)
 
-  protected var recordToLoad: InternalRow = _
-
   private val requiredSchemaSafeAvroProjection = 
SafeAvroProjection.create(logFileReaderAvroSchema, avroSchema)
 
   // TODO: now logScanner with internalSchema support column project, we may 
no need projectAvroUnsafe
@@ -107,7 +102,7 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit,
   protected def removeLogRecord(key: String): Option[HoodieRecord[_ <: 
HoodieRecordPayload[_]]] =
     logRecords.remove(key)
 
-  override def hasNext: Boolean = hasNextInternal
+  protected def doHasNext: Boolean = hasNextInternal
 
   // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to 
make sure
   //       that recursion is unfolded into a loop to avoid stack overflows 
while
@@ -120,14 +115,12 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit,
         this.hasNextInternal
       } else {
         val projectedAvroRecord = 
requiredSchemaSafeAvroProjection(avroRecordOpt.get)
-        recordToLoad = deserialize(projectedAvroRecord)
+        nextRecord = deserialize(projectedAvroRecord)
         true
       }
     }
   }
 
-  override final def next(): InternalRow = recordToLoad
-
   override def close(): Unit =
     if (logScanner != null) {
       try {
@@ -155,13 +148,13 @@ private class SkipMergeIterator(split: 
HoodieMergeOnReadFileSplit,
 
   private val baseFileIterator = baseFileReader(split.dataFile.get)
 
-  override def hasNext: Boolean = {
+  override def doHasNext: Boolean = {
     if (baseFileIterator.hasNext) {
       // No merge is required, simply load current row and project into 
required schema
-      recordToLoad = requiredSchemaUnsafeProjection(baseFileIterator.next())
+      nextRecord = requiredSchemaUnsafeProjection(baseFileIterator.next())
       true
     } else {
-      super[LogFileIterator].hasNext
+      super[LogFileIterator].doHasNext
     }
   }
 }
@@ -196,7 +189,7 @@ class RecordMergingFileIterator(split: 
HoodieMergeOnReadFileSplit,
 
   private val baseFileIterator = baseFileReader(split.dataFile.get)
 
-  override def hasNext: Boolean = hasNextInternal
+  override def doHasNext: Boolean = hasNextInternal
 
   // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to 
make sure
   //       that recursion is unfolded into a loop to avoid stack overflows 
while
@@ -208,7 +201,7 @@ class RecordMergingFileIterator(split: 
HoodieMergeOnReadFileSplit,
       val updatedRecordOpt = removeLogRecord(curKey)
       if (updatedRecordOpt.isEmpty) {
         // No merge is required, simply load current row and project into 
required schema
-        recordToLoad = requiredSchemaUnsafeProjection(curRow)
+        nextRecord = requiredSchemaUnsafeProjection(curRow)
         true
       } else {
         val mergedAvroRecordOpt = merge(serialize(curRow), 
updatedRecordOpt.get)
@@ -218,12 +211,12 @@ class RecordMergingFileIterator(split: 
HoodieMergeOnReadFileSplit,
         } else {
           val projectedAvroRecord = 
projectAvroUnsafe(mergedAvroRecordOpt.get.asInstanceOf[GenericRecord],
             avroSchema, reusableRecordBuilder)
-          recordToLoad = deserialize(projectedAvroRecord)
+          nextRecord = deserialize(projectedAvroRecord)
           true
         }
       }
     } else {
-      super[LogFileIterator].hasNext
+      super[LogFileIterator].doHasNext
     }
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/CachingIterator.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/CachingIterator.scala
new file mode 100644
index 0000000000..8c8fb1023a
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/CachingIterator.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util
+
+/**
+ * Extension of the [[Iterator]] allowing for caching of the underlying record 
produced
+ * during iteration to provide for the idempotency of the [[hasNext]] 
invocation:
+ * meaning, that invoking [[hasNext]] multiple times consequently (w/o 
invoking [[next]]
+ * in between) will only make iterator step over a single element
+ *
+ * NOTE: [[hasNext]] and [[next]] are purposefully marked as final, requiring 
iteration
+ *       semantic to be implemented t/h overriding of a single [[doHasNext]] 
method
+ */
+trait CachingIterator[T >: Null] extends Iterator[T] {
+
+  protected var nextRecord: T = _
+
+  protected def doHasNext: Boolean
+
+  override final def hasNext: Boolean = nextRecord != null || doHasNext
+
+  override final def next: T = {
+    val record = nextRecord
+    nextRecord = null
+    record
+  }
+
+}

Reply via email to