bvaradar commented on code in PR #8847:
URL: https://github.com/apache/hudi/pull/8847#discussion_r1222417026
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala:
##########
@@ -55,10 +55,11 @@ import scala.collection.mutable
import scala.util.Try
/**
- * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], iterates over all
of the records stored in
+ * Provided w/ list of log files, iterates over all of the records stored in
* Delta Log files (represented as [[InternalRow]]s)
*/
-class LogFileIterator(split: HoodieMergeOnReadFileSplit,
+class LogFileIterator(logFiles: List[HoodieLogFile],
Review Comment:
Can we retain HoodieMergeOnReadFileSplit in all the iterators instead of
passing logFiles directly, since you can derive this list from the split?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala:
##########
@@ -338,12 +339,12 @@ object LogFileIterator {
}
}
- def getPartitionPath(split: HoodieMergeOnReadFileSplit): Path = {
+ def getPartitionPath(dataFile: Option[PartitionedFile], logFiles:
List[HoodieLogFile]): Path = {
Review Comment:
Same case applies here.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala:
##########
@@ -429,8 +428,10 @@ class HoodieCDCRDD(
&& currentCDCFileSplit.getBeforeFileSlice.isPresent)
loadBeforeFileSliceIfNeeded(currentCDCFileSplit.getBeforeFileSlice.get)
val absLogPath = new Path(basePath,
currentCDCFileSplit.getCdcFiles.get(0))
- val morSplit = HoodieMergeOnReadFileSplit(None, List(new
HoodieLogFile(fs.getFileStatus(absLogPath))))
- val logFileIterator = new LogFileIterator(morSplit,
originTableSchema, originTableSchema, tableState, conf)
+ val logFiles = List(new
HoodieLogFile(fs.getFileStatus(absLogPath)))
Review Comment:
Is this change due to the cascading effect of changing the Iterator
interface?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala:
##########
@@ -81,23 +81,29 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
override def compute(split: Partition, context: TaskContext):
Iterator[InternalRow] = {
val partition = split.asInstanceOf[HoodieMergeOnReadPartition]
+ lazy val partitionPath =
LogFileIterator.getPartitionPath(partition.split.dataFile,
partition.split.logFiles)
val iter = partition.split match {
case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
val projectedReader =
projectReader(fileReaders.requiredSchemaReaderSkipMerging,
requiredSchema.structTypeSchema)
projectedReader(dataFileOnlySplit.dataFile.get)
case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
- new LogFileIterator(logFileOnlySplit, tableSchema, requiredSchema,
tableState, getHadoopConf)
+ new LogFileIterator(logFileOnlySplit.logFiles, partitionPath,
tableSchema, requiredSchema,
+ tableState, getHadoopConf)
case split =>
mergeType match {
case DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL =>
val reader = fileReaders.requiredSchemaReaderSkipMerging
- new SkipMergeIterator(split, reader, tableSchema, requiredSchema,
tableState, getHadoopConf)
+ val iterator = reader(split.dataFile.get)
+ new SkipMergeIterator(split.logFiles, partitionPath, iterator,
reader.schema, tableSchema,
Review Comment:
Since the Split abstracts the set of files to process, it's better to pass
the split directly to the different iterators.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala:
##########
@@ -79,28 +118,36 @@ case class HoodieBootstrapRelation(override val
sqlContext: SQLContext,
val dataFile = PartitionedFile(partitionValues,
getFilePath(baseFile.getBootstrapBaseFile.get.getFileStatus.getPath),
0, baseFile.getBootstrapBaseFile.get().getFileLen)
val skeletonFile = Option(PartitionedFile(InternalRow.empty,
baseFile.getPath, 0, baseFile.getFileLen))
-
- HoodieBootstrapSplit(dataFile, skeletonFile)
+ createFileSplit(fileSlice, dataFile, skeletonFile)
} else {
val dataFile =
PartitionedFile(getPartitionColumnsAsInternalRow(baseFile.getFileStatus),
baseFile.getPath, 0, baseFile.getFileLen)
- HoodieBootstrapSplit(dataFile)
+ createFileSplit(fileSlice, dataFile, Option.empty)
}
}
}
- protected override def composeRDD(fileSplits: Seq[FileSplit],
- tableSchema: HoodieTableSchema,
- requiredSchema: HoodieTableSchema,
- requestedColumns: Array[String],
- filters: Array[Filter]): RDD[InternalRow]
= {
+ protected def getFileReaders(tableSchema: HoodieTableSchema,
Review Comment:
Let's add comments describing the purpose of the different file readers.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]