xushiyan commented on a change in pull request #4888:
URL: https://github.com/apache/hudi/pull/4888#discussion_r832116327
##########
File path:
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
##########
@@ -206,17 +206,6 @@ object AvroConversionUtils {
SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
}
- def buildAvroRecordBySchema(record: IndexedRecord,
Review comment:
this method not used? would be helpful to have a comment here to clarify
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -70,34 +73,59 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
)
.getOrElse(new Properties())
+ private val whitelistedPayloadClasses: Set[String] = Seq(
+ classOf[OverwriteWithLatestAvroPayload]
+ ).map(_.getName).toSet
+
override def compute(split: Partition, context: TaskContext):
Iterator[InternalRow] = {
val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
val iter = mergeOnReadPartition.split match {
case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
- requiredSchemaFileReader(dataFileOnlySplit.dataFile.get)
+ requiredSchemaFileReader.apply(dataFileOnlySplit.dataFile.get)
+
case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
- logFileIterator(logFileOnlySplit, getConfig)
- case skipMergeSplit if
skipMergeSplit.mergeType.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL)
=>
- skipMergeFileIterator(skipMergeSplit,
requiredSchemaFileReader(skipMergeSplit.dataFile.get), getConfig)
- case payloadCombineSplit
- if
payloadCombineSplit.mergeType.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL)
=>
- payloadCombineFileIterator(payloadCombineSplit,
fullSchemaFileReader(payloadCombineSplit.dataFile.get),
- getConfig)
+ new LogFileIterator(logFileOnlySplit, getConfig)
+
+ case split if
mergeType.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) =>
+ val baseFileIterator =
requiredSchemaFileReader.apply(split.dataFile.get)
+ new SkipMergeIterator(split, baseFileIterator, getConfig)
+
+ case split if
mergeType.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) =>
+ val (baseFileIterator, schema) = readBaseFile(split)
+ new RecordMergingFileIterator(split, baseFileIterator, schema,
getConfig)
+
case _ => throw new HoodieException(s"Unable to select an Iterator to
read the Hoodie MOR File Split for " +
s"file path: ${mergeOnReadPartition.split.dataFile.get.filePath}" +
s"log paths: ${mergeOnReadPartition.split.logFiles.toString}" +
- s"hoodie table path: ${mergeOnReadPartition.split.tablePath}" +
+ s"hoodie table path: ${tableState.tablePath}" +
s"spark partition Index: ${mergeOnReadPartition.index}" +
- s"merge type: ${mergeOnReadPartition.split.mergeType}")
+ s"merge type: ${mergeType}")
}
+
if (iter.isInstanceOf[Closeable]) {
// register a callback to close logScanner which will be executed on
task completion.
// when tasks finished, this method will be called, and release
resources.
Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ =>
iter.asInstanceOf[Closeable].close()))
}
+
iter
}
+ private def readBaseFile(split: HoodieMergeOnReadFileSplit):
(Iterator[InternalRow], HoodieTableSchema) = {
+ // NOTE: This is an optimization making sure that even for MOR tables we
fetch absolute minimum
+ // of the stored data possible, while still properly executing
corresponding relation's semantic
+ // and meet the query's requirements.
+ //
+ // Here we assume that iff queried table
+ // a) It does use one of the standard (and whitelisted) Record
Payload classes
+ // then we can avoid reading and parsing the records w/ _full_
schema, and instead only
+ // rely on projected one, nevertheless being able to perform merging
correctly
+ if (!whitelistedPayloadClasses.contains(tableState.recordPayloadClassName))
+ (fullSchemaFileReader(split.dataFile.get), tableSchema)
Review comment:
looks better with `fullSchema` to distinguish with `requiredSchema`
```suggestion
(fullSchemaFileReader(split.dataFile.get), fullSchema)
```
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -108,270 +136,284 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
}
}
- private def logFileIterator(split: HoodieMergeOnReadFileSplit,
- config: Configuration): Iterator[InternalRow] =
- new Iterator[InternalRow] with Closeable with SparkAdapterSupport {
- private val tableAvroSchema = new
Schema.Parser().parse(tableSchema.avroSchemaStr)
- private val requiredAvroSchema = new
Schema.Parser().parse(requiredSchema.avroSchemaStr)
- private val requiredFieldPosition =
- requiredSchema.structTypeSchema
- .map(f => tableAvroSchema.getField(f.name).pos()).toList
- private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
- private val deserializer =
sparkAdapter.createAvroDeserializer(requiredAvroSchema,
requiredSchema.structTypeSchema)
- private val unsafeProjection =
UnsafeProjection.create(requiredSchema.structTypeSchema)
- private var logScanner = HoodieMergeOnReadRDD.scanLog(split,
tableAvroSchema, config)
- private val logRecords = logScanner.getRecords
- private val logRecordsKeyIterator =
logRecords.keySet().iterator().asScala
-
- private var recordToLoad: InternalRow = _
-
- override def hasNext: Boolean = {
- if (logRecordsKeyIterator.hasNext) {
- val curAvrokey = logRecordsKeyIterator.next()
- val curAvroRecord =
logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema, payloadProps)
- if (!curAvroRecord.isPresent) {
- // delete record found, skipping
- this.hasNext
- } else {
- val requiredAvroRecord =
AvroConversionUtils.buildAvroRecordBySchema(curAvroRecord.get(),
requiredAvroSchema,
- requiredFieldPosition, recordBuilder)
- val rowOpt = deserializer.deserialize(requiredAvroRecord)
- recordToLoad =
unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
- true
+ /**
+ * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], iterates over all
of the records stored in
+ * Delta Log files (represented as [[InternalRow]]s)
+ */
+ private class LogFileIterator(split: HoodieMergeOnReadFileSplit,
+ config: Configuration)
+ extends Iterator[InternalRow] with Closeable with AvroDeserializerSupport {
+
+ protected override val requiredAvroSchema: Schema = new
Schema.Parser().parse(requiredSchema.avroSchemaStr)
+ protected override val requiredStructTypeSchema: StructType =
requiredSchema.structTypeSchema
+
+ protected val logFileReaderAvroSchema: Schema = new
Schema.Parser().parse(tableSchema.avroSchemaStr)
+
+ protected val recordBuilder: GenericRecordBuilder = new
GenericRecordBuilder(requiredAvroSchema)
+ protected var recordToLoad: InternalRow = _
+
+ // TODO validate whether we need to do UnsafeProjection
+ protected val unsafeProjection: UnsafeProjection =
UnsafeProjection.create(requiredStructTypeSchema)
+
+ // NOTE: This maps _required_ schema fields onto the _full_ table schema,
collecting their "ordinals"
+ // w/in the record payload. This is required, to project records
read from the Delta Log file
+ // which always reads records in full schema (never projected, due
to the fact that DL file might
+ // be stored in non-columnar formats like Avro, HFile, etc)
+ private val requiredSchemaFieldOrdinals: List[Int] =
collectFieldOrdinals(requiredAvroSchema, logFileReaderAvroSchema)
+
+ private var logScanner =
+ HoodieMergeOnReadRDD.scanLog(split.logFiles, getPartitionPath(split),
logFileReaderAvroSchema, tableState,
+ maxCompactionMemoryInBytes, config)
+
+ private val logRecords = logScanner.getRecords.asScala
+
+ // NOTE: This iterator iterates over already projected (in required
schema) records
+ // NOTE: This have to stay lazy to make sure it's initialized only at the
point where it's
+ // going to be used, since we modify `logRecords` before that and
therefore can't do it any earlier
+ protected lazy val logRecordsIterator: Iterator[Option[GenericRecord]] =
+ logRecords.iterator.map {
+ case (_, record) =>
+ val avroRecordOpt =
toScalaOption(record.getData.getInsertValue(logFileReaderAvroSchema,
payloadProps))
+ avroRecordOpt.map {
+ avroRecord => projectAvroUnsafe(avroRecord, requiredAvroSchema,
requiredSchemaFieldOrdinals, recordBuilder)
}
- } else {
- false
- }
}
- override def next(): InternalRow = {
- recordToLoad
- }
-
- override def close(): Unit = {
- if (logScanner != null) {
- try {
- logScanner.close()
- } finally {
- logScanner = null
- }
- }
- }
- }
+ protected def removeLogRecord(key: String): Option[HoodieRecord[_ <:
HoodieRecordPayload[_]]] =
+ logRecords.remove(key)
- private def skipMergeFileIterator(split: HoodieMergeOnReadFileSplit,
- baseFileIterator: Iterator[InternalRow],
- config: Configuration):
Iterator[InternalRow] =
- new Iterator[InternalRow] with Closeable with SparkAdapterSupport {
- private val tableAvroSchema = new
Schema.Parser().parse(tableSchema.avroSchemaStr)
- private val requiredAvroSchema = new
Schema.Parser().parse(requiredSchema.avroSchemaStr)
- private val requiredFieldPosition =
- requiredSchema.structTypeSchema
- .map(f => tableAvroSchema.getField(f.name).pos()).toList
- private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
- private val deserializer =
sparkAdapter.createAvroDeserializer(requiredAvroSchema,
requiredSchema.structTypeSchema)
- private val unsafeProjection =
UnsafeProjection.create(requiredSchema.structTypeSchema)
- private var logScanner = HoodieMergeOnReadRDD.scanLog(split,
tableAvroSchema, config)
- private val logRecords = logScanner.getRecords
- private val logRecordsKeyIterator =
logRecords.keySet().iterator().asScala
-
- private var recordToLoad: InternalRow = _
-
- @scala.annotation.tailrec
- override def hasNext: Boolean = {
- if (baseFileIterator.hasNext) {
- val curRow = baseFileIterator.next()
- recordToLoad = unsafeProjection(curRow)
- true
+ override def hasNext: Boolean =
+ logRecordsIterator.hasNext && {
+ val avroRecordOpt = logRecordsIterator.next()
+ if (avroRecordOpt.isEmpty) {
+ // Record has been deleted, skipping
+ this.hasNext
} else {
- if (logRecordsKeyIterator.hasNext) {
- val curAvrokey = logRecordsKeyIterator.next()
- val curAvroRecord =
logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema, payloadProps)
- if (!curAvroRecord.isPresent) {
- // delete record found, skipping
- this.hasNext
- } else {
- val requiredAvroRecord =
AvroConversionUtils.buildAvroRecordBySchema(curAvroRecord.get(),
requiredAvroSchema,
- requiredFieldPosition, recordBuilder)
- val rowOpt = deserializer.deserialize(requiredAvroRecord)
- recordToLoad =
unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
- true
- }
- } else {
- false
- }
+ recordToLoad = unsafeProjection(deserialize(avroRecordOpt.get))
+ true
}
}
- override def next(): InternalRow = {
- recordToLoad
- }
+ override final def next(): InternalRow = recordToLoad
- override def close(): Unit = {
- if (logScanner != null) {
- try {
- logScanner.close()
- } finally {
- logScanner = null
- }
+ override def close(): Unit =
+ if (logScanner != null) {
+ try {
+ logScanner.close()
+ } finally {
+ logScanner = null
}
}
+ }
+
+ /**
+ * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an
iterator over all of the records stored in
+ * Base file as well as all of the Delta Log files simply returning
concatenation of these streams, while not
+ * performing any combination/merging of the records w/ the same primary
keys (ie producing duplicates potentially)
+ */
+ private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit,
+ baseFileIterator: Iterator[InternalRow],
+ config: Configuration)
+ extends LogFileIterator(split, config) {
+
+ override def hasNext: Boolean = {
+ if (baseFileIterator.hasNext) {
+ val curRow = baseFileIterator.next()
+ recordToLoad = unsafeProjection(curRow)
+ true
+ } else {
+ super[LogFileIterator].hasNext
+ }
}
+ }
- private def payloadCombineFileIterator(split: HoodieMergeOnReadFileSplit,
- baseFileIterator:
Iterator[InternalRow],
- config: Configuration):
Iterator[InternalRow] =
- new Iterator[InternalRow] with Closeable with SparkAdapterSupport {
- private val tableAvroSchema = new
Schema.Parser().parse(tableSchema.avroSchemaStr)
- private val requiredAvroSchema = new
Schema.Parser().parse(requiredSchema.avroSchemaStr)
- private val requiredFieldPosition =
- requiredSchema.structTypeSchema
- .map(f => tableAvroSchema.getField(f.name).pos()).toList
- private val serializer =
sparkAdapter.createAvroSerializer(tableSchema.structTypeSchema, tableAvroSchema,
- resolveAvroSchemaNullability(tableAvroSchema))
- private val requiredDeserializer =
sparkAdapter.createAvroDeserializer(requiredAvroSchema,
requiredSchema.structTypeSchema)
- private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
- private val unsafeProjection =
UnsafeProjection.create(requiredSchema.structTypeSchema)
- private var logScanner = HoodieMergeOnReadRDD.scanLog(split,
tableAvroSchema, config)
- private val logRecords = logScanner.getRecords
- private val logRecordsKeyIterator =
logRecords.keySet().iterator().asScala
- private val keyToSkip = mutable.Set.empty[String]
- private val recordKeyPosition =
tableSchema.structTypeSchema.fieldIndex(recordKeyField)
-
- private var recordToLoad: InternalRow = _
-
- @scala.annotation.tailrec
- override def hasNext: Boolean = {
- if (baseFileIterator.hasNext) {
- val curRow = baseFileIterator.next()
- val curKey = curRow.getString(recordKeyPosition)
- if (logRecords.containsKey(curKey)) {
- // duplicate key found, merging
- keyToSkip.add(curKey)
- val mergedAvroRecord = mergeRowWithLog(curRow, curKey)
- if (!mergedAvroRecord.isPresent) {
- // deleted
- this.hasNext
- } else {
- // load merged record as InternalRow with required schema
- val requiredAvroRecord =
AvroConversionUtils.buildAvroRecordBySchema(mergedAvroRecord.get(),
requiredAvroSchema,
- requiredFieldPosition, recordBuilder)
- val rowOpt = requiredDeserializer.deserialize(requiredAvroRecord)
- recordToLoad =
unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
- true
- }
- } else {
- // No merge needed, load current row with required schema
- recordToLoad =
unsafeProjection(createInternalRowWithSchema(curRow,
requiredSchema.structTypeSchema, requiredFieldPosition))
- true
- }
+ /**
+ * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an
iterator over all of the records stored in
+ * a) Base file and all of the b) Delta Log files combining records with the
same primary key from both of these
+ * streams
+ */
+ private class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit,
+ baseFileIterator:
Iterator[InternalRow],
+ baseFileReaderSchema:
HoodieTableSchema,
+ config: Configuration)
+ extends LogFileIterator(split, config) {
+
+ // NOTE: Record-merging iterator supports 2 modes of operation merging
records bearing either
+ // - Full table's schema
+ // - Projected schema
+ // As such, no particular schema could be assumed, and therefore we
rely on the caller
+ // to correspondingly set the scheme of the expected output of
base-file reader
+ private val baseFileReaderAvroSchema = new
Schema.Parser().parse(baseFileReaderSchema.avroSchemaStr)
+ private val requiredSchemaFieldOrdinals: List[Int] =
collectFieldOrdinals(requiredAvroSchema, baseFileReaderAvroSchema)
+
+ private val serializer =
sparkAdapter.createAvroSerializer(baseFileReaderSchema.structTypeSchema,
+ baseFileReaderAvroSchema,
resolveAvroSchemaNullability(baseFileReaderAvroSchema))
+
Review comment:
maybe make `AvroDeserializerSupport` provide a serializer and call
`AvroSerDeSupport` ?
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -380,4 +422,17 @@ private object HoodieMergeOnReadRDD {
case (nullable, _) => nullable
}
}
+
+ trait AvroDeserializerSupport extends SparkAdapterSupport {
Review comment:
this looks like a generic trait.. maybe place in a more common class?
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
##########
@@ -79,13 +83,30 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
protected lazy val basePath: String = metaClient.getBasePath
- // If meta fields are enabled, always prefer key from the meta field as
opposed to user-specified one
- // NOTE: This is historical behavior which is preserved as is
+ // NOTE: Record key-field is assumed singular here due to the either of
+ // - In case Hudi's meta fields are enabled: record key will be
pre-materialized (stored) as part
+ // of the record's payload (as part of the Hudi's metadata)
+ // - In case Hudi's meta fields are disabled (virtual keys): in
that case record has to bear _single field_
+ // identified as its (unique) primary key w/in its payload (this is
a limitation of [[SimpleKeyGenerator]],
+ // which is the only [[KeyGenerator]] permitted for virtual-keys
payloads)
protected lazy val recordKeyField: String =
- if (tableConfig.populateMetaFields())
HoodieRecord.RECORD_KEY_METADATA_FIELD
- else tableConfig.getRecordKeyFieldProp
+ if (tableConfig.populateMetaFields()) {
+ HoodieRecord.RECORD_KEY_METADATA_FIELD
+ } else {
+ val keyFields = tableConfig.getRecordKeyFields.get()
+ checkState(keyFields.length == 1)
+ keyFields.head
+ }
- protected lazy val preCombineFieldOpt: Option[String] =
getPrecombineFieldProperty
+ protected lazy val preCombineFieldOpt: Option[String] =
+ Option(tableConfig.getPreCombineField)
+ .orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key))
match {
+ // NOTE: This is required to compensate for cases when empty string is
used to stub
+ // property value to avoid it being set with the default value
+ // TODO(HUDI-3456) cleanup
+ case Some(f) if !StringUtils.isNullOrEmpty(f) => Some(f)
Review comment:
have seen this negation many other places. It would be more readable to
add the other util .
```suggestion
case Some(f) if StringUtils.nonEmpty(f) => Some(f)
```
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -70,34 +73,59 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
)
.getOrElse(new Properties())
+ private val whitelistedPayloadClasses: Set[String] = Seq(
+ classOf[OverwriteWithLatestAvroPayload]
Review comment:
should this include `DefaultHoodieRecordPayload` ?
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -70,34 +73,59 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
)
.getOrElse(new Properties())
+ private val whitelistedPayloadClasses: Set[String] = Seq(
+ classOf[OverwriteWithLatestAvroPayload]
+ ).map(_.getName).toSet
+
override def compute(split: Partition, context: TaskContext):
Iterator[InternalRow] = {
val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
val iter = mergeOnReadPartition.split match {
case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
- requiredSchemaFileReader(dataFileOnlySplit.dataFile.get)
+ requiredSchemaFileReader.apply(dataFileOnlySplit.dataFile.get)
Review comment:
let's always call baseFile instead of dataFile, while you're at it
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -108,270 +136,284 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
}
}
- private def logFileIterator(split: HoodieMergeOnReadFileSplit,
- config: Configuration): Iterator[InternalRow] =
- new Iterator[InternalRow] with Closeable with SparkAdapterSupport {
- private val tableAvroSchema = new
Schema.Parser().parse(tableSchema.avroSchemaStr)
- private val requiredAvroSchema = new
Schema.Parser().parse(requiredSchema.avroSchemaStr)
- private val requiredFieldPosition =
- requiredSchema.structTypeSchema
- .map(f => tableAvroSchema.getField(f.name).pos()).toList
- private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
- private val deserializer =
sparkAdapter.createAvroDeserializer(requiredAvroSchema,
requiredSchema.structTypeSchema)
- private val unsafeProjection =
UnsafeProjection.create(requiredSchema.structTypeSchema)
- private var logScanner = HoodieMergeOnReadRDD.scanLog(split,
tableAvroSchema, config)
- private val logRecords = logScanner.getRecords
- private val logRecordsKeyIterator =
logRecords.keySet().iterator().asScala
-
- private var recordToLoad: InternalRow = _
-
- override def hasNext: Boolean = {
- if (logRecordsKeyIterator.hasNext) {
- val curAvrokey = logRecordsKeyIterator.next()
- val curAvroRecord =
logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema, payloadProps)
- if (!curAvroRecord.isPresent) {
- // delete record found, skipping
- this.hasNext
- } else {
- val requiredAvroRecord =
AvroConversionUtils.buildAvroRecordBySchema(curAvroRecord.get(),
requiredAvroSchema,
- requiredFieldPosition, recordBuilder)
- val rowOpt = deserializer.deserialize(requiredAvroRecord)
- recordToLoad =
unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
- true
+ /**
+ * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], iterates over all
of the records stored in
+ * Delta Log files (represented as [[InternalRow]]s)
+ */
+ private class LogFileIterator(split: HoodieMergeOnReadFileSplit,
+ config: Configuration)
+ extends Iterator[InternalRow] with Closeable with AvroDeserializerSupport {
+
+ protected override val requiredAvroSchema: Schema = new
Schema.Parser().parse(requiredSchema.avroSchemaStr)
+ protected override val requiredStructTypeSchema: StructType =
requiredSchema.structTypeSchema
+
+ protected val logFileReaderAvroSchema: Schema = new
Schema.Parser().parse(tableSchema.avroSchemaStr)
+
+ protected val recordBuilder: GenericRecordBuilder = new
GenericRecordBuilder(requiredAvroSchema)
+ protected var recordToLoad: InternalRow = _
+
+ // TODO validate whether we need to do UnsafeProjection
+ protected val unsafeProjection: UnsafeProjection =
UnsafeProjection.create(requiredStructTypeSchema)
+
+ // NOTE: This maps _required_ schema fields onto the _full_ table schema,
collecting their "ordinals"
+ // w/in the record payload. This is required, to project records
read from the Delta Log file
+ // which always reads records in full schema (never projected, due
to the fact that DL file might
+ // be stored in non-columnar formats like Avro, HFile, etc)
+ private val requiredSchemaFieldOrdinals: List[Int] =
collectFieldOrdinals(requiredAvroSchema, logFileReaderAvroSchema)
+
+ private var logScanner =
+ HoodieMergeOnReadRDD.scanLog(split.logFiles, getPartitionPath(split),
logFileReaderAvroSchema, tableState,
+ maxCompactionMemoryInBytes, config)
+
+ private val logRecords = logScanner.getRecords.asScala
+
+ // NOTE: This iterator iterates over already projected (in required
schema) records
+ // NOTE: This have to stay lazy to make sure it's initialized only at the
point where it's
+ // going to be used, since we modify `logRecords` before that and
therefore can't do it any earlier
+ protected lazy val logRecordsIterator: Iterator[Option[GenericRecord]] =
+ logRecords.iterator.map {
+ case (_, record) =>
+ val avroRecordOpt =
toScalaOption(record.getData.getInsertValue(logFileReaderAvroSchema,
payloadProps))
+ avroRecordOpt.map {
+ avroRecord => projectAvroUnsafe(avroRecord, requiredAvroSchema,
requiredSchemaFieldOrdinals, recordBuilder)
}
- } else {
- false
- }
}
- override def next(): InternalRow = {
- recordToLoad
- }
-
- override def close(): Unit = {
- if (logScanner != null) {
- try {
- logScanner.close()
- } finally {
- logScanner = null
- }
- }
- }
- }
+ protected def removeLogRecord(key: String): Option[HoodieRecord[_ <:
HoodieRecordPayload[_]]] =
+ logRecords.remove(key)
- private def skipMergeFileIterator(split: HoodieMergeOnReadFileSplit,
- baseFileIterator: Iterator[InternalRow],
- config: Configuration):
Iterator[InternalRow] =
- new Iterator[InternalRow] with Closeable with SparkAdapterSupport {
- private val tableAvroSchema = new
Schema.Parser().parse(tableSchema.avroSchemaStr)
- private val requiredAvroSchema = new
Schema.Parser().parse(requiredSchema.avroSchemaStr)
- private val requiredFieldPosition =
- requiredSchema.structTypeSchema
- .map(f => tableAvroSchema.getField(f.name).pos()).toList
- private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
- private val deserializer =
sparkAdapter.createAvroDeserializer(requiredAvroSchema,
requiredSchema.structTypeSchema)
- private val unsafeProjection =
UnsafeProjection.create(requiredSchema.structTypeSchema)
- private var logScanner = HoodieMergeOnReadRDD.scanLog(split,
tableAvroSchema, config)
- private val logRecords = logScanner.getRecords
- private val logRecordsKeyIterator =
logRecords.keySet().iterator().asScala
-
- private var recordToLoad: InternalRow = _
-
- @scala.annotation.tailrec
- override def hasNext: Boolean = {
- if (baseFileIterator.hasNext) {
- val curRow = baseFileIterator.next()
- recordToLoad = unsafeProjection(curRow)
- true
+ override def hasNext: Boolean =
+ logRecordsIterator.hasNext && {
+ val avroRecordOpt = logRecordsIterator.next()
+ if (avroRecordOpt.isEmpty) {
+ // Record has been deleted, skipping
+ this.hasNext
} else {
- if (logRecordsKeyIterator.hasNext) {
- val curAvrokey = logRecordsKeyIterator.next()
- val curAvroRecord =
logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema, payloadProps)
- if (!curAvroRecord.isPresent) {
- // delete record found, skipping
- this.hasNext
- } else {
- val requiredAvroRecord =
AvroConversionUtils.buildAvroRecordBySchema(curAvroRecord.get(),
requiredAvroSchema,
- requiredFieldPosition, recordBuilder)
- val rowOpt = deserializer.deserialize(requiredAvroRecord)
- recordToLoad =
unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
- true
- }
- } else {
- false
- }
+ recordToLoad = unsafeProjection(deserialize(avroRecordOpt.get))
+ true
}
}
- override def next(): InternalRow = {
- recordToLoad
- }
+ override final def next(): InternalRow = recordToLoad
- override def close(): Unit = {
- if (logScanner != null) {
- try {
- logScanner.close()
- } finally {
- logScanner = null
- }
+ override def close(): Unit =
+ if (logScanner != null) {
+ try {
+ logScanner.close()
+ } finally {
+ logScanner = null
}
}
+ }
+
+ /**
+ * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an
iterator over all of the records stored in
+ * Base file as well as all of the Delta Log files simply returning
concatenation of these streams, while not
+ * performing any combination/merging of the records w/ the same primary
keys (ie producing duplicates potentially)
+ */
+ private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit,
+ baseFileIterator: Iterator[InternalRow],
+ config: Configuration)
+ extends LogFileIterator(split, config) {
+
+ override def hasNext: Boolean = {
+ if (baseFileIterator.hasNext) {
+ val curRow = baseFileIterator.next()
+ recordToLoad = unsafeProjection(curRow)
+ true
+ } else {
+ super[LogFileIterator].hasNext
+ }
}
+ }
- private def payloadCombineFileIterator(split: HoodieMergeOnReadFileSplit,
- baseFileIterator:
Iterator[InternalRow],
- config: Configuration):
Iterator[InternalRow] =
- new Iterator[InternalRow] with Closeable with SparkAdapterSupport {
- private val tableAvroSchema = new
Schema.Parser().parse(tableSchema.avroSchemaStr)
- private val requiredAvroSchema = new
Schema.Parser().parse(requiredSchema.avroSchemaStr)
- private val requiredFieldPosition =
- requiredSchema.structTypeSchema
- .map(f => tableAvroSchema.getField(f.name).pos()).toList
- private val serializer =
sparkAdapter.createAvroSerializer(tableSchema.structTypeSchema, tableAvroSchema,
- resolveAvroSchemaNullability(tableAvroSchema))
- private val requiredDeserializer =
sparkAdapter.createAvroDeserializer(requiredAvroSchema,
requiredSchema.structTypeSchema)
- private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
- private val unsafeProjection =
UnsafeProjection.create(requiredSchema.structTypeSchema)
- private var logScanner = HoodieMergeOnReadRDD.scanLog(split,
tableAvroSchema, config)
- private val logRecords = logScanner.getRecords
- private val logRecordsKeyIterator =
logRecords.keySet().iterator().asScala
- private val keyToSkip = mutable.Set.empty[String]
- private val recordKeyPosition =
tableSchema.structTypeSchema.fieldIndex(recordKeyField)
-
- private var recordToLoad: InternalRow = _
-
- @scala.annotation.tailrec
- override def hasNext: Boolean = {
- if (baseFileIterator.hasNext) {
- val curRow = baseFileIterator.next()
- val curKey = curRow.getString(recordKeyPosition)
- if (logRecords.containsKey(curKey)) {
- // duplicate key found, merging
- keyToSkip.add(curKey)
- val mergedAvroRecord = mergeRowWithLog(curRow, curKey)
- if (!mergedAvroRecord.isPresent) {
- // deleted
- this.hasNext
- } else {
- // load merged record as InternalRow with required schema
- val requiredAvroRecord =
AvroConversionUtils.buildAvroRecordBySchema(mergedAvroRecord.get(),
requiredAvroSchema,
- requiredFieldPosition, recordBuilder)
- val rowOpt = requiredDeserializer.deserialize(requiredAvroRecord)
- recordToLoad =
unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
- true
- }
- } else {
- // No merge needed, load current row with required schema
- recordToLoad =
unsafeProjection(createInternalRowWithSchema(curRow,
requiredSchema.structTypeSchema, requiredFieldPosition))
- true
- }
+ /**
+ * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an
iterator over all of the records stored in
+ * a) Base file and all of the b) Delta Log files combining records with the
same primary key from both of these
+ * streams
+ */
+ private class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit,
+ baseFileIterator:
Iterator[InternalRow],
+ baseFileReaderSchema:
HoodieTableSchema,
+ config: Configuration)
+ extends LogFileIterator(split, config) {
+
+ // NOTE: Record-merging iterator supports 2 modes of operation merging
records bearing either
+ // - Full table's schema
+ // - Projected schema
+ // As such, no particular schema could be assumed, and therefore we
rely on the caller
+ // to correspondingly set the scheme of the expected output of
base-file reader
+ private val baseFileReaderAvroSchema = new
Schema.Parser().parse(baseFileReaderSchema.avroSchemaStr)
+ private val requiredSchemaFieldOrdinals: List[Int] =
collectFieldOrdinals(requiredAvroSchema, baseFileReaderAvroSchema)
+
+ private val serializer =
sparkAdapter.createAvroSerializer(baseFileReaderSchema.structTypeSchema,
+ baseFileReaderAvroSchema,
resolveAvroSchemaNullability(baseFileReaderAvroSchema))
+
+ private val recordKeyOrdinal =
baseFileReaderSchema.structTypeSchema.fieldIndex(tableState.recordKeyField)
+
+ override def hasNext: Boolean = {
+ if (baseFileIterator.hasNext) {
+ val curRowRecord = baseFileIterator.next()
+ val curKey = curRowRecord.getString(recordKeyOrdinal)
+ val updatedRecordOpt = removeLogRecord(curKey)
+ if (updatedRecordOpt.isEmpty) {
+ // No merge needed, load current row with required projected schema
+ recordToLoad = unsafeProjection(projectRowUnsafe(curRowRecord,
requiredSchema.structTypeSchema, requiredSchemaFieldOrdinals))
+ true
} else {
- if (logRecordsKeyIterator.hasNext) {
- val curKey = logRecordsKeyIterator.next()
- if (keyToSkip.contains(curKey)) {
- this.hasNext
- } else {
- val insertAvroRecord =
logRecords.get(curKey).getData.getInsertValue(tableAvroSchema, payloadProps)
- if (!insertAvroRecord.isPresent) {
- // stand alone delete record, skipping
- this.hasNext
- } else {
- val requiredAvroRecord = AvroConversionUtils
- .buildAvroRecordBySchema(
- insertAvroRecord.get(),
- requiredAvroSchema,
- requiredFieldPosition,
- recordBuilder
- )
- val rowOpt =
requiredDeserializer.deserialize(requiredAvroRecord)
- recordToLoad =
unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
- true
- }
- }
+ val mergedAvroRecordOpt = merge(serialize(curRowRecord),
updatedRecordOpt.get)
+ if (mergedAvroRecordOpt.isEmpty) {
+ // Record has been deleted, skipping
+ this.hasNext
} else {
- false
+ // NOTE: In occurrence of a merge we can't know the schema of the
record being returned, b/c
+ // record from the Delta Log will bear (full) Table schema,
while record from the Base file
+ // might already be read in projected one (as an
optimization).
+ // As such we can't use more performant
[[projectAvroUnsafe]], and instead have to fallback
+ // to [[projectAvro]]
+ val projectedAvroRecord = projectAvro(mergedAvroRecordOpt.get,
requiredAvroSchema, recordBuilder)
+ recordToLoad = unsafeProjection(deserialize(projectedAvroRecord))
+ true
}
}
+ } else {
+ super[LogFileIterator].hasNext
}
+ }
- override def next(): InternalRow = recordToLoad
-
- override def close(): Unit = {
- if (logScanner != null) {
- try {
- logScanner.close()
- } finally {
- logScanner = null
- }
- }
- }
+ private def serialize(curRowRecord: InternalRow): GenericRecord =
+ serializer.serialize(curRowRecord).asInstanceOf[GenericRecord]
- private def mergeRowWithLog(curRow: InternalRow, curKey: String) :
org.apache.hudi.common.util.Option[IndexedRecord] = {
- val historyAvroRecord =
serializer.serialize(curRow).asInstanceOf[GenericRecord]
- val mergedRec = logRecords.get(curKey).getData
- .combineAndGetUpdateValue(historyAvroRecord, tableAvroSchema,
payloadProps)
- if (mergedRec.isPresent && mergedRec.get().getSchema !=
tableAvroSchema) {
-
org.apache.hudi.common.util.Option.of(HoodieAvroUtils.rewriteRecord(mergedRec.get().asInstanceOf[GenericRecord],
tableAvroSchema).asInstanceOf[IndexedRecord])
- } else {
- mergedRec
- }
- }
+ private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_
<: HoodieRecordPayload[_]]): Option[IndexedRecord] = {
+ // NOTE: We have to pass in Avro Schema used to read from Delta Log file
since we invoke combining API
+ // on the record from the Delta Log
+ toScalaOption(newRecord.getData.combineAndGetUpdateValue(curAvroRecord,
logFileReaderAvroSchema, payloadProps))
}
+ }
}
private object HoodieMergeOnReadRDD {
+
val CONFIG_INSTANTIATION_LOCK = new Object()
- def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config:
Configuration): HoodieMergedLogRecordScanner = {
- val fs = FSUtils.getFs(split.tablePath, config)
- val logFiles = split.logFiles.get
+ def scanLog(logFiles: List[HoodieLogFile],
+ partitionPath: Path,
+ logSchema: Schema,
+ tableState: HoodieTableState,
+ maxCompactionMemoryInBytes: Long,
+ hadoopConf: Configuration): HoodieMergedLogRecordScanner = {
Review comment:
why still have `maxCompactionMemoryInBytes` as an arg, which can be
retrieved from `hadoopConf` ? having >5 args makes the API harder to use
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala
##########
@@ -20,64 +20,15 @@ package org.apache.hudi
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.QueryExecutionException
-import org.apache.spark.sql.execution.datasources.{FilePartition,
PartitionedFile, SchemaColumnConvertNotSupportedException}
-import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD,
PartitionedFile}
case class HoodieBaseFileSplit(filePartition: FilePartition) extends
HoodieFileSplit
-/**
- * TODO eval if we actually need it
- */
class HoodieFileScanRDD(@transient private val sparkSession: SparkSession,
readFunction: PartitionedFile => Iterator[InternalRow],
@transient fileSplits: Seq[HoodieBaseFileSplit])
- extends HoodieUnsafeRDD(sparkSession.sparkContext) {
-
- override def compute(split: Partition, context: TaskContext):
Iterator[InternalRow] = {
Review comment:
this was spark code copied over?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]