alexeykudinkin commented on code in PR #7804:
URL: https://github.com/apache/hudi/pull/7804#discussion_r1106165388
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala:
##########
@@ -51,59 +50,44 @@ class HoodieBootstrapRDD(@transient spark: SparkSession,
}
}
- var partitionedFileIterator: Iterator[InternalRow] = null
+ bootstrapPartition.split.skeletonFile match {
+ case Some(skeletonFile) =>
+ // It is a bootstrap split. Check both skeleton and data files.
+ if (bootstrapDataFileReader.schema.isEmpty) {
+ // No data column to fetch, hence fetch only from skeleton file
+ bootstrapSkeletonFileReader.read(skeletonFile)
+ } else if (bootstrapSkeletonFileReader.schema.isEmpty) {
+ // No metadata column to fetch, hence fetch only from data file
+ bootstrapDataFileReader.read(bootstrapPartition.split.dataFile)
+ } else {
+ // Fetch from both data and skeleton file, and merge
+ val dataFileIterator =
bootstrapDataFileReader.read(bootstrapPartition.split.dataFile)
+ val skeletonFileIterator =
bootstrapSkeletonFileReader.read(skeletonFile)
+ merge(skeletonFileIterator, dataFileIterator)
+ }
- if (bootstrapPartition.split.skeletonFile.isDefined) {
- // It is a bootstrap split. Check both skeleton and data files.
- if (dataSchema.isEmpty) {
- // No data column to fetch, hence fetch only from skeleton file
- partitionedFileIterator =
skeletonReadFunction(bootstrapPartition.split.skeletonFile.get)
- } else if (skeletonSchema.isEmpty) {
- // No metadata column to fetch, hence fetch only from data file
- partitionedFileIterator =
dataReadFunction(bootstrapPartition.split.dataFile)
- } else {
- // Fetch from both data and skeleton file, and merge
- val dataFileIterator =
dataReadFunction(bootstrapPartition.split.dataFile)
- val skeletonFileIterator =
skeletonReadFunction(bootstrapPartition.split.skeletonFile.get)
- partitionedFileIterator = merge(skeletonFileIterator, dataFileIterator)
- }
- } else {
- partitionedFileIterator =
regularReadFunction(bootstrapPartition.split.dataFile)
+ case _ => regularFileReader.read(bootstrapPartition.split.dataFile)
}
- partitionedFileIterator
}
- def merge(skeletonFileIterator: Iterator[InternalRow], dataFileIterator:
Iterator[InternalRow])
- : Iterator[InternalRow] = {
+ def merge(skeletonFileIterator: Iterator[InternalRow], dataFileIterator:
Iterator[InternalRow]): Iterator[InternalRow] = {
new Iterator[InternalRow] {
- override def hasNext: Boolean = dataFileIterator.hasNext &&
skeletonFileIterator.hasNext
- override def next(): InternalRow = {
- mergeInternalRow(skeletonFileIterator.next(), dataFileIterator.next())
- }
- }
- }
+ private val combinedRow = new JoinedRow()
- def mergeInternalRow(skeletonRow: InternalRow, dataRow: InternalRow):
InternalRow = {
- val skeletonArr = skeletonRow.copy().toSeq(skeletonSchema)
- val dataArr = dataRow.copy().toSeq(dataSchema)
Review Comment:
Copying `InternalRow`s is punitive performance-wise (better approach is to
use `UnsafeProjection` for that)
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -451,10 +455,15 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
* For enable hoodie.datasource.write.drop.partition.columns, need to create
an InternalRow on partition values
* and pass this reader on parquet file. So that, we can query the partition
columns.
*/
- protected def getPartitionColumnsAsInternalRow(file: FileStatus):
InternalRow = {
+
+ protected def getPartitionColumnsAsInternalRow(file: FileStatus):
InternalRow =
Review Comment:
Some of the Base class methods are parameterized to provide
configurability of whether partition-values should be parsed from the
partition-path (this is required for the Bootstrapped relation since this behavior
in Spark is unconditional)
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -574,6 +584,12 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
protected def tryPrunePartitionColumns(tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema):
(StructType, HoodieTableSchema, HoodieTableSchema) = {
+ tryPrunePartitionColumnsInternal(tableSchema, requiredSchema,
shouldExtractPartitionValuesFromPartitionPath)
Review Comment:
Same comment as above
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala:
##########
@@ -44,150 +44,169 @@ import scala.collection.JavaConverters._
bootstrapped files, because then the metadata file and data file can
return different numbers of rows, causing errors when
merging.
*
- * @param _sqlContext Spark SQL Context
+ * @param sqlContext Spark SQL Context
* @param userSchema User specified schema in the datasource query
* @param globPaths The global paths to query. If it is not none, read from the
globPaths,
 *                   else read data from tablePath using HoodieFileIndex.
* @param metaClient Hoodie table meta client
* @param optParams DataSource options passed by the user
*/
-class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
- val userSchema: Option[StructType],
- val globPaths: Seq[Path],
- val metaClient: HoodieTableMetaClient,
- val optParams: Map[String, String]) extends
BaseRelation
- with PrunedFilteredScan with Logging {
+case class HoodieBootstrapRelation(override val sqlContext: SQLContext,
Review Comment:
Crux of the change here is rebasing BootstrapRelation onto
`HoodieBaseRelation`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]