alexeykudinkin commented on a change in pull request #4888:
URL: https://github.com/apache/hudi/pull/4888#discussion_r835399807
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -70,34 +76,59 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
)
.getOrElse(new Properties())
+ private val whitelistedPayloadClasses: Set[String] = Seq(
+ classOf[OverwriteWithLatestAvroPayload]
+ ).map(_.getName).toSet
+
override def compute(split: Partition, context: TaskContext):
Iterator[InternalRow] = {
val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
val iter = mergeOnReadPartition.split match {
case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
- requiredSchemaFileReader(dataFileOnlySplit.dataFile.get)
+ requiredSchemaFileReader.apply(dataFileOnlySplit.dataFile.get)
+
case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
- logFileIterator(logFileOnlySplit, getConfig)
- case skipMergeSplit if
skipMergeSplit.mergeType.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL)
=>
- skipMergeFileIterator(skipMergeSplit,
requiredSchemaFileReader(skipMergeSplit.dataFile.get), getConfig)
- case payloadCombineSplit
- if
payloadCombineSplit.mergeType.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL)
=>
- payloadCombineFileIterator(payloadCombineSplit,
fullSchemaFileReader(payloadCombineSplit.dataFile.get),
- getConfig)
+ new LogFileIterator(logFileOnlySplit, getConfig)
+
+ case split if
mergeType.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) =>
+ val baseFileIterator =
requiredSchemaFileReader.apply(split.dataFile.get)
+ new SkipMergeIterator(split, baseFileIterator, getConfig)
+
+ case split if
mergeType.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) =>
+ val (baseFileIterator, schema) = readBaseFile(split)
+ new RecordMergingFileIterator(split, baseFileIterator, schema,
getConfig)
+
case _ => throw new HoodieException(s"Unable to select an Iterator to
read the Hoodie MOR File Split for " +
s"file path: ${mergeOnReadPartition.split.dataFile.get.filePath}" +
s"log paths: ${mergeOnReadPartition.split.logFiles.toString}" +
- s"hoodie table path: ${mergeOnReadPartition.split.tablePath}" +
+ s"hoodie table path: ${tableState.tablePath}" +
s"spark partition Index: ${mergeOnReadPartition.index}" +
- s"merge type: ${mergeOnReadPartition.split.mergeType}")
+ s"merge type: ${mergeType}")
}
+
if (iter.isInstanceOf[Closeable]) {
// register a callback to close logScanner which will be executed on
task completion.
// when tasks finished, this method will be called, and release
resources.
Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ =>
iter.asInstanceOf[Closeable].close()))
}
+
iter
}
+ private def readBaseFile(split: HoodieMergeOnReadFileSplit):
(Iterator[InternalRow], HoodieTableSchema) = {
+ // NOTE: This is an optimization making sure that even for MOR tables we
fetch absolute minimum
+ // of the stored data possible, while still properly executing
corresponding relation's semantic
+ // and meet the query's requirements.
+ //
+ // Here we assume that iff queried table
+ // a) It does use one of the standard (and whitelisted) Record
Payload classes
+ // then we can avoid reading and parsing the records w/ _full_
schema, and instead only
+ // rely on projected one, nevertheless being able to perform merging
correctly
+ if (!whitelistedPayloadClasses.contains(tableState.recordPayloadClassName))
Review comment:
I'm not sure I follow -- how do you propose to change this expression?
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -108,270 +136,284 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
}
}
- private def logFileIterator(split: HoodieMergeOnReadFileSplit,
- config: Configuration): Iterator[InternalRow] =
- new Iterator[InternalRow] with Closeable with SparkAdapterSupport {
- private val tableAvroSchema = new
Schema.Parser().parse(tableSchema.avroSchemaStr)
- private val requiredAvroSchema = new
Schema.Parser().parse(requiredSchema.avroSchemaStr)
- private val requiredFieldPosition =
- requiredSchema.structTypeSchema
- .map(f => tableAvroSchema.getField(f.name).pos()).toList
- private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
- private val deserializer =
sparkAdapter.createAvroDeserializer(requiredAvroSchema,
requiredSchema.structTypeSchema)
- private val unsafeProjection =
UnsafeProjection.create(requiredSchema.structTypeSchema)
- private var logScanner = HoodieMergeOnReadRDD.scanLog(split,
tableAvroSchema, config)
- private val logRecords = logScanner.getRecords
- private val logRecordsKeyIterator =
logRecords.keySet().iterator().asScala
-
- private var recordToLoad: InternalRow = _
-
- override def hasNext: Boolean = {
- if (logRecordsKeyIterator.hasNext) {
- val curAvrokey = logRecordsKeyIterator.next()
- val curAvroRecord =
logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema, payloadProps)
- if (!curAvroRecord.isPresent) {
- // delete record found, skipping
- this.hasNext
- } else {
- val requiredAvroRecord =
AvroConversionUtils.buildAvroRecordBySchema(curAvroRecord.get(),
requiredAvroSchema,
- requiredFieldPosition, recordBuilder)
- val rowOpt = deserializer.deserialize(requiredAvroRecord)
- recordToLoad =
unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
- true
+ /**
+ * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], iterates over all
of the records stored in
+ * Delta Log files (represented as [[InternalRow]]s)
+ */
+ private class LogFileIterator(split: HoodieMergeOnReadFileSplit,
+ config: Configuration)
+ extends Iterator[InternalRow] with Closeable with AvroDeserializerSupport {
+
+ protected override val requiredAvroSchema: Schema = new
Schema.Parser().parse(requiredSchema.avroSchemaStr)
+ protected override val requiredStructTypeSchema: StructType =
requiredSchema.structTypeSchema
+
+ protected val logFileReaderAvroSchema: Schema = new
Schema.Parser().parse(tableSchema.avroSchemaStr)
+
+ protected val recordBuilder: GenericRecordBuilder = new
GenericRecordBuilder(requiredAvroSchema)
+ protected var recordToLoad: InternalRow = _
+
+ // TODO validate whether we need to do UnsafeProjection
+ protected val unsafeProjection: UnsafeProjection =
UnsafeProjection.create(requiredStructTypeSchema)
+
+ // NOTE: This maps _required_ schema fields onto the _full_ table schema,
collecting their "ordinals"
+ // w/in the record payload. This is required, to project records
read from the Delta Log file
+ // which always reads records in full schema (never projected, due
to the fact that DL file might
+ // be stored in non-columnar formats like Avro, HFile, etc)
+ private val requiredSchemaFieldOrdinals: List[Int] =
collectFieldOrdinals(requiredAvroSchema, logFileReaderAvroSchema)
+
+ private var logScanner =
+ HoodieMergeOnReadRDD.scanLog(split.logFiles, getPartitionPath(split),
logFileReaderAvroSchema, tableState,
+ maxCompactionMemoryInBytes, config)
+
+ private val logRecords = logScanner.getRecords.asScala
+
+ // NOTE: This iterator iterates over already projected (in required
schema) records
+ // NOTE: This have to stay lazy to make sure it's initialized only at the
point where it's
+ // going to be used, since we modify `logRecords` before that and
therefore can't do it any earlier
+ protected lazy val logRecordsIterator: Iterator[Option[GenericRecord]] =
+ logRecords.iterator.map {
+ case (_, record) =>
+ val avroRecordOpt =
toScalaOption(record.getData.getInsertValue(logFileReaderAvroSchema,
payloadProps))
+ avroRecordOpt.map {
+ avroRecord => projectAvroUnsafe(avroRecord, requiredAvroSchema,
requiredSchemaFieldOrdinals, recordBuilder)
}
- } else {
- false
- }
}
- override def next(): InternalRow = {
- recordToLoad
- }
-
- override def close(): Unit = {
- if (logScanner != null) {
- try {
- logScanner.close()
- } finally {
- logScanner = null
- }
- }
- }
- }
+ protected def removeLogRecord(key: String): Option[HoodieRecord[_ <:
HoodieRecordPayload[_]]] =
+ logRecords.remove(key)
- private def skipMergeFileIterator(split: HoodieMergeOnReadFileSplit,
- baseFileIterator: Iterator[InternalRow],
- config: Configuration):
Iterator[InternalRow] =
- new Iterator[InternalRow] with Closeable with SparkAdapterSupport {
- private val tableAvroSchema = new
Schema.Parser().parse(tableSchema.avroSchemaStr)
- private val requiredAvroSchema = new
Schema.Parser().parse(requiredSchema.avroSchemaStr)
- private val requiredFieldPosition =
- requiredSchema.structTypeSchema
- .map(f => tableAvroSchema.getField(f.name).pos()).toList
- private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
- private val deserializer =
sparkAdapter.createAvroDeserializer(requiredAvroSchema,
requiredSchema.structTypeSchema)
- private val unsafeProjection =
UnsafeProjection.create(requiredSchema.structTypeSchema)
- private var logScanner = HoodieMergeOnReadRDD.scanLog(split,
tableAvroSchema, config)
- private val logRecords = logScanner.getRecords
- private val logRecordsKeyIterator =
logRecords.keySet().iterator().asScala
-
- private var recordToLoad: InternalRow = _
-
- @scala.annotation.tailrec
- override def hasNext: Boolean = {
- if (baseFileIterator.hasNext) {
- val curRow = baseFileIterator.next()
- recordToLoad = unsafeProjection(curRow)
- true
+ override def hasNext: Boolean =
+ logRecordsIterator.hasNext && {
+ val avroRecordOpt = logRecordsIterator.next()
+ if (avroRecordOpt.isEmpty) {
+ // Record has been deleted, skipping
+ this.hasNext
} else {
- if (logRecordsKeyIterator.hasNext) {
- val curAvrokey = logRecordsKeyIterator.next()
- val curAvroRecord =
logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema, payloadProps)
- if (!curAvroRecord.isPresent) {
- // delete record found, skipping
- this.hasNext
- } else {
- val requiredAvroRecord =
AvroConversionUtils.buildAvroRecordBySchema(curAvroRecord.get(),
requiredAvroSchema,
- requiredFieldPosition, recordBuilder)
- val rowOpt = deserializer.deserialize(requiredAvroRecord)
- recordToLoad =
unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
- true
- }
- } else {
- false
- }
+ recordToLoad = unsafeProjection(deserialize(avroRecordOpt.get))
+ true
}
}
- override def next(): InternalRow = {
- recordToLoad
- }
+ override final def next(): InternalRow = recordToLoad
- override def close(): Unit = {
- if (logScanner != null) {
- try {
- logScanner.close()
- } finally {
- logScanner = null
- }
+ override def close(): Unit =
+ if (logScanner != null) {
+ try {
+ logScanner.close()
+ } finally {
+ logScanner = null
}
}
+ }
+
+ /**
+ * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an
iterator over all of the records stored in
+ * Base file as well as all of the Delta Log files simply returning
concatenation of these streams, while not
+ * performing any combination/merging of the records w/ the same primary
keys (ie producing duplicates potentially)
+ */
+ private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit,
+ baseFileIterator: Iterator[InternalRow],
+ config: Configuration)
+ extends LogFileIterator(split, config) {
+
+ override def hasNext: Boolean = {
+ if (baseFileIterator.hasNext) {
+ val curRow = baseFileIterator.next()
+ recordToLoad = unsafeProjection(curRow)
+ true
+ } else {
+ super[LogFileIterator].hasNext
+ }
}
+ }
- private def payloadCombineFileIterator(split: HoodieMergeOnReadFileSplit,
- baseFileIterator:
Iterator[InternalRow],
- config: Configuration):
Iterator[InternalRow] =
- new Iterator[InternalRow] with Closeable with SparkAdapterSupport {
- private val tableAvroSchema = new
Schema.Parser().parse(tableSchema.avroSchemaStr)
- private val requiredAvroSchema = new
Schema.Parser().parse(requiredSchema.avroSchemaStr)
- private val requiredFieldPosition =
- requiredSchema.structTypeSchema
- .map(f => tableAvroSchema.getField(f.name).pos()).toList
- private val serializer =
sparkAdapter.createAvroSerializer(tableSchema.structTypeSchema, tableAvroSchema,
- resolveAvroSchemaNullability(tableAvroSchema))
- private val requiredDeserializer =
sparkAdapter.createAvroDeserializer(requiredAvroSchema,
requiredSchema.structTypeSchema)
- private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
- private val unsafeProjection =
UnsafeProjection.create(requiredSchema.structTypeSchema)
- private var logScanner = HoodieMergeOnReadRDD.scanLog(split,
tableAvroSchema, config)
- private val logRecords = logScanner.getRecords
- private val logRecordsKeyIterator =
logRecords.keySet().iterator().asScala
- private val keyToSkip = mutable.Set.empty[String]
- private val recordKeyPosition =
tableSchema.structTypeSchema.fieldIndex(recordKeyField)
-
- private var recordToLoad: InternalRow = _
-
- @scala.annotation.tailrec
- override def hasNext: Boolean = {
- if (baseFileIterator.hasNext) {
- val curRow = baseFileIterator.next()
- val curKey = curRow.getString(recordKeyPosition)
- if (logRecords.containsKey(curKey)) {
- // duplicate key found, merging
- keyToSkip.add(curKey)
- val mergedAvroRecord = mergeRowWithLog(curRow, curKey)
- if (!mergedAvroRecord.isPresent) {
- // deleted
- this.hasNext
- } else {
- // load merged record as InternalRow with required schema
- val requiredAvroRecord =
AvroConversionUtils.buildAvroRecordBySchema(mergedAvroRecord.get(),
requiredAvroSchema,
- requiredFieldPosition, recordBuilder)
- val rowOpt = requiredDeserializer.deserialize(requiredAvroRecord)
- recordToLoad =
unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
- true
- }
- } else {
- // No merge needed, load current row with required schema
- recordToLoad =
unsafeProjection(createInternalRowWithSchema(curRow,
requiredSchema.structTypeSchema, requiredFieldPosition))
- true
- }
+ /**
+ * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an
iterator over all of the records stored in
+ * a) Base file and all of the b) Delta Log files combining records with the
same primary key from both of these
+ * streams
+ */
+ private class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit,
+ baseFileIterator:
Iterator[InternalRow],
+ baseFileReaderSchema:
HoodieTableSchema,
+ config: Configuration)
+ extends LogFileIterator(split, config) {
+
+ // NOTE: Record-merging iterator supports 2 modes of operation merging
records bearing either
+ // - Full table's schema
+ // - Projected schema
+ // As such, no particular schema could be assumed, and therefore we
rely on the caller
+ // to correspondingly set the scheme of the expected output of
base-file reader
+ private val baseFileReaderAvroSchema = new
Schema.Parser().parse(baseFileReaderSchema.avroSchemaStr)
+ private val requiredSchemaFieldOrdinals: List[Int] =
collectFieldOrdinals(requiredAvroSchema, baseFileReaderAvroSchema)
+
+ private val serializer =
sparkAdapter.createAvroSerializer(baseFileReaderSchema.structTypeSchema,
+ baseFileReaderAvroSchema,
resolveAvroSchemaNullability(baseFileReaderAvroSchema))
+
+ private val recordKeyOrdinal =
baseFileReaderSchema.structTypeSchema.fieldIndex(tableState.recordKeyField)
+
+ override def hasNext: Boolean = {
+ if (baseFileIterator.hasNext) {
+ val curRowRecord = baseFileIterator.next()
+ val curKey = curRowRecord.getString(recordKeyOrdinal)
+ val updatedRecordOpt = removeLogRecord(curKey)
+ if (updatedRecordOpt.isEmpty) {
+ // No merge needed, load current row with required projected schema
+ recordToLoad = unsafeProjection(projectRowUnsafe(curRowRecord,
requiredSchema.structTypeSchema, requiredSchemaFieldOrdinals))
+ true
} else {
- if (logRecordsKeyIterator.hasNext) {
- val curKey = logRecordsKeyIterator.next()
- if (keyToSkip.contains(curKey)) {
- this.hasNext
- } else {
- val insertAvroRecord =
logRecords.get(curKey).getData.getInsertValue(tableAvroSchema, payloadProps)
- if (!insertAvroRecord.isPresent) {
- // stand alone delete record, skipping
- this.hasNext
- } else {
- val requiredAvroRecord = AvroConversionUtils
- .buildAvroRecordBySchema(
- insertAvroRecord.get(),
- requiredAvroSchema,
- requiredFieldPosition,
- recordBuilder
- )
- val rowOpt =
requiredDeserializer.deserialize(requiredAvroRecord)
- recordToLoad =
unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
- true
- }
- }
+ val mergedAvroRecordOpt = merge(serialize(curRowRecord),
updatedRecordOpt.get)
+ if (mergedAvroRecordOpt.isEmpty) {
+ // Record has been deleted, skipping
+ this.hasNext
} else {
- false
+ // NOTE: In occurrence of a merge we can't know the schema of the
record being returned, b/c
+ // record from the Delta Log will bear (full) Table schema,
while record from the Base file
+ // might already be read in projected one (as an
optimization).
+ // As such we can't use more performant
[[projectAvroUnsafe]], and instead have to fallback
+ // to [[projectAvro]]
+ val projectedAvroRecord = projectAvro(mergedAvroRecordOpt.get,
requiredAvroSchema, recordBuilder)
+ recordToLoad = unsafeProjection(deserialize(projectedAvroRecord))
+ true
}
}
+ } else {
+ super[LogFileIterator].hasNext
}
+ }
- override def next(): InternalRow = recordToLoad
-
- override def close(): Unit = {
- if (logScanner != null) {
- try {
- logScanner.close()
- } finally {
- logScanner = null
- }
- }
- }
+ private def serialize(curRowRecord: InternalRow): GenericRecord =
+ serializer.serialize(curRowRecord).asInstanceOf[GenericRecord]
- private def mergeRowWithLog(curRow: InternalRow, curKey: String) :
org.apache.hudi.common.util.Option[IndexedRecord] = {
- val historyAvroRecord =
serializer.serialize(curRow).asInstanceOf[GenericRecord]
- val mergedRec = logRecords.get(curKey).getData
- .combineAndGetUpdateValue(historyAvroRecord, tableAvroSchema,
payloadProps)
- if (mergedRec.isPresent && mergedRec.get().getSchema !=
tableAvroSchema) {
-
org.apache.hudi.common.util.Option.of(HoodieAvroUtils.rewriteRecord(mergedRec.get().asInstanceOf[GenericRecord],
tableAvroSchema).asInstanceOf[IndexedRecord])
- } else {
- mergedRec
- }
- }
+ private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_
<: HoodieRecordPayload[_]]): Option[IndexedRecord] = {
+ // NOTE: We have to pass in Avro Schema used to read from Delta Log file
since we invoke combining API
+ // on the record from the Delta Log
+ toScalaOption(newRecord.getData.combineAndGetUpdateValue(curAvroRecord,
logFileReaderAvroSchema, payloadProps))
}
+ }
}
private object HoodieMergeOnReadRDD {
+
val CONFIG_INSTANTIATION_LOCK = new Object()
- def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config:
Configuration): HoodieMergedLogRecordScanner = {
- val fs = FSUtils.getFs(split.tablePath, config)
- val logFiles = split.logFiles.get
+ def scanLog(logFiles: List[HoodieLogFile],
+ partitionPath: Path,
+ logSchema: Schema,
+ tableState: HoodieTableState,
+ maxCompactionMemoryInBytes: Long,
+ hadoopConf: Configuration): HoodieMergedLogRecordScanner = {
Review comment:
I inlined `maxCompactionMemoryInBytes` into `HoodieMergeOnReadRDD`
(previously it was passed in as an arg), but I don't think it makes sense to
eliminate it from this argument list -- it's not a simple field access, but
requires a fair amount of computation.
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -108,270 +139,284 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
}
}
- private def logFileIterator(split: HoodieMergeOnReadFileSplit,
- config: Configuration): Iterator[InternalRow] =
- new Iterator[InternalRow] with Closeable with SparkAdapterSupport {
- private val tableAvroSchema = new
Schema.Parser().parse(tableSchema.avroSchemaStr)
- private val requiredAvroSchema = new
Schema.Parser().parse(requiredSchema.avroSchemaStr)
- private val requiredFieldPosition =
- requiredSchema.structTypeSchema
- .map(f => tableAvroSchema.getField(f.name).pos()).toList
- private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
- private val deserializer =
sparkAdapter.createAvroDeserializer(requiredAvroSchema,
requiredSchema.structTypeSchema)
- private val unsafeProjection =
UnsafeProjection.create(requiredSchema.structTypeSchema)
- private var logScanner = HoodieMergeOnReadRDD.scanLog(split,
tableAvroSchema, config)
- private val logRecords = logScanner.getRecords
- private val logRecordsKeyIterator =
logRecords.keySet().iterator().asScala
-
- private var recordToLoad: InternalRow = _
-
- override def hasNext: Boolean = {
- if (logRecordsKeyIterator.hasNext) {
- val curAvrokey = logRecordsKeyIterator.next()
- val curAvroRecord =
logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema, payloadProps)
- if (!curAvroRecord.isPresent) {
- // delete record found, skipping
- this.hasNext
- } else {
- val requiredAvroRecord =
AvroConversionUtils.buildAvroRecordBySchema(curAvroRecord.get(),
requiredAvroSchema,
- requiredFieldPosition, recordBuilder)
- val rowOpt = deserializer.deserialize(requiredAvroRecord)
- recordToLoad =
unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
- true
+ /**
+ * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], iterates over all
of the records stored in
+ * Delta Log files (represented as [[InternalRow]]s)
+ */
+ private class LogFileIterator(split: HoodieMergeOnReadFileSplit,
+ config: Configuration)
+ extends Iterator[InternalRow] with Closeable with AvroDeserializerSupport {
+
+ protected override val requiredAvroSchema: Schema = new
Schema.Parser().parse(requiredSchema.avroSchemaStr)
+ protected override val requiredStructTypeSchema: StructType =
requiredSchema.structTypeSchema
+
+ protected val logFileReaderAvroSchema: Schema = new
Schema.Parser().parse(tableSchema.avroSchemaStr)
+
+ protected val recordBuilder: GenericRecordBuilder = new
GenericRecordBuilder(requiredAvroSchema)
+ protected var recordToLoad: InternalRow = _
+
+ // TODO validate whether we need to do UnsafeProjection
+ protected val unsafeProjection: UnsafeProjection =
UnsafeProjection.create(requiredStructTypeSchema)
+
+ // NOTE: This maps _required_ schema fields onto the _full_ table schema,
collecting their "ordinals"
+ // w/in the record payload. This is required, to project records
read from the Delta Log file
+ // which always reads records in full schema (never projected, due
to the fact that DL file might
+ // be stored in non-columnar formats like Avro, HFile, etc)
+ private val requiredSchemaFieldOrdinals: List[Int] =
collectFieldOrdinals(requiredAvroSchema, logFileReaderAvroSchema)
+
+ private var logScanner =
+ HoodieMergeOnReadRDD.scanLog(split.logFiles, getPartitionPath(split),
logFileReaderAvroSchema, tableState,
+ maxCompactionMemoryInBytes, config)
+
+ private val logRecords = logScanner.getRecords.asScala
+
+ // NOTE: This iterator iterates over already projected (in required
schema) records
+ // NOTE: This have to stay lazy to make sure it's initialized only at the
point where it's
+ // going to be used, since we modify `logRecords` before that and
therefore can't do it any earlier
+ protected lazy val logRecordsIterator: Iterator[Option[GenericRecord]] =
+ logRecords.iterator.map {
+ case (_, record) =>
+ val avroRecordOpt =
toScalaOption(record.getData.getInsertValue(logFileReaderAvroSchema,
payloadProps))
+ avroRecordOpt.map {
+ avroRecord => projectAvroUnsafe(avroRecord, requiredAvroSchema,
requiredSchemaFieldOrdinals, recordBuilder)
}
- } else {
- false
- }
}
- override def next(): InternalRow = {
- recordToLoad
- }
-
- override def close(): Unit = {
- if (logScanner != null) {
- try {
- logScanner.close()
- } finally {
- logScanner = null
- }
- }
- }
- }
+ protected def removeLogRecord(key: String): Option[HoodieRecord[_ <:
HoodieRecordPayload[_]]] =
+ logRecords.remove(key)
- private def skipMergeFileIterator(split: HoodieMergeOnReadFileSplit,
- baseFileIterator: Iterator[InternalRow],
- config: Configuration):
Iterator[InternalRow] =
- new Iterator[InternalRow] with Closeable with SparkAdapterSupport {
- private val tableAvroSchema = new
Schema.Parser().parse(tableSchema.avroSchemaStr)
- private val requiredAvroSchema = new
Schema.Parser().parse(requiredSchema.avroSchemaStr)
- private val requiredFieldPosition =
- requiredSchema.structTypeSchema
- .map(f => tableAvroSchema.getField(f.name).pos()).toList
- private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
- private val deserializer =
sparkAdapter.createAvroDeserializer(requiredAvroSchema,
requiredSchema.structTypeSchema)
- private val unsafeProjection =
UnsafeProjection.create(requiredSchema.structTypeSchema)
- private var logScanner = HoodieMergeOnReadRDD.scanLog(split,
tableAvroSchema, config)
- private val logRecords = logScanner.getRecords
- private val logRecordsKeyIterator =
logRecords.keySet().iterator().asScala
-
- private var recordToLoad: InternalRow = _
-
- @scala.annotation.tailrec
- override def hasNext: Boolean = {
- if (baseFileIterator.hasNext) {
- val curRow = baseFileIterator.next()
- recordToLoad = unsafeProjection(curRow)
- true
+ override def hasNext: Boolean =
+ logRecordsIterator.hasNext && {
+ val avroRecordOpt = logRecordsIterator.next()
+ if (avroRecordOpt.isEmpty) {
+ // Record has been deleted, skipping
+ this.hasNext
} else {
- if (logRecordsKeyIterator.hasNext) {
- val curAvrokey = logRecordsKeyIterator.next()
- val curAvroRecord =
logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema, payloadProps)
- if (!curAvroRecord.isPresent) {
- // delete record found, skipping
- this.hasNext
- } else {
- val requiredAvroRecord =
AvroConversionUtils.buildAvroRecordBySchema(curAvroRecord.get(),
requiredAvroSchema,
- requiredFieldPosition, recordBuilder)
- val rowOpt = deserializer.deserialize(requiredAvroRecord)
- recordToLoad =
unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
- true
- }
- } else {
- false
- }
+ recordToLoad = unsafeProjection(deserialize(avroRecordOpt.get))
+ true
}
}
- override def next(): InternalRow = {
- recordToLoad
- }
+ override final def next(): InternalRow = recordToLoad
- override def close(): Unit = {
- if (logScanner != null) {
- try {
- logScanner.close()
- } finally {
- logScanner = null
- }
+ override def close(): Unit =
+ if (logScanner != null) {
+ try {
+ logScanner.close()
+ } finally {
+ logScanner = null
}
}
+ }
+
+ /**
+ * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an
iterator over all of the records stored in
+ * Base file as well as all of the Delta Log files simply returning
concatenation of these streams, while not
+ * performing any combination/merging of the records w/ the same primary
keys (ie producing duplicates potentially)
+ */
+ private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit,
+ baseFileIterator: Iterator[InternalRow],
+ config: Configuration)
+ extends LogFileIterator(split, config) {
+
+ override def hasNext: Boolean = {
+ if (baseFileIterator.hasNext) {
+ val curRow = baseFileIterator.next()
+ recordToLoad = unsafeProjection(curRow)
+ true
+ } else {
+ super[LogFileIterator].hasNext
+ }
}
+ }
- private def payloadCombineFileIterator(split: HoodieMergeOnReadFileSplit,
- baseFileIterator:
Iterator[InternalRow],
- config: Configuration):
Iterator[InternalRow] =
- new Iterator[InternalRow] with Closeable with SparkAdapterSupport {
- private val tableAvroSchema = new
Schema.Parser().parse(tableSchema.avroSchemaStr)
- private val requiredAvroSchema = new
Schema.Parser().parse(requiredSchema.avroSchemaStr)
- private val requiredFieldPosition =
- requiredSchema.structTypeSchema
- .map(f => tableAvroSchema.getField(f.name).pos()).toList
- private val serializer =
sparkAdapter.createAvroSerializer(tableSchema.structTypeSchema, tableAvroSchema,
- resolveAvroSchemaNullability(tableAvroSchema))
- private val requiredDeserializer =
sparkAdapter.createAvroDeserializer(requiredAvroSchema,
requiredSchema.structTypeSchema)
- private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
- private val unsafeProjection =
UnsafeProjection.create(requiredSchema.structTypeSchema)
- private var logScanner = HoodieMergeOnReadRDD.scanLog(split,
tableAvroSchema, config)
- private val logRecords = logScanner.getRecords
- private val logRecordsKeyIterator =
logRecords.keySet().iterator().asScala
- private val keyToSkip = mutable.Set.empty[String]
- private val recordKeyPosition =
tableSchema.structTypeSchema.fieldIndex(recordKeyField)
-
- private var recordToLoad: InternalRow = _
-
- @scala.annotation.tailrec
- override def hasNext: Boolean = {
- if (baseFileIterator.hasNext) {
- val curRow = baseFileIterator.next()
- val curKey = curRow.getString(recordKeyPosition)
- if (logRecords.containsKey(curKey)) {
- // duplicate key found, merging
- keyToSkip.add(curKey)
- val mergedAvroRecord = mergeRowWithLog(curRow, curKey)
- if (!mergedAvroRecord.isPresent) {
- // deleted
- this.hasNext
- } else {
- // load merged record as InternalRow with required schema
- val requiredAvroRecord =
AvroConversionUtils.buildAvroRecordBySchema(mergedAvroRecord.get(),
requiredAvroSchema,
- requiredFieldPosition, recordBuilder)
- val rowOpt = requiredDeserializer.deserialize(requiredAvroRecord)
- recordToLoad =
unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
- true
- }
- } else {
- // No merge needed, load current row with required schema
- recordToLoad =
unsafeProjection(createInternalRowWithSchema(curRow,
requiredSchema.structTypeSchema, requiredFieldPosition))
- true
- }
+ /**
+ * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an
iterator over all of the records stored in
+ * a) Base file and all of the b) Delta Log files combining records with the
same primary key from both of these
+ * streams
+ */
+ private class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit,
+ baseFileIterator:
Iterator[InternalRow],
+ baseFileReaderSchema:
HoodieTableSchema,
+ config: Configuration)
+ extends LogFileIterator(split, config) {
+
+ // NOTE: Record-merging iterator supports 2 modes of operation merging
records bearing either
+ // - Full table's schema
+ // - Projected schema
+ // As such, no particular schema could be assumed, and therefore we
rely on the caller
+ // to correspondingly set the scheme of the expected output of
base-file reader
+ private val baseFileReaderAvroSchema = new
Schema.Parser().parse(baseFileReaderSchema.avroSchemaStr)
+ private val requiredSchemaFieldOrdinals: List[Int] =
collectFieldOrdinals(requiredAvroSchema, baseFileReaderAvroSchema)
+
+ private val serializer =
sparkAdapter.createAvroSerializer(baseFileReaderSchema.structTypeSchema,
+ baseFileReaderAvroSchema,
resolveAvroSchemaNullability(baseFileReaderAvroSchema))
+
+ private val recordKeyOrdinal =
baseFileReaderSchema.structTypeSchema.fieldIndex(tableState.recordKeyField)
+
+ override def hasNext: Boolean = {
+ if (baseFileIterator.hasNext) {
+ val curRowRecord = baseFileIterator.next()
+ val curKey = curRowRecord.getString(recordKeyOrdinal)
+ val updatedRecordOpt = removeLogRecord(curKey)
+ if (updatedRecordOpt.isEmpty) {
+ // No merge needed, load current row with required projected schema
+ recordToLoad = unsafeProjection(projectRowUnsafe(curRowRecord,
requiredSchema.structTypeSchema, requiredSchemaFieldOrdinals))
+ true
} else {
- if (logRecordsKeyIterator.hasNext) {
- val curKey = logRecordsKeyIterator.next()
- if (keyToSkip.contains(curKey)) {
- this.hasNext
- } else {
- val insertAvroRecord =
logRecords.get(curKey).getData.getInsertValue(tableAvroSchema, payloadProps)
- if (!insertAvroRecord.isPresent) {
- // stand alone delete record, skipping
- this.hasNext
- } else {
- val requiredAvroRecord = AvroConversionUtils
- .buildAvroRecordBySchema(
- insertAvroRecord.get(),
- requiredAvroSchema,
- requiredFieldPosition,
- recordBuilder
- )
- val rowOpt =
requiredDeserializer.deserialize(requiredAvroRecord)
- recordToLoad =
unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
- true
- }
- }
+ val mergedAvroRecordOpt = merge(serialize(curRowRecord),
updatedRecordOpt.get)
+ if (mergedAvroRecordOpt.isEmpty) {
+ // Record has been deleted, skipping
+ this.hasNext
} else {
- false
+ // NOTE: In occurrence of a merge we can't know the schema of the
record being returned, b/c
+ // record from the Delta Log will bear (full) Table schema,
while record from the Base file
+ // might already be read in projected one (as an
optimization).
+ // As such we can't use more performant
[[projectAvroUnsafe]], and instead have to fallback
+ // to [[projectAvro]]
+ val projectedAvroRecord = projectAvro(mergedAvroRecordOpt.get,
requiredAvroSchema, recordBuilder)
+ recordToLoad = unsafeProjection(deserialize(projectedAvroRecord))
+ true
}
}
+ } else {
+ super[LogFileIterator].hasNext
}
+ }
- override def next(): InternalRow = recordToLoad
-
- override def close(): Unit = {
- if (logScanner != null) {
- try {
- logScanner.close()
- } finally {
- logScanner = null
- }
- }
- }
+ private def serialize(curRowRecord: InternalRow): GenericRecord =
+ serializer.serialize(curRowRecord).asInstanceOf[GenericRecord]
- private def mergeRowWithLog(curRow: InternalRow, curKey: String) :
org.apache.hudi.common.util.Option[IndexedRecord] = {
- val historyAvroRecord =
serializer.serialize(curRow).asInstanceOf[GenericRecord]
- val mergedRec = logRecords.get(curKey).getData
- .combineAndGetUpdateValue(historyAvroRecord, tableAvroSchema,
payloadProps)
- if (mergedRec.isPresent && mergedRec.get().getSchema !=
tableAvroSchema) {
-
org.apache.hudi.common.util.Option.of(HoodieAvroUtils.rewriteRecord(mergedRec.get().asInstanceOf[GenericRecord],
tableAvroSchema).asInstanceOf[IndexedRecord])
- } else {
- mergedRec
- }
- }
+ private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_
<: HoodieRecordPayload[_]]): Option[IndexedRecord] = {
+ // NOTE: We have to pass in Avro Schema used to read from Delta Log file
since we invoke combining API
+ // on the record from the Delta Log
+ toScalaOption(newRecord.getData.combineAndGetUpdateValue(curAvroRecord,
logFileReaderAvroSchema, payloadProps))
}
+ }
}
private object HoodieMergeOnReadRDD {
+
val CONFIG_INSTANTIATION_LOCK = new Object()
- def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config:
Configuration): HoodieMergedLogRecordScanner = {
- val fs = FSUtils.getFs(split.tablePath, config)
- val logFiles = split.logFiles.get
+ def scanLog(logFiles: List[HoodieLogFile],
+ partitionPath: Path,
+ logSchema: Schema,
+ tableState: HoodieTableState,
+ maxCompactionMemoryInBytes: Long,
+ hadoopConf: Configuration): HoodieMergedLogRecordScanner = {
+ val tablePath = tableState.tablePath
+ val fs = FSUtils.getFs(tablePath, hadoopConf)
- if (HoodieTableMetadata.isMetadataTable(split.tablePath)) {
+ if (HoodieTableMetadata.isMetadataTable(tablePath)) {
val metadataConfig =
HoodieMetadataConfig.newBuilder().enable(true).build()
- val dataTableBasePath =
getDataTableBasePathFromMetadataTable(split.tablePath)
+ val dataTableBasePath = getDataTableBasePathFromMetadataTable(tablePath)
val metadataTable = new HoodieBackedTableMetadata(
- new HoodieLocalEngineContext(config), metadataConfig,
+ new HoodieLocalEngineContext(hadoopConf), metadataConfig,
dataTableBasePath,
- config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
+ hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
// NOTE: In case of Metadata Table partition path equates to partition
name (since there's just one level
// of indirection among MT partitions)
- val relativePartitionPath = getRelativePartitionPath(new
Path(split.tablePath), getPartitionPath(split))
+ val relativePartitionPath = getRelativePartitionPath(new
Path(tablePath), partitionPath)
metadataTable.getLogRecordScanner(logFiles.asJava,
relativePartitionPath).getLeft
} else {
val logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
- .withBasePath(split.tablePath)
- .withLogFilePaths(split.logFiles.get.map(logFile =>
getFilePath(logFile.getPath)).asJava)
+ .withBasePath(tablePath)
+ .withLogFilePaths(logFiles.map(logFile =>
getFilePath(logFile.getPath)).asJava)
.withReaderSchema(logSchema)
- .withLatestInstantTime(split.latestCommit)
+ .withLatestInstantTime(tableState.latestCommitTimestamp)
.withReadBlocksLazily(
-
Try(config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
+
Try(hadoopConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean)
.getOrElse(false))
.withReverseReader(false)
.withBufferSize(
- config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
+
hadoopConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
- .withMaxMemorySizeInBytes(split.maxCompactionMemoryInBytes)
+ .withMaxMemorySizeInBytes(maxCompactionMemoryInBytes)
.withSpillableMapBasePath(
- config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
+ hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
if (logFiles.nonEmpty) {
- logRecordScannerBuilder.withPartition(getRelativePartitionPath(new
Path(split.tablePath), logFiles.head.getPath.getParent))
+ logRecordScannerBuilder.withPartition(
+ getRelativePartitionPath(new Path(tableState.tablePath),
logFiles.head.getPath.getParent))
}
logRecordScannerBuilder.build()
}
}
+ /**
+ * Projects provided instance of [[InternalRow]] into provided schema,
assuming that the
+ * schema of the original row is strictly a superset of the given one
+ */
+ private def projectRowUnsafe(row: InternalRow,
+ projectedSchema: StructType,
+ ordinals: Seq[Int]): InternalRow = {
+ val projectedRow = new SpecificInternalRow(projectedSchema)
+ var curIndex = 0
+ projectedSchema.zip(ordinals).foreach { case (field, pos) =>
+ val curField = if (row.isNullAt(pos)) {
+ null
+ } else {
+ row.get(pos, field.dataType)
+ }
+ projectedRow.update(curIndex, curField)
+ curIndex += 1
+ }
+ projectedRow
+ }
+
+ /**
+ * Projects provided instance of [[IndexedRecord]] into provided schema,
assuming that the
+ * schema of the original record is strictly a superset of the given one
+ */
+ def projectAvroUnsafe(record: IndexedRecord,
+ projectedSchema: Schema,
+ ordinals: List[Int],
+ recordBuilder: GenericRecordBuilder): GenericRecord = {
+ val fields = projectedSchema.getFields.asScala
+ assert(fields.length == ordinals.length)
Review comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]