alexeykudinkin commented on code in PR #5275:
URL: https://github.com/apache/hudi/pull/5275#discussion_r847484362
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala:
##########
@@ -113,59 +114,88 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport
{
*
* @param spark Spark session ref
* @param colStatsDF [[DataFrame]] bearing raw Column Stats Index table
- * @param targetColumns target columns to be included into the final table
+ * @param queryColumns target columns to be included into the final table
* @param tableSchema schema of the source data table
* @return reshaped table according to the format outlined above
*/
- def transposeColumnStatsIndex(spark: SparkSession, colStatsDF: DataFrame,
targetColumns: Seq[String], tableSchema: StructType): DataFrame = {
+ def transposeColumnStatsIndex(spark: SparkSession, colStatsDF: DataFrame,
queryColumns: Seq[String], tableSchema: StructType): DataFrame = {
val colStatsSchema = colStatsDF.schema
val colStatsSchemaOrdinalsMap = colStatsSchema.fields.zipWithIndex.map({
case (field, ordinal) => (field.name, ordinal)
}).toMap
val tableSchemaFieldMap = tableSchema.fields.map(f => (f.name, f)).toMap
- // NOTE: We're sorting the columns to make sure final index schema matches
layout
- // of the transposed table
- val sortedColumns = TreeSet(targetColumns: _*)
-
val colNameOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME)
val minValueOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE)
val maxValueOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE)
val fileNameOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
val nullCountOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT)
val valueCountOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT)
- val transposedRDD = colStatsDF.rdd
- .filter(row => sortedColumns.contains(row.getString(colNameOrdinal)))
- .map { row =>
- val (minValue, _) =
tryUnpackNonNullVal(row.getAs[Row](minValueOrdinal))
- val (maxValue, _) =
tryUnpackNonNullVal(row.getAs[Row](maxValueOrdinal))
-
- val colName = row.getString(colNameOrdinal)
- val colType = tableSchemaFieldMap(colName).dataType
+ // NOTE: We have to collect list of indexed columns to make sure we
properly align the rows
+ // w/in the transposed dataset: since some files might not have all
of the columns indexed
+ // either due to the Column Stats Index config changes, schema
evolution, etc, we have
+ // to make sure that all of the rows w/in transposed data-frame are
properly padded (with null
+ // values) for such file-column combinations
+ val indexedColumns: Seq[String] = colStatsDF.rdd.map(row =>
row.getString(colNameOrdinal)).distinct().collect()
Review Comment:
`colStatsDF` is populated correctly -- it bears one row per column (let's call
this layout "row-based"); therefore, for each file we will have N rows
corresponding to it (N being equal to the number of columns in that file).
The transposed table is "column-based", i.e. there is one row per file, and each
column's stats are mapped to columns of that view. Therefore, only in that view
do we need to align the rows (i.e. to pad them).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]