codope commented on code in PR #5275:
URL: https://github.com/apache/hudi/pull/5275#discussion_r846988154
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala:
##########
@@ -113,59 +114,88 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport
{
*
* @param spark Spark session ref
* @param colStatsDF [[DataFrame]] bearing raw Column Stats Index table
- * @param targetColumns target columns to be included into the final table
+ * @param queryColumns target columns to be included into the final table
* @param tableSchema schema of the source data table
* @return reshaped table according to the format outlined above
*/
- def transposeColumnStatsIndex(spark: SparkSession, colStatsDF: DataFrame,
targetColumns: Seq[String], tableSchema: StructType): DataFrame = {
+ def transposeColumnStatsIndex(spark: SparkSession, colStatsDF: DataFrame,
queryColumns: Seq[String], tableSchema: StructType): DataFrame = {
val colStatsSchema = colStatsDF.schema
val colStatsSchemaOrdinalsMap = colStatsSchema.fields.zipWithIndex.map({
case (field, ordinal) => (field.name, ordinal)
}).toMap
val tableSchemaFieldMap = tableSchema.fields.map(f => (f.name, f)).toMap
- // NOTE: We're sorting the columns to make sure final index schema matches
layout
- // of the transposed table
- val sortedColumns = TreeSet(targetColumns: _*)
-
val colNameOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME)
val minValueOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE)
val maxValueOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE)
val fileNameOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
val nullCountOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT)
val valueCountOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT)
- val transposedRDD = colStatsDF.rdd
- .filter(row => sortedColumns.contains(row.getString(colNameOrdinal)))
- .map { row =>
- val (minValue, _) =
tryUnpackNonNullVal(row.getAs[Row](minValueOrdinal))
- val (maxValue, _) =
tryUnpackNonNullVal(row.getAs[Row](maxValueOrdinal))
-
- val colName = row.getString(colNameOrdinal)
- val colType = tableSchemaFieldMap(colName).dataType
+ // NOTE: We have to collect list of indexed columns to make sure we
properly align the rows
+ // w/in the transposed dataset: since some files might not have all
of the columns indexed
+ // either due to the Column Stats Index config changes, schema
evolution, etc, we have
+ // to make sure that all of the rows w/in transposed data-frame are
properly padded (with null
+ // values) for such file-column combinations
+ val indexedColumns: Seq[String] = colStatsDF.rdd.map(row =>
row.getString(colNameOrdinal)).distinct().collect()
Review Comment:
Why not do this (collecting the indexed columns and null-padding the missing
file-column combinations) one level above, in `readColumnStatsIndex`, so that
`colStatsDF` itself is correctly populated and `transposeColumnStatsIndex`
simply transposes as it does today?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]