This is an automated email from the ASF dual-hosted git repository.
akudinkin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d7a7285b29 Rebased MOR iterators onto a `CachingIterator` (to be
idempotent) (#7334)
d7a7285b29 is described below
commit d7a7285b2994453708c060bd7fb5138710ae65bf
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Wed Nov 30 13:31:33 2022 -0800
Rebased MOR iterators onto a `CachingIterator` (to be idempotent) (#7334)
Addresses invalid semantics of MOR iterators, which were not actually
idempotent: i.e., calling `hasNext` multiple times was advancing the
iterator on every call, therefore potentially skipping elements, for example
in cases like:
```
// [[isEmpty]] will invoke [[hasNext]] to check whether the iterator has any elements
if (iter.isEmpty) {
// ...
} else {
// Here [[map]] will invoke [[hasNext]] again, thereby skipping the element already fetched by [[isEmpty]]
iter.map { /* ... */ }
}
```
---
.../scala/org/apache/hudi/HoodieSparkUtils.scala | 2 +-
.../{LogFileIterator.scala => Iterators.scala} | 29 ++++++--------
.../org/apache/hudi/util/CachingIterator.scala | 44 ++++++++++++++++++++++
3 files changed, 56 insertions(+), 19 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 404d8d9309..bd7d3647b2 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -93,7 +93,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with
SparkVersionsSupport {
// NOTE: We're accessing toRdd here directly to avoid [[InternalRow]] to
[[Row]] conversion
// Additionally, we have to explicitly wrap around resulting [[RDD]]
into the one
- // injecting [[SQLConf]], which by default isn't propgated by Spark
to the executor(s).
+ // injecting [[SQLConf]], which by default isn't propagated by Spark
to the executor(s).
// [[SQLConf]] is required by [[AvroSerializer]]
injectSQLConf(df.queryExecution.toRdd.mapPartitions { rows =>
if (rows.isEmpty) {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
similarity index 95%
rename from
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
rename to
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
index 07a0ce7f23..68b25fafe0 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
@@ -34,20 +34,17 @@ import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.metadata.{HoodieBackedTableMetadata,
HoodieTableMetadata}
import
org.apache.hudi.metadata.HoodieTableMetadata.getDataTableBasePathFromMetadataTable
-
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder,
IndexedRecord}
-
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.JobConf
-
+import org.apache.hudi.util.CachingIterator
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType
import java.io.Closeable
import java.util.Properties
-
import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.util.Try
@@ -61,7 +58,7 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit,
requiredSchema: HoodieTableSchema,
tableState: HoodieTableState,
config: Configuration)
- extends Iterator[InternalRow] with Closeable with AvroDeserializerSupport {
+ extends CachingIterator[InternalRow] with Closeable with
AvroDeserializerSupport {
protected val maxCompactionMemoryInBytes: Long =
getMaxCompactionMemoryInBytes(new JobConf(config))
@@ -78,8 +75,6 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit,
protected val logFileReaderAvroSchema: Schema = new
Schema.Parser().parse(tableSchema.avroSchemaStr)
- protected var recordToLoad: InternalRow = _
-
private val requiredSchemaSafeAvroProjection =
SafeAvroProjection.create(logFileReaderAvroSchema, avroSchema)
// TODO: now logScanner with internalSchema support column project, we may
no need projectAvroUnsafe
@@ -107,7 +102,7 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit,
protected def removeLogRecord(key: String): Option[HoodieRecord[_ <:
HoodieRecordPayload[_]]] =
logRecords.remove(key)
- override def hasNext: Boolean = hasNextInternal
+ protected def doHasNext: Boolean = hasNextInternal
// NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to
make sure
// that recursion is unfolded into a loop to avoid stack overflows
while
@@ -120,14 +115,12 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit,
this.hasNextInternal
} else {
val projectedAvroRecord =
requiredSchemaSafeAvroProjection(avroRecordOpt.get)
- recordToLoad = deserialize(projectedAvroRecord)
+ nextRecord = deserialize(projectedAvroRecord)
true
}
}
}
- override final def next(): InternalRow = recordToLoad
-
override def close(): Unit =
if (logScanner != null) {
try {
@@ -155,13 +148,13 @@ private class SkipMergeIterator(split:
HoodieMergeOnReadFileSplit,
private val baseFileIterator = baseFileReader(split.dataFile.get)
- override def hasNext: Boolean = {
+ override def doHasNext: Boolean = {
if (baseFileIterator.hasNext) {
// No merge is required, simply load current row and project into
required schema
- recordToLoad = requiredSchemaUnsafeProjection(baseFileIterator.next())
+ nextRecord = requiredSchemaUnsafeProjection(baseFileIterator.next())
true
} else {
- super[LogFileIterator].hasNext
+ super[LogFileIterator].doHasNext
}
}
}
@@ -196,7 +189,7 @@ class RecordMergingFileIterator(split:
HoodieMergeOnReadFileSplit,
private val baseFileIterator = baseFileReader(split.dataFile.get)
- override def hasNext: Boolean = hasNextInternal
+ override def doHasNext: Boolean = hasNextInternal
// NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to
make sure
// that recursion is unfolded into a loop to avoid stack overflows
while
@@ -208,7 +201,7 @@ class RecordMergingFileIterator(split:
HoodieMergeOnReadFileSplit,
val updatedRecordOpt = removeLogRecord(curKey)
if (updatedRecordOpt.isEmpty) {
// No merge is required, simply load current row and project into
required schema
- recordToLoad = requiredSchemaUnsafeProjection(curRow)
+ nextRecord = requiredSchemaUnsafeProjection(curRow)
true
} else {
val mergedAvroRecordOpt = merge(serialize(curRow),
updatedRecordOpt.get)
@@ -218,12 +211,12 @@ class RecordMergingFileIterator(split:
HoodieMergeOnReadFileSplit,
} else {
val projectedAvroRecord =
projectAvroUnsafe(mergedAvroRecordOpt.get.asInstanceOf[GenericRecord],
avroSchema, reusableRecordBuilder)
- recordToLoad = deserialize(projectedAvroRecord)
+ nextRecord = deserialize(projectedAvroRecord)
true
}
}
} else {
- super[LogFileIterator].hasNext
+ super[LogFileIterator].doHasNext
}
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/CachingIterator.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/CachingIterator.scala
new file mode 100644
index 0000000000..8c8fb1023a
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/CachingIterator.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util
+
+/**
+ * Extension of the [[Iterator]] caching the record produced by the underlying
+ * iteration step, so that [[hasNext]] is idempotent: invoking [[hasNext]] several
+ * times in a row (w/o invoking [[next]] in between) advances the iteration over at
+ * most a single element.
+ *
+ * Implementations provide the iteration semantic solely by implementing [[doHasNext]],
+ * which has to advance the underlying source by one element and, when returning true,
+ * store that element into [[nextRecord]]. [[hasNext]] and [[next]] are purposefully
+ * final so that this caching can't be bypassed.
+ *
+ * NOTE: `null` is used as the "nothing cached" sentinel (hence the `T >: Null` bound);
+ *       a `null` record stored by [[doHasNext]] is indistinguishable from no record
+ *
+ * @tparam T type of the records produced by the iterator
+ */
+trait CachingIterator[T >: Null] extends Iterator[T] {
+
+  /** Record produced by the last successful [[doHasNext]], not yet handed out (`null` if none) */
+  protected var nextRecord: T = _
+
+  /**
+   * Advances the underlying iteration by a single element
+   *
+   * @return true (having assigned [[nextRecord]]) if an element was produced, false otherwise
+   */
+  protected def doHasNext: Boolean
+
+  // Only consult the underlying iteration when no record is cached yet: otherwise repeated
+  // calls would keep advancing it, dropping the records produced in between
+  override final def hasNext: Boolean = nextRecord != null || doHasNext
+
+  /**
+   * Hands out the cached record, advancing the iteration first if no record is cached
+   *
+   * @throws NoSuchElementException if the iterator is exhausted
+   */
+  override final def next(): T = {
+    // Per the [[Iterator]] contract [[next]] has to work w/o a preceding [[hasNext]] call
+    // and must not silently return the `null` sentinel once the iteration is exhausted
+    if (!hasNext) {
+      throw new NoSuchElementException("next on empty iterator")
+    }
+    val record = nextRecord
+    nextRecord = null
+    record
+  }
+
+}