yihua commented on code in PR #5746:
URL: https://github.com/apache/hudi/pull/5746#discussion_r929357347
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala:
##########
@@ -112,154 +199,184 @@ trait ColumnStatsIndexSupport extends
SparkAdapterSupport {
* column references from the filtering expressions, and only
transpose records corresponding to the
* columns referenced in those
*
- * @param spark Spark session ref
- * @param colStatsDF [[DataFrame]] bearing raw Column Stats Index table
+ * @param colStatsRecords [[HoodieData[HoodieMetadataColumnStats]]] bearing
raw Column Stats Index records
* @param queryColumns target columns to be included into the final table
- * @param tableSchema schema of the source data table
* @return reshaped table according to the format outlined above
*/
- def transposeColumnStatsIndex(spark: SparkSession, colStatsDF: DataFrame,
queryColumns: Seq[String], tableSchema: StructType): DataFrame = {
- val colStatsSchema = colStatsDF.schema
- val colStatsSchemaOrdinalsMap = colStatsSchema.fields.zipWithIndex.map({
- case (field, ordinal) => (field.name, ordinal)
- }).toMap
-
+ private def transpose(colStatsRecords:
HoodieData[HoodieMetadataColumnStats], queryColumns: Seq[String]):
(HoodieData[Row], StructType) = {
val tableSchemaFieldMap = tableSchema.fields.map(f => (f.name, f)).toMap
-
- val colNameOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME)
- val minValueOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE)
- val maxValueOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE)
- val fileNameOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
- val nullCountOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT)
- val valueCountOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT)
-
- // NOTE: We have to collect list of indexed columns to make sure we
properly align the rows
- // w/in the transposed dataset: since some files might not have all
of the columns indexed
- // either due to the Column Stats Index config changes, schema
evolution, etc, we have
- // to make sure that all of the rows w/in transposed data-frame are
properly padded (with null
- // values) for such file-column combinations
- val indexedColumns: Seq[String] = colStatsDF.rdd.map(row =>
row.getString(colNameOrdinal)).distinct().collect()
-
// NOTE: We're sorting the columns to make sure final index schema matches
layout
// of the transposed table
- val sortedTargetColumns = TreeSet(queryColumns.intersect(indexedColumns):
_*)
-
- val transposedRDD = colStatsDF.rdd
- .filter(row =>
sortedTargetColumns.contains(row.getString(colNameOrdinal)))
- .map { row =>
- if (row.isNullAt(minValueOrdinal) && row.isNullAt(maxValueOrdinal)) {
+ val sortedTargetColumnsSet = TreeSet(queryColumns:_*)
+ val sortedTargetColumns = sortedTargetColumnsSet.toSeq
+
+ // NOTE: This is a trick to avoid pulling all of
[[ColumnStatsIndexSupport]] object into the lambdas'
+ // closures below
+ val indexedColumns = this.indexedColumns
+
+ // Here we perform complex transformation which requires us to modify the
layout of the rows
+ // of the dataset, and therefore we rely on low-level RDD API to avoid
incurring encoding/decoding
+ // penalty of the [[Dataset]], since it's required to adhere to its schema
at all times, while
+ // RDDs are not;
+ val transposedRows: HoodieData[Row] = colStatsRecords
+ // NOTE: Explicit conversion is required for Scala 2.11
+ .filter(JFunction.toJavaSerializableFunction(r =>
sortedTargetColumnsSet.contains(r.getColumnName)))
+ .mapToPair(JFunction.toJavaSerializablePairFunction(r => {
+ if (r.getMinValue == null && r.getMaxValue == null) {
// Corresponding row could be null in either of the 2 cases
// - Column contains only null values (in that case both min/max
have to be nulls)
// - This is a stubbed Column Stats record (used as a tombstone)
- row
+ collection.Pair.of(r.getFileName, r)
} else {
- val minValueStruct = row.getAs[Row](minValueOrdinal)
- val maxValueStruct = row.getAs[Row](maxValueOrdinal)
+ val minValueWrapper = r.getMinValue
+ val maxValueWrapper = r.getMaxValue
- checkState(minValueStruct != null && maxValueStruct != null,
"Invalid Column Stats record: either both min/max have to be null, or both have
to be non-null")
+ checkState(minValueWrapper != null && maxValueWrapper != null,
"Invalid Column Stats record: either both min/max have to be null, or both have
to be non-null")
- val colName = row.getString(colNameOrdinal)
+ val colName = r.getColumnName
val colType = tableSchemaFieldMap(colName).dataType
- val (minValue, _) = tryUnpackNonNullVal(minValueStruct)
- val (maxValue, _) = tryUnpackNonNullVal(maxValueStruct)
- val rowValsSeq = row.toSeq.toArray
+ val minValue = deserialize(tryUnpackValueWrapper(minValueWrapper),
colType)
+ val maxValue = deserialize(tryUnpackValueWrapper(maxValueWrapper),
colType)
+
// Update min-/max-value structs w/ unwrapped values in-place
- rowValsSeq(minValueOrdinal) = deserialize(minValue, colType)
- rowValsSeq(maxValueOrdinal) = deserialize(maxValue, colType)
+ r.setMinValue(minValue)
+ r.setMaxValue(maxValue)
- Row(rowValsSeq: _*)
+ collection.Pair.of(r.getFileName, r)
}
- }
- .groupBy(r => r.getString(fileNameOrdinal))
- .foldByKey(Seq[Row]()) {
- case (_, columnRowsSeq) =>
- // Rows seq is always non-empty (otherwise it won't be grouped into)
- val fileName = columnRowsSeq.head.get(fileNameOrdinal)
- val valueCount = columnRowsSeq.head.get(valueCountOrdinal)
-
- // To properly align individual rows (corresponding to a file) w/in
the transposed projection, we need
- // to align existing column-stats for individual file with the list
of expected ones for the
- // whole transposed projection (a superset of all files)
- val columnRowsMap = columnRowsSeq.map(row =>
(row.getString(colNameOrdinal), row)).toMap
- val alignedColumnRowsSeq =
sortedTargetColumns.toSeq.map(columnRowsMap.get)
-
- val coalescedRowValuesSeq =
- alignedColumnRowsSeq.foldLeft(Seq[Any](fileName, valueCount)) {
- case (acc, opt) =>
- opt match {
- case Some(columnStatsRow) =>
- acc ++ Seq(minValueOrdinal, maxValueOrdinal,
nullCountOrdinal).map(ord => columnStatsRow.get(ord))
- case None =>
- // NOTE: Since we're assuming missing column to
essentially contain exclusively
- // null values, we set null-count to be equal to
value-count (this behavior is
- // consistent with reading non-existent columns from
Parquet)
- acc ++ Seq(null, null, valueCount)
- }
- }
-
- Seq(Row(coalescedRowValuesSeq:_*))
- }
- .values
- .flatMap(it => it)
+ }))
+ .groupByKey()
+ .map(JFunction.toJavaSerializableFunction(p => {
+ val columnRecordsSeq: Seq[HoodieMetadataColumnStats] =
p.getValue.asScala.toSeq
+ val fileName: String = p.getKey
+ val valueCount: Long = columnRecordsSeq.head.getValueCount
+
+ // To properly align individual rows (corresponding to a file) w/in
the transposed projection, we need
+ // to align existing column-stats for individual file with the list of
expected ones for the
+ // whole transposed projection (a superset of all files)
+ val columnRecordsMap = columnRecordsSeq.map(r => (r.getColumnName,
r)).toMap
+ val alignedColStatRecordsSeq =
sortedTargetColumns.map(columnRecordsMap.get)
+
+ val coalescedRowValuesSeq =
+ alignedColStatRecordsSeq.foldLeft(ListBuffer[Any](fileName,
valueCount)) {
+ case (acc, opt) =>
+ opt match {
+ case Some(colStatRecord) =>
+ acc ++= Seq(colStatRecord.getMinValue,
colStatRecord.getMaxValue, colStatRecord.getNullCount)
+ case None =>
+ // NOTE: This could occur in either of the following cases:
+ // 1. Column is not indexed in Column Stats Index: in
this case we won't be returning
+ // any statistics for such column (ie all stats will
be null)
+ // 2. Particular file does not have this particular
column (which is indexed by Column Stats Index):
+ // in this case we're assuming missing column to
essentially contain exclusively
+ // null values, we set min/max values as null and
null-count to be equal to value-count (this
+ // behavior is consistent with reading non-existent
columns from Parquet)
+ //
+ // This is a way to determine current column's index without
explicit iteration (we're adding 3 stats / column)
+ val idx = acc.length / 3
+ val colName = sortedTargetColumns(idx)
+ val indexed = indexedColumns.contains(colName)
+
+ val nullCount = if (indexed) valueCount else null
+
+ acc ++= Seq(null, null, nullCount)
+ }
+ }
+
+ Row(coalescedRowValuesSeq:_*)
+ }))
// NOTE: It's crucial to maintain appropriate ordering of the columns
// matching table layout: hence, we cherry-pick individual columns
// instead of simply filtering in the ones we're interested in the
schema
- val indexSchema = composeIndexSchema(sortedTargetColumns.toSeq,
tableSchema)
+ val indexSchema = composeIndexSchema(sortedTargetColumns, tableSchema)
+ (transposedRows, indexSchema)
+ }
- spark.createDataFrame(transposedRDD, indexSchema)
+ private def loadColumnStatsIndexForColumnsInternal(targetColumns:
Seq[String], shouldReadInMemory: Boolean): DataFrame = {
+ val colStatsDF = {
+ val colStatsRecords: HoodieData[HoodieMetadataColumnStats] =
loadColumnStatsIndexRecords(targetColumns, shouldReadInMemory)
+ // NOTE: Explicit conversion is required for Scala 2.11
+ val catalystRows: HoodieData[InternalRow] =
colStatsRecords.mapPartitions(JFunction.toJavaSerializableFunction(it => {
+ val converter =
AvroConversionUtils.createAvroToInternalRowConverter(HoodieMetadataColumnStats.SCHEMA$,
columnStatsRecordStructType)
+ it.asScala.map(r => converter(r).orNull).asJava
+ }), false)
+
+ if (shouldReadInMemory) {
+ // NOTE: This will instantiate a [[Dataset]] backed by
[[LocalRelation]] holding all of the rows
+ // of the transposed table in memory, facilitating execution of
the subsequently chained operations
+ // on it locally (on the driver; all such operations are
actually going to be performed by Spark's
+ // Optimizer)
+ createDataFrameFromInternalRows(spark,
catalystRows.collectAsList().asScala, columnStatsRecordStructType)
Review Comment:
Yes. At first, I thought the in-memory implementation was independent of
Spark, but it is actually leveraging Spark's in-memory processing. I think the
mode naming is still fine — it is just that the `in-memory` mode is now
applicable only to Spark.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]