This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.12.2-shadow
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit eb8566ed654b483fdefa54dfe0a1f79cf4f4204f
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Wed Nov 30 13:31:33 2022 -0800

    Rebased MOR iterators onto a `CachingIterator` (to be idempotent) (#7334)
    
    Fixes the invalid semantic of the MOR iterators, whose `hasNext` was not 
actually idempotent: every invocation of `hasNext` advanced the iterator, so 
calling it more than once before `next` could silently skip elements, for 
example in cases like:
    
    ```
    // [[isEmpty]] will invoke [[hasNext]] to check whether Iterator has any 
elements
    if (iter.isEmpty) {
      // ...
    } else {
      // Here [[map]] will invoke [[hasNext]] again, therefore skipping the 
elements
      iter.map { /* ... */ }
    }
    ```
---
 .../scala/org/apache/hudi/HoodieSparkUtils.scala   |   2 +-
 .../src/main/scala/org/apache/hudi/Iterators.scala | 317 +++++++++++++++++++++
 .../org/apache/hudi/util/CachingIterator.scala     |  44 +++
 3 files changed, 362 insertions(+), 1 deletion(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index df5e2777cbe..d02e6e6155a 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -96,7 +96,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with 
SparkVersionsSupport {
 
     // NOTE: We're accessing toRdd here directly to avoid [[InternalRow]] to 
[[Row]] conversion
     //       Additionally, we have to explicitly wrap around resulting [[RDD]] 
into the one
-    //       injecting [[SQLConf]], which by default isn't propgated by Spark 
to the executor(s).
+    //       injecting [[SQLConf]], which by default isn't propagated by Spark 
to the executor(s).
     //       [[SQLConf]] is required by [[AvroSerializer]]
     injectSQLConf(df.queryExecution.toRdd.mapPartitions { rows =>
       if (rows.isEmpty) {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
new file mode 100644
index 00000000000..8928ba0555f
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, 
generateUnsafeProjection}
+import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption}
+import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport
+import org.apache.hudi.common.model.{HoodieLogFile, HoodieRecord, 
HoodieRecordPayload}
+import org.apache.hudi.config.HoodiePayloadConfig
+import 
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+import org.apache.hudi.LogFileIterator._
+import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
+import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
+import org.apache.hudi.internal.schema.InternalSchema
+import org.apache.hudi.metadata.{HoodieBackedTableMetadata, 
HoodieTableMetadata}
+import 
org.apache.hudi.metadata.HoodieTableMetadata.getDataTableBasePathFromMetadataTable
+import org.apache.avro.Schema
+import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, 
IndexedRecord}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hudi.HoodieMergeOnReadRDD.SafeAvroProjection
+import org.apache.hudi.util.CachingIterator
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StructType
+
+import java.io.Closeable
+import java.util.Properties
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+/**
+ * Iterator over the records stored in the Delta Log files of the provided
+ * [[HoodieMergeOnReadFileSplit]], emitted as [[InternalRow]]s projected onto the required schema.
+ * Log records whose payload resolves to an empty value (ie deleted records) are skipped.
+ */
+class LogFileIterator(split: HoodieMergeOnReadFileSplit,
+                      tableSchema: HoodieTableSchema,
+                      requiredSchema: HoodieTableSchema,
+                      tableState: HoodieTableState,
+                      config: Configuration)
+  extends CachingIterator[InternalRow] with Closeable with AvroDeserializerSupport {
+
+  protected val maxCompactionMemoryInBytes: Long =
+    getMaxCompactionMemoryInBytes(new JobConf(config))
+
+  // Payload properties carry the ordering (pre-combine) field whenever the table state has one,
+  // and are left empty otherwise
+  protected val payloadProps = tableState.preCombineFieldOpt
+    .map(orderingField =>
+      HoodiePayloadConfig.newBuilder.withPayloadOrderingField(orderingField).build.getProps)
+    .getOrElse(new Properties())
+
+  protected override val avroSchema: Schema =
+    new Schema.Parser().parse(requiredSchema.avroSchemaStr)
+  protected override val structTypeSchema: StructType = requiredSchema.structTypeSchema
+
+  // Schema the log records are read with (parsed from the table's schema)
+  protected val logFileReaderAvroSchema: Schema =
+    new Schema.Parser().parse(tableSchema.avroSchemaStr)
+
+  // Projects Avro records from the log-reader schema onto the required schema
+  private val requiredSchemaSafeAvroProjection =
+    SafeAvroProjection.create(logFileReaderAvroSchema, avroSchema)
+
+  // TODO: now that logScanner w/ internalSchema supports column projection, projectAvroUnsafe
+  //       may no longer be needed
+  private var logScanner = {
+    val evolvedSchema =
+      tableSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema)
+    scanLog(
+      logFiles = split.logFiles,
+      partitionPath = getPartitionPath(split),
+      logSchema = logFileReaderAvroSchema,
+      tableState = tableState,
+      maxCompactionMemoryInBytes = maxCompactionMemoryInBytes,
+      hadoopConf = config,
+      internalSchema = evolvedSchema)
+  }
+
+  private val logRecords = logScanner.getRecords.asScala
+
+  /**
+   * Returns an iterator over the (key, record) pairs currently held by the log scanner
+   */
+  def logRecordsIterator(): Iterator[(String, HoodieRecord[_ <: HoodieRecordPayload[_ <: HoodieRecordPayload[_ <: AnyRef]]])] =
+    logRecords.iterator
+      .asInstanceOf[Iterator[(String, HoodieRecord[_ <: HoodieRecordPayload[_ <: HoodieRecordPayload[_ <: AnyRef]]])]]
+
+  // NOTE: This has to stay lazy, so that it's initialized only at the point where it's first
+  //       used: `logRecords` is modified before that, so it can't be initialized any earlier
+  protected lazy val genericRecordsIterator: Iterator[Option[GenericRecord]] =
+    logRecords.iterator.map { case (_, logRecord) =>
+      val insertValue = logRecord.getData.getInsertValue(logFileReaderAvroSchema, payloadProps)
+      toScalaOption(insertValue).map(_.asInstanceOf[GenericRecord])
+    }
+
+  /**
+   * Removes the log record stored under the given key (if any), returning it
+   */
+  protected def removeLogRecord(key: String): Option[HoodieRecord[_ <: HoodieRecordPayload[_]]] =
+    logRecords.remove(key)
+
+  protected def doHasNext: Boolean = hasNextInternal
+
+  // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]]: it guarantees the
+  //       recursion is compiled into a loop, avoiding stack overflows when long runs of
+  //       deleted records have to be skipped
+  @tailrec private def hasNextInternal: Boolean =
+    if (!genericRecordsIterator.hasNext) {
+      false
+    } else {
+      genericRecordsIterator.next() match {
+        case Some(avroRecord) =>
+          nextRecord = deserialize(requiredSchemaSafeAvroProjection(avroRecord))
+          true
+        case None =>
+          // Record has been deleted, moving on to the following one
+          hasNextInternal
+      }
+    }
+
+  /**
+   * Closes the underlying log scanner; invoking it again afterwards is a no-op
+   */
+  override def close(): Unit = {
+    val scanner = logScanner
+    if (scanner != null) {
+      try scanner.close()
+      finally logScanner = null
+    }
+  }
+}
+
+/**
+ * Iterator over a single [[HoodieMergeOnReadFileSplit]] emitting every row of the Base file
+ * followed by every record of the Delta Log files. The two streams are simply concatenated:
+ * records bearing the same primary key are NOT combined/merged (ie duplicates could be produced)
+ */
+private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit,
+                                baseFileReader: BaseFileReader,
+                                dataSchema: HoodieTableSchema,
+                                requiredSchema: HoodieTableSchema,
+                                tableState: HoodieTableState,
+                                config: Configuration)
+  extends LogFileIterator(split, dataSchema, requiredSchema, tableState, config) {
+
+  // Projects rows produced by the base-file reader onto the required schema
+  private val baseRowProjection =
+    generateUnsafeProjection(baseFileReader.schema, structTypeSchema)
+
+  private val baseFileRows = baseFileReader(split.dataFile.get)
+
+  override def doHasNext: Boolean =
+    if (!baseFileRows.hasNext) {
+      // Base file is drained: fall back onto the records of the Delta Log files
+      super[LogFileIterator].doHasNext
+    } else {
+      // No merging is performed: the current row is only projected onto the required schema
+      nextRecord = baseRowProjection(baseFileRows.next())
+      true
+    }
+}
+
+/**
+ * Iterator over a single [[HoodieMergeOnReadFileSplit]] combining records of
+ * a) the Base file and b) the Delta Log files: a Base file row whose key is also present
+ * among the log records is merged w/ the corresponding log record, and once the Base file
+ * is drained the remaining log records are emitted
+ */
+class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit,
+                                baseFileReader: BaseFileReader,
+                                dataSchema: HoodieTableSchema,
+                                requiredSchema: HoodieTableSchema,
+                                tableState: HoodieTableState,
+                                config: Configuration)
+  extends LogFileIterator(split, dataSchema, requiredSchema, tableState, config) {
+
+  // NOTE: Record-merging iterator supports 2 modes of operation, merging records bearing either
+  //        - Full table's schema
+  //        - Projected schema
+  //       Hence no particular schema could be assumed here, and we rely on the caller
+  //       to set the schema of the base-file reader's output accordingly
+  private val baseFileReaderAvroSchema =
+    sparkAdapter.getAvroSchemaConverters.toAvroType(baseFileReader.schema, nullable = false, "record")
+
+  private val serializer =
+    sparkAdapter.createAvroSerializer(baseFileReader.schema, baseFileReaderAvroSchema, nullable = false)
+
+  private val reusableRecordBuilder: GenericRecordBuilder = new GenericRecordBuilder(avroSchema)
+
+  // Position of the record-key field w/in the rows produced by the base-file reader
+  private val recordKeyOrdinal = baseFileReader.schema.fieldIndex(tableState.recordKeyField)
+
+  private val requiredSchemaUnsafeProjection =
+    generateUnsafeProjection(baseFileReader.schema, structTypeSchema)
+
+  private val baseFileIterator = baseFileReader(split.dataFile.get)
+
+  override def doHasNext: Boolean = hasNextInternal
+
+  // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]]: it guarantees the
+  //       recursion is compiled into a loop, avoiding stack overflows when long runs of
+  //       deleted records have to be skipped
+  @tailrec private def hasNextInternal: Boolean =
+    if (!baseFileIterator.hasNext) {
+      // Base file is drained: emit whatever is left among the log records
+      super[LogFileIterator].doHasNext
+    } else {
+      val baseRow = baseFileIterator.next()
+      val recordKey = baseRow.getString(recordKeyOrdinal)
+      removeLogRecord(recordKey) match {
+        case None =>
+          // No update for this key in the logs: only project the row onto the required schema
+          nextRecord = requiredSchemaUnsafeProjection(baseRow)
+          true
+
+        case Some(logRecord) =>
+          merge(serialize(baseRow), logRecord) match {
+            case Some(mergedRecord) =>
+              val projected =
+                projectAvroUnsafe(mergedRecord.asInstanceOf[GenericRecord], avroSchema, reusableRecordBuilder)
+              nextRecord = deserialize(projected)
+              true
+
+            case None =>
+              // Record has been deleted, moving on to the following one
+              hasNextInternal
+          }
+      }
+    }
+
+  // Converts the base-file row into an Avro record (of the base-file reader's schema)
+  private def serialize(curRowRecord: InternalRow): GenericRecord =
+    serializer.serialize(curRowRecord).asInstanceOf[GenericRecord]
+
+  // Combines the base-file record w/ the log record bearing the same key; an empty result
+  // is handled by the caller as a deleted record
+  private def merge(curAvroRecord: GenericRecord,
+                    newRecord: HoodieRecord[_ <: HoodieRecordPayload[_]]): Option[IndexedRecord] = {
+    // NOTE: The combining API is invoked on the record from the Delta Log, therefore
+    //       the Avro schema used to read the Delta Log files has to be passed in
+    val payload = newRecord.getData
+    toScalaOption(payload.combineAndGetUpdateValue(curAvroRecord, logFileReaderAvroSchema, payloadProps))
+  }
+}
+
+/**
+ * Helpers shared by the Merge-On-Read iterators: instantiating the log-record scanner,
+ * projecting Avro records and resolving the partition path of a file split
+ */
+object LogFileIterator {
+
+  val CONFIG_INSTANTIATION_LOCK = new Object()
+
+  /**
+   * Builds the scanner reading the records of the provided log files.
+   *
+   * When the table path designates a Metadata Table, the scanner is obtained from
+   * [[HoodieBackedTableMetadata]] (w/ full-scan forced); otherwise it is assembled through
+   * the [[HoodieMergedLogRecordScanner]] builder, configured from `hadoopConf`.
+   *
+   * @param logFiles                   log files to be scanned
+   * @param partitionPath              path of the partition holding the files (only used for the Metadata Table)
+   * @param logSchema                  Avro schema the log records are read with
+   * @param tableState                 state of the table (path, latest commit, metadata config)
+   * @param maxCompactionMemoryInBytes max amount of memory handed to the scanner
+   * @param hadoopConf                 Hadoop configuration the scanner's settings are read from
+   * @param internalSchema             internal schema handed to the scanner (empty by default)
+   */
+  def scanLog(logFiles: List[HoodieLogFile],
+              partitionPath: Path,
+              logSchema: Schema,
+              tableState: HoodieTableState,
+              maxCompactionMemoryInBytes: Long,
+              hadoopConf: Configuration,
+              internalSchema: InternalSchema = InternalSchema.getEmptyInternalSchema): HoodieMergedLogRecordScanner = {
+    val tablePath = tableState.tablePath
+    val fs = FSUtils.getFs(tablePath, hadoopConf)
+    val spillableMapBasePath = hadoopConf.get(
+      HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
+      HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)
+
+    if (!HoodieTableMetadata.isMetadataTable(tablePath)) {
+      // Lazy block reading falls back to being disabled whenever the configured value
+      // can't be parsed as a boolean
+      val readBlocksLazily = Try {
+        hadoopConf.get(
+          HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
+          HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean
+      }.getOrElse(false)
+      val streamBufferSize = hadoopConf.getInt(
+        HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
+        HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)
+      val diskMapType = hadoopConf.getEnum(
+        HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key,
+        HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue)
+      val bitCaskCompressionEnabled = hadoopConf.getBoolean(
+        HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
+        HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
+
+      val scannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(tablePath)
+        .withLogFilePaths(logFiles.map(_.getPath.toString).asJava)
+        .withReaderSchema(logSchema)
+        .withLatestInstantTime(tableState.latestCommitTimestamp)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withInternalSchema(internalSchema)
+        .withBufferSize(streamBufferSize)
+        .withMaxMemorySizeInBytes(maxCompactionMemoryInBytes)
+        .withSpillableMapBasePath(spillableMapBasePath)
+        .withDiskMapType(diskMapType)
+        .withBitCaskDiskMapCompressionEnabled(bitCaskCompressionEnabled)
+
+      // Partition is derived from the parent folder of the first log file (when there's one)
+      if (logFiles.nonEmpty) {
+        val logFilesFolder = logFiles.head.getPath.getParent
+        scannerBuilder.withPartition(
+          getRelativePartitionPath(new Path(tableState.tablePath), logFilesFolder))
+      }
+
+      scannerBuilder.build()
+    } else {
+      val metadataConfig = HoodieMetadataConfig.newBuilder()
+        .fromProperties(tableState.metadataConfig.getProps)
+        .enable(true)
+        .build()
+      val dataTableBasePath = getDataTableBasePathFromMetadataTable(tablePath)
+      val metadataTable = new HoodieBackedTableMetadata(
+        new HoodieLocalEngineContext(hadoopConf),
+        metadataConfig,
+        dataTableBasePath,
+        spillableMapBasePath)
+
+      // Full-scan has to be forced for the MT log record reader, making sure all of the
+      // partitions could be iterated over: by default some of the partitions
+      // (Column Stats, Bloom Filter) are in "point-lookup" mode
+      val forceFullScan = true
+
+      // NOTE: In case of the Metadata Table, partition path equates to partition name
+      //       (since there's just one level of indirection among MT partitions)
+      val relativePartitionPath = getRelativePartitionPath(new Path(tablePath), partitionPath)
+      val scannerAndTimings = metadataTable.getLogRecordScanner(
+        logFiles.asJava, relativePartitionPath, toJavaOption(Some(forceFullScan)))
+      scannerAndTimings.getLeft
+    }
+  }
+
+  /**
+   * Copies the fields listed in `projectedSchema` (looked up by name) from `record`
+   * into `reusableRecordBuilder`, and builds the resulting record
+   */
+  def projectAvroUnsafe(record: GenericRecord, projectedSchema: Schema, reusableRecordBuilder: GenericRecordBuilder): GenericRecord = {
+    projectedSchema.getFields.asScala.foreach { field =>
+      reusableRecordBuilder.set(field, record.get(field.name()))
+    }
+    reusableRecordBuilder.build()
+  }
+
+  /**
+   * Resolves the partition path of the split as the immediate parent folder of either
+   *    - The base file (when the split has one)
+   *    - The first log file (otherwise)
+   */
+  def getPartitionPath(split: HoodieMergeOnReadFileSplit): Path = {
+    val someFilePath = split.dataFile match {
+      case Some(baseFile) => new Path(baseFile.filePath)
+      case None => split.logFiles.head.getPath
+    }
+    someFilePath.getParent
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/CachingIterator.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/CachingIterator.scala
new file mode 100644
index 00000000000..8c8fb1023a7
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/CachingIterator.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util
+
+/**
+ * Extension of the [[Iterator]] caching the record produced by the look-ahead, to provide
+ * for the idempotency of the [[hasNext]] invocation: invoking [[hasNext]] multiple times
+ * consecutively (w/o invoking [[next]] in between) makes the iterator step over a single
+ * element only.
+ *
+ * Implementations provide the iteration semantic by overriding a single [[doHasNext]] method,
+ * which has to assign the fetched (non-null) record to `nextRecord` whenever it returns `true`.
+ *
+ * NOTE: [[hasNext]] and [[next]] are purposefully marked as final, requiring iteration
+ *       semantic to be implemented through overriding of [[doHasNext]] only
+ */
+trait CachingIterator[T >: Null] extends Iterator[T] {
+
+  // Record fetched by the look-ahead and not yet handed out (null when there's none cached)
+  protected var nextRecord: T = _
+
+  /**
+   * Advances the underlying source by one element.
+   *
+   * @return `true` if an element was fetched (and stored into `nextRecord`), `false` if the
+   *         source is exhausted
+   */
+  protected def doHasNext: Boolean
+
+  // NOTE: The source is advanced only when no record is currently cached
+  override final def hasNext: Boolean = nextRecord != null || doHasNext
+
+  /**
+   * Returns the next record, fetching it first if [[hasNext]] hasn't been invoked beforehand.
+   *
+   * @throws NoSuchElementException if the iterator is exhausted
+   */
+  override final def next: T = {
+    // NOTE: [[Iterator]]'s contract permits invoking [[next]] w/o a preceding [[hasNext]]:
+    //       the look-ahead has to be triggered here as well, otherwise null would be returned
+    //       w/o the iterator ever advancing
+    if (!hasNext) {
+      throw new NoSuchElementException("next on empty iterator")
+    }
+    val record = nextRecord
+    nextRecord = null
+    record
+  }
+
+}

Reply via email to