YannByron commented on code in PR #6476:
URL: https://github.com/apache/hudi/pull/6476#discussion_r966829945


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala:
##########
@@ -169,310 +141,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
 
   private def getConfig: Configuration = {
     val conf = confBroadcast.value.value
-    HoodieMergeOnReadRDD.CONFIG_INSTANTIATION_LOCK.synchronized {
+    CONFIG_INSTANTIATION_LOCK.synchronized {
       new Configuration(conf)
     }
   }
-
-  /**
-   * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], iterates over all 
of the records stored in
-   * Delta Log files (represented as [[InternalRow]]s)
-   */
-  private class LogFileIterator(split: HoodieMergeOnReadFileSplit,
-                                config: Configuration)
-    extends Iterator[InternalRow] with Closeable with AvroDeserializerSupport {
-
-    protected override val requiredAvroSchema: Schema = new 
Schema.Parser().parse(requiredSchema.avroSchemaStr)
-    protected override val requiredStructTypeSchema: StructType = 
requiredSchema.structTypeSchema
-
-    protected val logFileReaderAvroSchema: Schema = new 
Schema.Parser().parse(tableSchema.avroSchemaStr)
-
-    protected var recordToLoad: InternalRow = _
-
-    private val requiredSchemaSafeAvroProjection = 
SafeAvroProjection.create(logFileReaderAvroSchema, requiredAvroSchema)
-
-    private var logScanner = {
-      val internalSchema = 
tableSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema)
-      HoodieMergeOnReadRDD.scanLog(split.logFiles, getPartitionPath(split), 
logFileReaderAvroSchema, tableState,
-        maxCompactionMemoryInBytes, config, internalSchema)
-    }
-
-    private val logRecords = logScanner.getRecords.asScala
-
-    // NOTE: This have to stay lazy to make sure it's initialized only at the 
point where it's
-    //       going to be used, since we modify `logRecords` before that and 
therefore can't do it any earlier
-    protected lazy val logRecordsIterator: Iterator[Option[GenericRecord]] =
-      logRecords.iterator.map {
-        case (_, record) =>
-          toScalaOption(record.getData.getInsertValue(logFileReaderAvroSchema, 
payloadProps))
-            .map(_.asInstanceOf[GenericRecord])
-      }
-
-    protected def removeLogRecord(key: String): Option[HoodieRecord[_ <: 
HoodieRecordPayload[_]]] =
-      logRecords.remove(key)
-
-    override def hasNext: Boolean = hasNextInternal
-
-    // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to 
make sure
-    //       that recursion is unfolded into a loop to avoid stack overflows 
while
-    //       handling records
-    @tailrec private def hasNextInternal: Boolean = {
-      logRecordsIterator.hasNext && {
-        val avroRecordOpt = logRecordsIterator.next()
-        if (avroRecordOpt.isEmpty) {
-          // Record has been deleted, skipping
-          this.hasNextInternal
-        } else {
-          val projectedAvroRecord = 
requiredSchemaSafeAvroProjection(avroRecordOpt.get)
-          recordToLoad = deserialize(projectedAvroRecord)
-          true
-        }
-      }
-    }
-
-    override final def next(): InternalRow = recordToLoad
-
-    override def close(): Unit =
-      if (logScanner != null) {
-        try {
-          logScanner.close()
-        } finally {
-          logScanner = null
-        }
-      }
-  }
-
-  /**
-   * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an 
iterator over all of the records stored in
-   * Base file as well as all of the Delta Log files simply returning 
concatenation of these streams, while not
-   * performing any combination/merging of the records w/ the same primary 
keys (ie producing duplicates potentially)
-   */
-  private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit,
-                                  baseFileReader: BaseFileReader,
-                                  config: Configuration)
-    extends LogFileIterator(split, config) {
-
-    private val requiredSchemaUnsafeProjection = 
generateUnsafeProjection(baseFileReader.schema, requiredStructTypeSchema)
-
-    private val baseFileIterator = baseFileReader(split.dataFile.get)
-
-    override def hasNext: Boolean = {
-      if (baseFileIterator.hasNext) {
-        // No merge is required, simply load current row and project into 
required schema
-        recordToLoad = requiredSchemaUnsafeProjection(baseFileIterator.next())
-        true
-      } else {
-        super[LogFileIterator].hasNext
-      }
-    }
-  }
-
-  /**
-   * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an 
iterator over all of the records stored in
-   * a) Base file and all of the b) Delta Log files combining records with the 
same primary key from both of these
-   * streams
-   */
-  private class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit,
-                                          baseFileReader: BaseFileReader,
-                                          config: Configuration)
-    extends LogFileIterator(split, config) {
-
-    // NOTE: Record-merging iterator supports 2 modes of operation merging 
records bearing either
-    //        - Full table's schema
-    //        - Projected schema
-    //       As such, no particular schema could be assumed, and therefore we 
rely on the caller
-    //       to correspondingly set the scheme of the expected output of 
base-file reader
-    private val baseFileReaderAvroSchema = 
sparkAdapter.getAvroSchemaConverters.toAvroType(baseFileReader.schema, nullable 
= false, "record")
-
-    private val serializer = 
sparkAdapter.createAvroSerializer(baseFileReader.schema, 
baseFileReaderAvroSchema, nullable = false)
-
-    private val reusableRecordBuilder: GenericRecordBuilder = new 
GenericRecordBuilder(requiredAvroSchema)
-
-    private val recordKeyOrdinal = 
baseFileReader.schema.fieldIndex(tableState.recordKeyField)
-
-    private val requiredSchemaUnsafeProjection = 
generateUnsafeProjection(baseFileReader.schema, requiredStructTypeSchema)
-
-    private val baseFileIterator = baseFileReader(split.dataFile.get)
-
-    override def hasNext: Boolean = hasNextInternal
-
-    // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to 
make sure
-    //       that recursion is unfolded into a loop to avoid stack overflows 
while
-    //       handling records
-    @tailrec private def hasNextInternal: Boolean = {
-      if (baseFileIterator.hasNext) {
-        val curRow = baseFileIterator.next()
-        val curKey = curRow.getString(recordKeyOrdinal)
-        val updatedRecordOpt = removeLogRecord(curKey)
-        if (updatedRecordOpt.isEmpty) {
-          // No merge is required, simply load current row and project into 
required schema
-          recordToLoad = requiredSchemaUnsafeProjection(curRow)
-          true
-        } else {
-          val mergedAvroRecordOpt = merge(serialize(curRow), 
updatedRecordOpt.get)
-          if (mergedAvroRecordOpt.isEmpty) {
-            // Record has been deleted, skipping
-            this.hasNextInternal
-          } else {
-            val projectedAvroRecord = 
projectAvroUnsafe(mergedAvroRecordOpt.get.asInstanceOf[GenericRecord],
-              requiredAvroSchema, reusableRecordBuilder)
-            recordToLoad = deserialize(projectedAvroRecord)
-            true
-          }
-        }
-      } else {
-        super[LogFileIterator].hasNext
-      }
-    }
-
-    private def serialize(curRowRecord: InternalRow): GenericRecord =
-      serializer.serialize(curRowRecord).asInstanceOf[GenericRecord]
-
-    private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_ 
<: HoodieRecordPayload[_]]): Option[IndexedRecord] = {
-      // NOTE: We have to pass in Avro Schema used to read from Delta Log file 
since we invoke combining API
-      //       on the record from the Delta Log
-      toScalaOption(newRecord.getData.combineAndGetUpdateValue(curAvroRecord, 
logFileReaderAvroSchema, payloadProps))
-    }
-  }
-}
-
-private object HoodieMergeOnReadRDD {
-
-  val CONFIG_INSTANTIATION_LOCK = new Object()
-
-  def scanLog(logFiles: List[HoodieLogFile],
-              partitionPath: Path,
-              logSchema: Schema,
-              tableState: HoodieTableState,
-              maxCompactionMemoryInBytes: Long,
-              hadoopConf: Configuration, internalSchema: InternalSchema = 
InternalSchema.getEmptyInternalSchema): HoodieMergedLogRecordScanner = {
-    val tablePath = tableState.tablePath
-    val fs = FSUtils.getFs(tablePath, hadoopConf)
-
-    if (HoodieTableMetadata.isMetadataTable(tablePath)) {
-      val metadataConfig = HoodieMetadataConfig.newBuilder()
-        
.fromProperties(tableState.metadataConfig.getProps).enable(true).build()
-      val dataTableBasePath = getDataTableBasePathFromMetadataTable(tablePath)
-      val metadataTable = new HoodieBackedTableMetadata(
-        new HoodieLocalEngineContext(hadoopConf), metadataConfig,
-        dataTableBasePath,
-        hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, 
HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
-
-      // We have to force full-scan for the MT log record reader, to make sure
-      // we can iterate over all of the partitions, since by default some of 
the partitions (Column Stats,
-      // Bloom Filter) are in "point-lookup" mode
-      val forceFullScan = true
-
-      // NOTE: In case of Metadata Table partition path equates to partition 
name (since there's just one level
-      //       of indirection among MT partitions)
-      val relativePartitionPath = getRelativePartitionPath(new 
Path(tablePath), partitionPath)
-      metadataTable.getLogRecordScanner(logFiles.asJava, 
relativePartitionPath, toJavaOption(Some(forceFullScan)))
-        .getLeft
-    } else {
-      val logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
-        .withFileSystem(fs)
-        .withBasePath(tablePath)
-        .withLogFilePaths(logFiles.map(logFile => 
logFile.getPath.toString).asJava)
-        .withReaderSchema(logSchema)
-        .withLatestInstantTime(tableState.latestCommitTimestamp)
-        .withReadBlocksLazily(
-          
Try(hadoopConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
-            
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean)
-            .getOrElse(false))
-        .withReverseReader(false)
-        .withInternalSchema(internalSchema)
-        .withBufferSize(
-          
hadoopConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
-            HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
-        .withMaxMemorySizeInBytes(maxCompactionMemoryInBytes)
-        .withSpillableMapBasePath(
-          hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
-            HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
-        .withDiskMapType(
-          hadoopConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key,
-            HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue))
-        .withBitCaskDiskMapCompressionEnabled(
-          
hadoopConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
-          
HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
-      if (logFiles.nonEmpty) {
-        logRecordScannerBuilder.withPartition(
-          getRelativePartitionPath(new Path(tableState.tablePath), 
logFiles.head.getPath.getParent))
-      }
-
-      logRecordScannerBuilder.build()
-    }
-  }
-
-  private def projectAvroUnsafe(record: GenericRecord, projectedSchema: 
Schema, reusableRecordBuilder: GenericRecordBuilder): GenericRecord = {
-    val fields = projectedSchema.getFields.asScala
-    fields.foreach(field => reusableRecordBuilder.set(field, 
record.get(field.name())))
-    reusableRecordBuilder.build()
-  }
-
-  private def getPartitionPath(split: HoodieMergeOnReadFileSplit): Path = {

Review Comment:
   `HoodieCDCRDD` needs to use `LogFileIterator` and `RecordMergingFileIterator` 
to extract the CDC data from the log files of MOR tables. So I just extended 
their constructor arguments so that they can be reused, and moved them to a 
common place. That's all.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to