YannByron commented on code in PR #6476:
URL: https://github.com/apache/hudi/pull/6476#discussion_r966829945
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala:
##########
@@ -169,310 +141,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
private def getConfig: Configuration = {
val conf = confBroadcast.value.value
- HoodieMergeOnReadRDD.CONFIG_INSTANTIATION_LOCK.synchronized {
+ CONFIG_INSTANTIATION_LOCK.synchronized {
new Configuration(conf)
}
}
-
- /**
- * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], iterates over all
of the records stored in
- * Delta Log files (represented as [[InternalRow]]s)
- */
- private class LogFileIterator(split: HoodieMergeOnReadFileSplit,
- config: Configuration)
- extends Iterator[InternalRow] with Closeable with AvroDeserializerSupport {
-
- protected override val requiredAvroSchema: Schema = new
Schema.Parser().parse(requiredSchema.avroSchemaStr)
- protected override val requiredStructTypeSchema: StructType =
requiredSchema.structTypeSchema
-
- protected val logFileReaderAvroSchema: Schema = new
Schema.Parser().parse(tableSchema.avroSchemaStr)
-
- protected var recordToLoad: InternalRow = _
-
- private val requiredSchemaSafeAvroProjection =
SafeAvroProjection.create(logFileReaderAvroSchema, requiredAvroSchema)
-
- private var logScanner = {
- val internalSchema =
tableSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema)
- HoodieMergeOnReadRDD.scanLog(split.logFiles, getPartitionPath(split),
logFileReaderAvroSchema, tableState,
- maxCompactionMemoryInBytes, config, internalSchema)
- }
-
- private val logRecords = logScanner.getRecords.asScala
-
- // NOTE: This have to stay lazy to make sure it's initialized only at the
point where it's
- // going to be used, since we modify `logRecords` before that and
therefore can't do it any earlier
- protected lazy val logRecordsIterator: Iterator[Option[GenericRecord]] =
- logRecords.iterator.map {
- case (_, record) =>
- toScalaOption(record.getData.getInsertValue(logFileReaderAvroSchema,
payloadProps))
- .map(_.asInstanceOf[GenericRecord])
- }
-
- protected def removeLogRecord(key: String): Option[HoodieRecord[_ <:
HoodieRecordPayload[_]]] =
- logRecords.remove(key)
-
- override def hasNext: Boolean = hasNextInternal
-
- // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to
make sure
- // that recursion is unfolded into a loop to avoid stack overflows
while
- // handling records
- @tailrec private def hasNextInternal: Boolean = {
- logRecordsIterator.hasNext && {
- val avroRecordOpt = logRecordsIterator.next()
- if (avroRecordOpt.isEmpty) {
- // Record has been deleted, skipping
- this.hasNextInternal
- } else {
- val projectedAvroRecord =
requiredSchemaSafeAvroProjection(avroRecordOpt.get)
- recordToLoad = deserialize(projectedAvroRecord)
- true
- }
- }
- }
-
- override final def next(): InternalRow = recordToLoad
-
- override def close(): Unit =
- if (logScanner != null) {
- try {
- logScanner.close()
- } finally {
- logScanner = null
- }
- }
- }
-
- /**
- * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an
iterator over all of the records stored in
- * Base file as well as all of the Delta Log files simply returning
concatenation of these streams, while not
- * performing any combination/merging of the records w/ the same primary
keys (ie producing duplicates potentially)
- */
- private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit,
- baseFileReader: BaseFileReader,
- config: Configuration)
- extends LogFileIterator(split, config) {
-
- private val requiredSchemaUnsafeProjection =
generateUnsafeProjection(baseFileReader.schema, requiredStructTypeSchema)
-
- private val baseFileIterator = baseFileReader(split.dataFile.get)
-
- override def hasNext: Boolean = {
- if (baseFileIterator.hasNext) {
- // No merge is required, simply load current row and project into
required schema
- recordToLoad = requiredSchemaUnsafeProjection(baseFileIterator.next())
- true
- } else {
- super[LogFileIterator].hasNext
- }
- }
- }
-
- /**
- * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an
iterator over all of the records stored in
- * a) Base file and all of the b) Delta Log files combining records with the
same primary key from both of these
- * streams
- */
- private class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit,
- baseFileReader: BaseFileReader,
- config: Configuration)
- extends LogFileIterator(split, config) {
-
- // NOTE: Record-merging iterator supports 2 modes of operation merging
records bearing either
- // - Full table's schema
- // - Projected schema
- // As such, no particular schema could be assumed, and therefore we
rely on the caller
- // to correspondingly set the scheme of the expected output of
base-file reader
- private val baseFileReaderAvroSchema =
sparkAdapter.getAvroSchemaConverters.toAvroType(baseFileReader.schema, nullable
= false, "record")
-
- private val serializer =
sparkAdapter.createAvroSerializer(baseFileReader.schema,
baseFileReaderAvroSchema, nullable = false)
-
- private val reusableRecordBuilder: GenericRecordBuilder = new
GenericRecordBuilder(requiredAvroSchema)
-
- private val recordKeyOrdinal =
baseFileReader.schema.fieldIndex(tableState.recordKeyField)
-
- private val requiredSchemaUnsafeProjection =
generateUnsafeProjection(baseFileReader.schema, requiredStructTypeSchema)
-
- private val baseFileIterator = baseFileReader(split.dataFile.get)
-
- override def hasNext: Boolean = hasNextInternal
-
- // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to
make sure
- // that recursion is unfolded into a loop to avoid stack overflows
while
- // handling records
- @tailrec private def hasNextInternal: Boolean = {
- if (baseFileIterator.hasNext) {
- val curRow = baseFileIterator.next()
- val curKey = curRow.getString(recordKeyOrdinal)
- val updatedRecordOpt = removeLogRecord(curKey)
- if (updatedRecordOpt.isEmpty) {
- // No merge is required, simply load current row and project into
required schema
- recordToLoad = requiredSchemaUnsafeProjection(curRow)
- true
- } else {
- val mergedAvroRecordOpt = merge(serialize(curRow),
updatedRecordOpt.get)
- if (mergedAvroRecordOpt.isEmpty) {
- // Record has been deleted, skipping
- this.hasNextInternal
- } else {
- val projectedAvroRecord =
projectAvroUnsafe(mergedAvroRecordOpt.get.asInstanceOf[GenericRecord],
- requiredAvroSchema, reusableRecordBuilder)
- recordToLoad = deserialize(projectedAvroRecord)
- true
- }
- }
- } else {
- super[LogFileIterator].hasNext
- }
- }
-
- private def serialize(curRowRecord: InternalRow): GenericRecord =
- serializer.serialize(curRowRecord).asInstanceOf[GenericRecord]
-
- private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_
<: HoodieRecordPayload[_]]): Option[IndexedRecord] = {
- // NOTE: We have to pass in Avro Schema used to read from Delta Log file
since we invoke combining API
- // on the record from the Delta Log
- toScalaOption(newRecord.getData.combineAndGetUpdateValue(curAvroRecord,
logFileReaderAvroSchema, payloadProps))
- }
- }
-}
-
-private object HoodieMergeOnReadRDD {
-
- val CONFIG_INSTANTIATION_LOCK = new Object()
-
- def scanLog(logFiles: List[HoodieLogFile],
- partitionPath: Path,
- logSchema: Schema,
- tableState: HoodieTableState,
- maxCompactionMemoryInBytes: Long,
- hadoopConf: Configuration, internalSchema: InternalSchema =
InternalSchema.getEmptyInternalSchema): HoodieMergedLogRecordScanner = {
- val tablePath = tableState.tablePath
- val fs = FSUtils.getFs(tablePath, hadoopConf)
-
- if (HoodieTableMetadata.isMetadataTable(tablePath)) {
- val metadataConfig = HoodieMetadataConfig.newBuilder()
-
.fromProperties(tableState.metadataConfig.getProps).enable(true).build()
- val dataTableBasePath = getDataTableBasePathFromMetadataTable(tablePath)
- val metadataTable = new HoodieBackedTableMetadata(
- new HoodieLocalEngineContext(hadoopConf), metadataConfig,
- dataTableBasePath,
- hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
-
- // We have to force full-scan for the MT log record reader, to make sure
- // we can iterate over all of the partitions, since by default some of
the partitions (Column Stats,
- // Bloom Filter) are in "point-lookup" mode
- val forceFullScan = true
-
- // NOTE: In case of Metadata Table partition path equates to partition
name (since there's just one level
- // of indirection among MT partitions)
- val relativePartitionPath = getRelativePartitionPath(new
Path(tablePath), partitionPath)
- metadataTable.getLogRecordScanner(logFiles.asJava,
relativePartitionPath, toJavaOption(Some(forceFullScan)))
- .getLeft
- } else {
- val logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
- .withFileSystem(fs)
- .withBasePath(tablePath)
- .withLogFilePaths(logFiles.map(logFile =>
logFile.getPath.toString).asJava)
- .withReaderSchema(logSchema)
- .withLatestInstantTime(tableState.latestCommitTimestamp)
- .withReadBlocksLazily(
-
Try(hadoopConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
-
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean)
- .getOrElse(false))
- .withReverseReader(false)
- .withInternalSchema(internalSchema)
- .withBufferSize(
-
hadoopConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
- HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
- .withMaxMemorySizeInBytes(maxCompactionMemoryInBytes)
- .withSpillableMapBasePath(
- hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
- HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
- .withDiskMapType(
- hadoopConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key,
- HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue))
- .withBitCaskDiskMapCompressionEnabled(
-
hadoopConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
-
HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
- if (logFiles.nonEmpty) {
- logRecordScannerBuilder.withPartition(
- getRelativePartitionPath(new Path(tableState.tablePath),
logFiles.head.getPath.getParent))
- }
-
- logRecordScannerBuilder.build()
- }
- }
-
- private def projectAvroUnsafe(record: GenericRecord, projectedSchema:
Schema, reusableRecordBuilder: GenericRecordBuilder): GenericRecord = {
- val fields = projectedSchema.getFields.asScala
- fields.foreach(field => reusableRecordBuilder.set(field,
record.get(field.name())))
- reusableRecordBuilder.build()
- }
-
- private def getPartitionPath(split: HoodieMergeOnReadFileSplit): Path = {
Review Comment:
`HoodieCDCRDD` needs to use `LogFileIterator` and `RecordMergingFileIterator`
to extract the CDC data from the log files of MOR tables. So I just extended the
constructor arguments so that they can be reused, and moved them to
a common place. That's all.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]