This is an automated email from the ASF dual-hosted git repository. satish pushed a commit to branch release-0.12.2 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 11221fda2b34924fed736d6b2686af03d2999a11 Author: Alexey Kudinkin <[email protected]> AuthorDate: Wed Nov 30 13:31:33 2022 -0800 Rebased MOR iterators onto a `CachingIterator` (to be idempotent) (#7334) Fixes the invalid semantics of MOR iterators, which were not actually idempotent: i.e., calling `hasNext` multiple times advanced the iterator each time, potentially skipping elements, e.g. in cases like: ``` // [[isEmpty]] will invoke [[hasNext]] to check whether Iterator has any elements if (iter.isEmpty) { // ... } else { // Here [[map]] will invoke [[hasNext]] again, therefore skipping the elements iter.map { /* ... */ } } ``` --- .../scala/org/apache/hudi/HoodieSparkUtils.scala | 2 +- .../org/apache/hudi/HoodieMergeOnReadRDD.scala | 176 +---------- .../src/main/scala/org/apache/hudi/Iterators.scala | 332 +++++++++++++++++++++ .../org/apache/hudi/util/CachingIterator.scala | 44 +++ 4 files changed, 384 insertions(+), 170 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index df5e2777cbe..d02e6e6155a 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -96,7 +96,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport { // NOTE: We're accessing toRdd here directly to avoid [[InternalRow]] to [[Row]] conversion // Additionally, we have to explicitly wrap around resulting [[RDD]] into the one - // injecting [[SQLConf]], which by default isn't propgated by Spark to the executor(s). + // injecting [[SQLConf]], which by default isn't propagated by Spark to the executor(s).
// [[SQLConf]] is required by [[AvroSerializer]] injectSQLConf(df.queryExecution.toRdd.mapPartitions { rows => if (rows.isEmpty) { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala index 512c97806f3..c58ce809c3d 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -19,19 +19,18 @@ package org.apache.hudi import org.apache.avro.Schema -import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord} +import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapred.JobConf -import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, generateUnsafeProjection, projectReader} -import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption} +import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, projectReader} +import org.apache.hudi.HoodieConversionUtils.{toJavaOption} import org.apache.hudi.HoodieMergeOnReadRDD.SafeAvroProjection.collectFieldOrdinals -import org.apache.hudi.HoodieMergeOnReadRDD._ import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig} import org.apache.hudi.common.engine.HoodieLocalEngineContext import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath -import org.apache.hudi.common.model.{HoodieLogFile, HoodieRecord, HoodieRecordPayload, OverwriteWithLatestAvroPayload} +import org.apache.hudi.common.model.{HoodieLogFile, OverwriteWithLatestAvroPayload} import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner import org.apache.hudi.common.util.ValidationUtils.checkState import 
org.apache.hudi.config.HoodiePayloadConfig @@ -121,15 +120,15 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, projectedReader(dataFileOnlySplit.dataFile.get) case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty => - new LogFileIterator(logFileOnlySplit, getConfig) + new LogFileIterator(logFileOnlySplit, tableSchema, requiredSchema, tableState, getConfig) case split if mergeType.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) => val reader = fileReaders.requiredSchemaReaderSkipMerging - new SkipMergeIterator(split, reader, getConfig) + new SkipMergeIterator(split, reader, tableSchema, requiredSchema, tableState, getConfig) case split if mergeType.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) => val reader = pickBaseFileReader - new RecordMergingFileIterator(split, reader, getConfig) + new RecordMergingFileIterator(split, reader, tableSchema, requiredSchema, tableState, getConfig) case _ => throw new HoodieException(s"Unable to select an Iterator to read the Hoodie MOR File Split for " + s"file path: ${mergeOnReadPartition.split.dataFile.get.filePath}" + @@ -173,167 +172,6 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, new Configuration(conf) } } - - /** - * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], iterates over all of the records stored in - * Delta Log files (represented as [[InternalRow]]s) - */ - private class LogFileIterator(split: HoodieMergeOnReadFileSplit, - config: Configuration) - extends Iterator[InternalRow] with Closeable with AvroDeserializerSupport { - - protected override val requiredAvroSchema: Schema = new Schema.Parser().parse(requiredSchema.avroSchemaStr) - protected override val requiredStructTypeSchema: StructType = requiredSchema.structTypeSchema - - protected val logFileReaderAvroSchema: Schema = new Schema.Parser().parse(tableSchema.avroSchemaStr) - - protected var recordToLoad: InternalRow = _ - - private val requiredSchemaSafeAvroProjection = 
SafeAvroProjection.create(logFileReaderAvroSchema, requiredAvroSchema) - - private var logScanner = { - val internalSchema = tableSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema) - HoodieMergeOnReadRDD.scanLog(split.logFiles, getPartitionPath(split), logFileReaderAvroSchema, tableState, - maxCompactionMemoryInBytes, config, internalSchema) - } - - private val logRecords = logScanner.getRecords.asScala - - // NOTE: This have to stay lazy to make sure it's initialized only at the point where it's - // going to be used, since we modify `logRecords` before that and therefore can't do it any earlier - protected lazy val logRecordsIterator: Iterator[Option[GenericRecord]] = - logRecords.iterator.map { - case (_, record) => - toScalaOption(record.getData.getInsertValue(logFileReaderAvroSchema, payloadProps)) - .map(_.asInstanceOf[GenericRecord]) - } - - protected def removeLogRecord(key: String): Option[HoodieRecord[_ <: HoodieRecordPayload[_]]] = - logRecords.remove(key) - - override def hasNext: Boolean = hasNextInternal - - // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to make sure - // that recursion is unfolded into a loop to avoid stack overflows while - // handling records - @tailrec private def hasNextInternal: Boolean = { - logRecordsIterator.hasNext && { - val avroRecordOpt = logRecordsIterator.next() - if (avroRecordOpt.isEmpty) { - // Record has been deleted, skipping - this.hasNextInternal - } else { - val projectedAvroRecord = requiredSchemaSafeAvroProjection(avroRecordOpt.get) - recordToLoad = deserialize(projectedAvroRecord) - true - } - } - } - - override final def next(): InternalRow = recordToLoad - - override def close(): Unit = - if (logScanner != null) { - try { - logScanner.close() - } finally { - logScanner = null - } - } - } - - /** - * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an iterator over all of the records stored in - * Base file as well as all of the Delta Log files 
simply returning concatenation of these streams, while not - * performing any combination/merging of the records w/ the same primary keys (ie producing duplicates potentially) - */ - private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit, - baseFileReader: BaseFileReader, - config: Configuration) - extends LogFileIterator(split, config) { - - private val requiredSchemaUnsafeProjection = generateUnsafeProjection(baseFileReader.schema, requiredStructTypeSchema) - - private val baseFileIterator = baseFileReader(split.dataFile.get) - - override def hasNext: Boolean = { - if (baseFileIterator.hasNext) { - // No merge is required, simply load current row and project into required schema - recordToLoad = requiredSchemaUnsafeProjection(baseFileIterator.next()) - true - } else { - super[LogFileIterator].hasNext - } - } - } - - /** - * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an iterator over all of the records stored in - * a) Base file and all of the b) Delta Log files combining records with the same primary key from both of these - * streams - */ - private class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit, - baseFileReader: BaseFileReader, - config: Configuration) - extends LogFileIterator(split, config) { - - // NOTE: Record-merging iterator supports 2 modes of operation merging records bearing either - // - Full table's schema - // - Projected schema - // As such, no particular schema could be assumed, and therefore we rely on the caller - // to correspondingly set the scheme of the expected output of base-file reader - private val baseFileReaderAvroSchema = sparkAdapter.getAvroSchemaConverters.toAvroType(baseFileReader.schema, nullable = false, "record") - - private val serializer = sparkAdapter.createAvroSerializer(baseFileReader.schema, baseFileReaderAvroSchema, nullable = false) - - private val reusableRecordBuilder: GenericRecordBuilder = new GenericRecordBuilder(requiredAvroSchema) - - private val recordKeyOrdinal 
= baseFileReader.schema.fieldIndex(tableState.recordKeyField) - - private val requiredSchemaUnsafeProjection = generateUnsafeProjection(baseFileReader.schema, requiredStructTypeSchema) - - private val baseFileIterator = baseFileReader(split.dataFile.get) - - override def hasNext: Boolean = hasNextInternal - - // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to make sure - // that recursion is unfolded into a loop to avoid stack overflows while - // handling records - @tailrec private def hasNextInternal: Boolean = { - if (baseFileIterator.hasNext) { - val curRow = baseFileIterator.next() - val curKey = curRow.getString(recordKeyOrdinal) - val updatedRecordOpt = removeLogRecord(curKey) - if (updatedRecordOpt.isEmpty) { - // No merge is required, simply load current row and project into required schema - recordToLoad = requiredSchemaUnsafeProjection(curRow) - true - } else { - val mergedAvroRecordOpt = merge(serialize(curRow), updatedRecordOpt.get) - if (mergedAvroRecordOpt.isEmpty) { - // Record has been deleted, skipping - this.hasNextInternal - } else { - val projectedAvroRecord = projectAvroUnsafe(mergedAvroRecordOpt.get.asInstanceOf[GenericRecord], - requiredAvroSchema, reusableRecordBuilder) - recordToLoad = deserialize(projectedAvroRecord) - true - } - } - } else { - super[LogFileIterator].hasNext - } - } - - private def serialize(curRowRecord: InternalRow): GenericRecord = - serializer.serialize(curRowRecord).asInstanceOf[GenericRecord] - - private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_ <: HoodieRecordPayload[_]]): Option[IndexedRecord] = { - // NOTE: We have to pass in Avro Schema used to read from Delta Log file since we invoke combining API - // on the record from the Delta Log - toScalaOption(newRecord.getData.combineAndGetUpdateValue(curAvroRecord, logFileReaderAvroSchema, payloadProps)) - } - } } private object HoodieMergeOnReadRDD { diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala new file mode 100644 index 00000000000..bb43459af14 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi + +import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, generateUnsafeProjection} +import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption} +import org.apache.hudi.common.model.{HoodieLogFile, HoodieRecord, HoodieRecordPayload} +import org.apache.hudi.config.HoodiePayloadConfig +import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes +import org.apache.hudi.LogFileIterator._ +import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig} +import org.apache.hudi.common.engine.HoodieLocalEngineContext +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath +import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner +import org.apache.hudi.hadoop.config.HoodieRealtimeConfig +import org.apache.hudi.internal.schema.InternalSchema +import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieTableMetadata} +import org.apache.hudi.metadata.HoodieTableMetadata.getDataTableBasePathFromMetadataTable +import org.apache.avro.Schema +import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapred.JobConf +import org.apache.hudi.HoodieMergeOnReadRDD.SafeAvroProjection +import org.apache.hudi.common.util.ValidationUtils.checkState +import org.apache.hudi.util.CachingIterator +import org.apache.spark.sql.avro.HoodieAvroDeserializer +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types.StructType + +import java.io.Closeable +import java.util.Properties +import scala.annotation.tailrec +import scala.collection.JavaConverters._ +import scala.util.Try + + +trait AvroDeserializerSupport extends SparkAdapterSupport { + protected val avroSchema: Schema + protected val structTypeSchema: StructType + + private lazy val deserializer: 
HoodieAvroDeserializer = + sparkAdapter.createAvroDeserializer(avroSchema, structTypeSchema) + + protected def deserialize(avroRecord: GenericRecord): InternalRow = { + checkState(avroRecord.getSchema.getFields.size() == structTypeSchema.fields.length) + deserializer.deserialize(avroRecord).get.asInstanceOf[InternalRow] + } +} + +/** + * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], iterates over all of the records stored in + * Delta Log files (represented as [[InternalRow]]s) + */ +class LogFileIterator(split: HoodieMergeOnReadFileSplit, + tableSchema: HoodieTableSchema, + requiredSchema: HoodieTableSchema, + tableState: HoodieTableState, + config: Configuration) + extends CachingIterator[InternalRow] with Closeable with AvroDeserializerSupport { + + protected val maxCompactionMemoryInBytes: Long = getMaxCompactionMemoryInBytes(new JobConf(config)) + + protected val payloadProps = tableState.preCombineFieldOpt + .map { preCombineField => + HoodiePayloadConfig.newBuilder + .withPayloadOrderingField(preCombineField) + .build + .getProps + }.getOrElse(new Properties()) + + protected override val avroSchema: Schema = new Schema.Parser().parse(requiredSchema.avroSchemaStr) + protected override val structTypeSchema: StructType = requiredSchema.structTypeSchema + + protected val logFileReaderAvroSchema: Schema = new Schema.Parser().parse(tableSchema.avroSchemaStr) + + private val requiredSchemaSafeAvroProjection = SafeAvroProjection.create(logFileReaderAvroSchema, avroSchema) + + // TODO: now logScanner with internalSchema support column project, we may no need projectAvroUnsafe + private var logScanner = { + val internalSchema = tableSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema) + scanLog(split.logFiles, getPartitionPath(split), logFileReaderAvroSchema, tableState, + maxCompactionMemoryInBytes, config, internalSchema) + } + + private val logRecords = logScanner.getRecords.asScala + + def logRecordsIterator(): Iterator[(String, 
HoodieRecord[_ <: HoodieRecordPayload[_ <: HoodieRecordPayload[_ <: AnyRef]]])] = { + logRecords.iterator.asInstanceOf[Iterator[(String, HoodieRecord[_ <: HoodieRecordPayload[_ <: HoodieRecordPayload[_ <: AnyRef]]])]] + } + + // NOTE: This have to stay lazy to make sure it's initialized only at the point where it's + // going to be used, since we modify `logRecords` before that and therefore can't do it any earlier + protected lazy val genericRecordsIterator: Iterator[Option[GenericRecord]] = + logRecords.iterator.map { + case (_, record) => + toScalaOption(record.getData.getInsertValue(logFileReaderAvroSchema, payloadProps)) + .map(_.asInstanceOf[GenericRecord]) + } + + protected def removeLogRecord(key: String): Option[HoodieRecord[_ <: HoodieRecordPayload[_]]] = + logRecords.remove(key) + + protected def doHasNext: Boolean = hasNextInternal + + // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to make sure + // that recursion is unfolded into a loop to avoid stack overflows while + // handling records + @tailrec private def hasNextInternal: Boolean = { + genericRecordsIterator.hasNext && { + val avroRecordOpt = genericRecordsIterator.next() + if (avroRecordOpt.isEmpty) { + // Record has been deleted, skipping + this.hasNextInternal + } else { + val projectedAvroRecord = requiredSchemaSafeAvroProjection(avroRecordOpt.get) + nextRecord = deserialize(projectedAvroRecord) + true + } + } + } + + override def close(): Unit = + if (logScanner != null) { + try { + logScanner.close() + } finally { + logScanner = null + } + } +} + +/** + * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an iterator over all of the records stored in + * Base file as well as all of the Delta Log files simply returning concatenation of these streams, while not + * performing any combination/merging of the records w/ the same primary keys (ie producing duplicates potentially) + */ +private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit, + 
baseFileReader: BaseFileReader, + dataSchema: HoodieTableSchema, + requiredSchema: HoodieTableSchema, + tableState: HoodieTableState, + config: Configuration) + extends LogFileIterator(split, dataSchema, requiredSchema, tableState, config) { + + private val requiredSchemaUnsafeProjection = generateUnsafeProjection(baseFileReader.schema, structTypeSchema) + + private val baseFileIterator = baseFileReader(split.dataFile.get) + + override def doHasNext: Boolean = { + if (baseFileIterator.hasNext) { + // No merge is required, simply load current row and project into required schema + nextRecord = requiredSchemaUnsafeProjection(baseFileIterator.next()) + true + } else { + super[LogFileIterator].doHasNext + } + } +} + +/** + * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an iterator over all of the records stored in + * a) Base file and all of the b) Delta Log files combining records with the same primary key from both of these + * streams + */ +class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit, + baseFileReader: BaseFileReader, + dataSchema: HoodieTableSchema, + requiredSchema: HoodieTableSchema, + tableState: HoodieTableState, + config: Configuration) + extends LogFileIterator(split, dataSchema, requiredSchema, tableState, config) { + + // NOTE: Record-merging iterator supports 2 modes of operation merging records bearing either + // - Full table's schema + // - Projected schema + // As such, no particular schema could be assumed, and therefore we rely on the caller + // to correspondingly set the scheme of the expected output of base-file reader + private val baseFileReaderAvroSchema = sparkAdapter.getAvroSchemaConverters.toAvroType(baseFileReader.schema, nullable = false, "record") + + private val serializer = sparkAdapter.createAvroSerializer(baseFileReader.schema, baseFileReaderAvroSchema, nullable = false) + + private val reusableRecordBuilder: GenericRecordBuilder = new GenericRecordBuilder(avroSchema) + + private val 
recordKeyOrdinal = baseFileReader.schema.fieldIndex(tableState.recordKeyField) + + private val requiredSchemaUnsafeProjection = generateUnsafeProjection(baseFileReader.schema, structTypeSchema) + + private val baseFileIterator = baseFileReader(split.dataFile.get) + + override def doHasNext: Boolean = hasNextInternal + + // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to make sure + // that recursion is unfolded into a loop to avoid stack overflows while + // handling records + @tailrec private def hasNextInternal: Boolean = { + if (baseFileIterator.hasNext) { + val curRow = baseFileIterator.next() + val curKey = curRow.getString(recordKeyOrdinal) + val updatedRecordOpt = removeLogRecord(curKey) + if (updatedRecordOpt.isEmpty) { + // No merge is required, simply load current row and project into required schema + nextRecord = requiredSchemaUnsafeProjection(curRow) + true + } else { + val mergedAvroRecordOpt = merge(serialize(curRow), updatedRecordOpt.get) + if (mergedAvroRecordOpt.isEmpty) { + // Record has been deleted, skipping + this.hasNextInternal + } else { + val projectedAvroRecord = projectAvroUnsafe(mergedAvroRecordOpt.get.asInstanceOf[GenericRecord], + avroSchema, reusableRecordBuilder) + nextRecord = deserialize(projectedAvroRecord) + true + } + } + } else { + super[LogFileIterator].doHasNext + } + } + + private def serialize(curRowRecord: InternalRow): GenericRecord = + serializer.serialize(curRowRecord).asInstanceOf[GenericRecord] + + private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_ <: HoodieRecordPayload[_]]): Option[IndexedRecord] = { + // NOTE: We have to pass in Avro Schema used to read from Delta Log file since we invoke combining API + // on the record from the Delta Log + toScalaOption(newRecord.getData.combineAndGetUpdateValue(curAvroRecord, logFileReaderAvroSchema, payloadProps)) + } +} + +object LogFileIterator { + + val CONFIG_INSTANTIATION_LOCK = new Object() + + def scanLog(logFiles: 
List[HoodieLogFile], + partitionPath: Path, + logSchema: Schema, + tableState: HoodieTableState, + maxCompactionMemoryInBytes: Long, + hadoopConf: Configuration, + internalSchema: InternalSchema = InternalSchema.getEmptyInternalSchema): HoodieMergedLogRecordScanner = { + val tablePath = tableState.tablePath + val fs = FSUtils.getFs(tablePath, hadoopConf) + + if (HoodieTableMetadata.isMetadataTable(tablePath)) { + val metadataConfig = HoodieMetadataConfig.newBuilder() + .fromProperties(tableState.metadataConfig.getProps).enable(true).build() + val dataTableBasePath = getDataTableBasePathFromMetadataTable(tablePath) + val metadataTable = new HoodieBackedTableMetadata( + new HoodieLocalEngineContext(hadoopConf), metadataConfig, + dataTableBasePath, + hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)) + + // We have to force full-scan for the MT log record reader, to make sure + // we can iterate over all of the partitions, since by default some of the partitions (Column Stats, + // Bloom Filter) are in "point-lookup" mode + val forceFullScan = true + + // NOTE: In case of Metadata Table partition path equates to partition name (since there's just one level + // of indirection among MT partitions) + val relativePartitionPath = getRelativePartitionPath(new Path(tablePath), partitionPath) + metadataTable.getLogRecordScanner(logFiles.asJava, relativePartitionPath, toJavaOption(Some(forceFullScan))) + .getLeft + } else { + val logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder() + .withFileSystem(fs) + .withBasePath(tablePath) + .withLogFilePaths(logFiles.map(logFile => logFile.getPath.toString).asJava) + .withReaderSchema(logSchema) + .withLatestInstantTime(tableState.latestCommitTimestamp) + .withReadBlocksLazily( + Try(hadoopConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, + HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean) + 
.getOrElse(false)) + .withReverseReader(false) + .withInternalSchema(internalSchema) + .withBufferSize( + hadoopConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, + HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)) + .withMaxMemorySizeInBytes(maxCompactionMemoryInBytes) + .withSpillableMapBasePath( + hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, + HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)) + .withDiskMapType( + hadoopConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key, + HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue)) + .withBitCaskDiskMapCompressionEnabled( + hadoopConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), + HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())) + + if (logFiles.nonEmpty) { + logRecordScannerBuilder.withPartition( + getRelativePartitionPath(new Path(tableState.tablePath), logFiles.head.getPath.getParent)) + } + + logRecordScannerBuilder.build() + } + } + + def projectAvroUnsafe(record: GenericRecord, projectedSchema: Schema, reusableRecordBuilder: GenericRecordBuilder): GenericRecord = { + val fields = projectedSchema.getFields.asScala + fields.foreach(field => reusableRecordBuilder.set(field, record.get(field.name()))) + reusableRecordBuilder.build() + } + + def getPartitionPath(split: HoodieMergeOnReadFileSplit): Path = { + // Determine partition path as an immediate parent folder of either + // - The base file + // - Some log file + split.dataFile.map(baseFile => new Path(baseFile.filePath)) + .getOrElse(split.logFiles.head.getPath) + .getParent + } +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/CachingIterator.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/CachingIterator.scala new file mode 100644 index 00000000000..8c8fb1023a7 --- /dev/null +++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/CachingIterator.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.util + +/** + * Extension of the [[Iterator]] allowing for caching of the underlying record produced + * during iteration to provide for the idempotency of the [[hasNext]] invocation: + * meaning, that invoking [[hasNext]] multiple times consequently (w/o invoking [[next]] + * in between) will only make iterator step over a single element + * + * NOTE: [[hasNext]] and [[next]] are purposefully marked as final, requiring iteration + * semantic to be implemented t/h overriding of a single [[doHasNext]] method + */ +trait CachingIterator[T >: Null] extends Iterator[T] { + + protected var nextRecord: T = _ + + protected def doHasNext: Boolean + + override final def hasNext: Boolean = nextRecord != null || doHasNext + + override final def next: T = { + val record = nextRecord + nextRecord = null + record + } + +}
