yihua commented on code in PR #5746:
URL: https://github.com/apache/hudi/pull/5746#discussion_r929357347


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala:
##########
@@ -112,154 +199,184 @@ trait ColumnStatsIndexSupport extends 
SparkAdapterSupport {
    *       column references from the filtering expressions, and only 
transpose records corresponding to the
    *       columns referenced in those
    *
-   * @param spark Spark session ref
-   * @param colStatsDF [[DataFrame]] bearing raw Column Stats Index table
+   * @param colStatsRecords [[HoodieData[HoodieMetadataColumnStats]]] bearing 
raw Column Stats Index records
    * @param queryColumns target columns to be included into the final table
-   * @param tableSchema schema of the source data table
    * @return reshaped table according to the format outlined above
    */
-  def transposeColumnStatsIndex(spark: SparkSession, colStatsDF: DataFrame, 
queryColumns: Seq[String], tableSchema: StructType): DataFrame = {
-    val colStatsSchema = colStatsDF.schema
-    val colStatsSchemaOrdinalsMap = colStatsSchema.fields.zipWithIndex.map({
-      case (field, ordinal) => (field.name, ordinal)
-    }).toMap
-
+  private def transpose(colStatsRecords: 
HoodieData[HoodieMetadataColumnStats], queryColumns: Seq[String]): 
(HoodieData[Row], StructType) = {
     val tableSchemaFieldMap = tableSchema.fields.map(f => (f.name, f)).toMap
-
-    val colNameOrdinal = 
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME)
-    val minValueOrdinal = 
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE)
-    val maxValueOrdinal = 
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE)
-    val fileNameOrdinal = 
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
-    val nullCountOrdinal = 
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT)
-    val valueCountOrdinal = 
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT)
-
-    // NOTE: We have to collect list of indexed columns to make sure we 
properly align the rows
-    //       w/in the transposed dataset: since some files might not have all 
of the columns indexed
-    //       either due to the Column Stats Index config changes, schema 
evolution, etc, we have
-    //       to make sure that all of the rows w/in transposed data-frame are 
properly padded (with null
-    //       values) for such file-column combinations
-    val indexedColumns: Seq[String] = colStatsDF.rdd.map(row => 
row.getString(colNameOrdinal)).distinct().collect()
-
     // NOTE: We're sorting the columns to make sure final index schema matches 
layout
     //       of the transposed table
-    val sortedTargetColumns = TreeSet(queryColumns.intersect(indexedColumns): 
_*)
-
-    val transposedRDD = colStatsDF.rdd
-      .filter(row => 
sortedTargetColumns.contains(row.getString(colNameOrdinal)))
-      .map { row =>
-        if (row.isNullAt(minValueOrdinal) && row.isNullAt(maxValueOrdinal)) {
+    val sortedTargetColumnsSet = TreeSet(queryColumns:_*)
+    val sortedTargetColumns = sortedTargetColumnsSet.toSeq
+
+    // NOTE: This is a trick to avoid pulling all of 
[[ColumnStatsIndexSupport]] object into the lambdas'
+    //       closures below
+    val indexedColumns = this.indexedColumns
+
+    // Here we perform complex transformation which requires us to modify the 
layout of the rows
+    // of the dataset, and therefore we rely on low-level RDD API to avoid 
incurring encoding/decoding
+    // penalty of the [[Dataset]], since it's required to adhere to its schema 
at all times, while
+    // RDDs are not;
+    val transposedRows: HoodieData[Row] = colStatsRecords
+      // NOTE: Explicit conversion is required for Scala 2.11
+      .filter(JFunction.toJavaSerializableFunction(r => 
sortedTargetColumnsSet.contains(r.getColumnName)))
+      .mapToPair(JFunction.toJavaSerializablePairFunction(r => {
+        if (r.getMinValue == null && r.getMaxValue == null) {
           // Corresponding row could be null in either of the 2 cases
           //    - Column contains only null values (in that case both min/max 
have to be nulls)
           //    - This is a stubbed Column Stats record (used as a tombstone)
-          row
+          collection.Pair.of(r.getFileName, r)
         } else {
-          val minValueStruct = row.getAs[Row](minValueOrdinal)
-          val maxValueStruct = row.getAs[Row](maxValueOrdinal)
+          val minValueWrapper = r.getMinValue
+          val maxValueWrapper = r.getMaxValue
 
-          checkState(minValueStruct != null && maxValueStruct != null, 
"Invalid Column Stats record: either both min/max have to be null, or both have 
to be non-null")
+          checkState(minValueWrapper != null && maxValueWrapper != null, 
"Invalid Column Stats record: either both min/max have to be null, or both have 
to be non-null")
 
-          val colName = row.getString(colNameOrdinal)
+          val colName = r.getColumnName
           val colType = tableSchemaFieldMap(colName).dataType
 
-          val (minValue, _) = tryUnpackNonNullVal(minValueStruct)
-          val (maxValue, _) = tryUnpackNonNullVal(maxValueStruct)
-          val rowValsSeq = row.toSeq.toArray
+          val minValue = deserialize(tryUnpackValueWrapper(minValueWrapper), 
colType)
+          val maxValue = deserialize(tryUnpackValueWrapper(maxValueWrapper), 
colType)
+
           // Update min-/max-value structs w/ unwrapped values in-place
-          rowValsSeq(minValueOrdinal) = deserialize(minValue, colType)
-          rowValsSeq(maxValueOrdinal) = deserialize(maxValue, colType)
+          r.setMinValue(minValue)
+          r.setMaxValue(maxValue)
 
-          Row(rowValsSeq: _*)
+          collection.Pair.of(r.getFileName, r)
         }
-      }
-      .groupBy(r => r.getString(fileNameOrdinal))
-      .foldByKey(Seq[Row]()) {
-        case (_, columnRowsSeq) =>
-          // Rows seq is always non-empty (otherwise it won't be grouped into)
-          val fileName = columnRowsSeq.head.get(fileNameOrdinal)
-          val valueCount = columnRowsSeq.head.get(valueCountOrdinal)
-
-          // To properly align individual rows (corresponding to a file) w/in 
the transposed projection, we need
-          // to align existing column-stats for individual file with the list 
of expected ones for the
-          // whole transposed projection (a superset of all files)
-          val columnRowsMap = columnRowsSeq.map(row => 
(row.getString(colNameOrdinal), row)).toMap
-          val alignedColumnRowsSeq = 
sortedTargetColumns.toSeq.map(columnRowsMap.get)
-
-          val coalescedRowValuesSeq =
-            alignedColumnRowsSeq.foldLeft(Seq[Any](fileName, valueCount)) {
-              case (acc, opt) =>
-                opt match {
-                  case Some(columnStatsRow) =>
-                    acc ++ Seq(minValueOrdinal, maxValueOrdinal, 
nullCountOrdinal).map(ord => columnStatsRow.get(ord))
-                  case None =>
-                    // NOTE: Since we're assuming missing column to 
essentially contain exclusively
-                    //       null values, we set null-count to be equal to 
value-count (this behavior is
-                    //       consistent with reading non-existent columns from 
Parquet)
-                    acc ++ Seq(null, null, valueCount)
-                }
-            }
-
-          Seq(Row(coalescedRowValuesSeq:_*))
-      }
-      .values
-      .flatMap(it => it)
+      }))
+      .groupByKey()
+      .map(JFunction.toJavaSerializableFunction(p => {
+        val columnRecordsSeq: Seq[HoodieMetadataColumnStats] = 
p.getValue.asScala.toSeq
+        val fileName: String = p.getKey
+        val valueCount: Long = columnRecordsSeq.head.getValueCount
+
+        // To properly align individual rows (corresponding to a file) w/in 
the transposed projection, we need
+        // to align existing column-stats for individual file with the list of 
expected ones for the
+        // whole transposed projection (a superset of all files)
+        val columnRecordsMap = columnRecordsSeq.map(r => (r.getColumnName, 
r)).toMap
+        val alignedColStatRecordsSeq = 
sortedTargetColumns.map(columnRecordsMap.get)
+
+        val coalescedRowValuesSeq =
+          alignedColStatRecordsSeq.foldLeft(ListBuffer[Any](fileName, 
valueCount)) {
+            case (acc, opt) =>
+              opt match {
+                case Some(colStatRecord) =>
+                  acc ++= Seq(colStatRecord.getMinValue, 
colStatRecord.getMaxValue, colStatRecord.getNullCount)
+                case None =>
+                  // NOTE: This could occur in either of the following cases:
+                  //    1. Column is not indexed in Column Stats Index: in 
this case we won't be returning
+                  //       any statistics for such column (ie all stats will 
be null)
+                  //    2. Particular file does not have this particular 
column (which is indexed by Column Stats Index):
+                  //       in this case we're assuming missing column to 
essentially contain exclusively
+                  //       null values, we set min/max values as null and 
null-count to be equal to value-count (this
+                  //       behavior is consistent with reading non-existent 
columns from Parquet)
+                  //
+                  // This is a way to determine current column's index without 
explicit iteration (we're adding 3 stats / column)
+                  val idx = acc.length / 3
+                  val colName = sortedTargetColumns(idx)
+                  val indexed = indexedColumns.contains(colName)
+
+                  val nullCount = if (indexed) valueCount else null
+
+                  acc ++= Seq(null, null, nullCount)
+              }
+          }
+
+        Row(coalescedRowValuesSeq:_*)
+      }))
 
     // NOTE: It's crucial to maintain appropriate ordering of the columns
     //       matching table layout: hence, we cherry-pick individual columns
     //       instead of simply filtering in the ones we're interested in the 
schema
-    val indexSchema = composeIndexSchema(sortedTargetColumns.toSeq, 
tableSchema)
+    val indexSchema = composeIndexSchema(sortedTargetColumns, tableSchema)
+    (transposedRows, indexSchema)
+  }
 
-    spark.createDataFrame(transposedRDD, indexSchema)
+  private def loadColumnStatsIndexForColumnsInternal(targetColumns: 
Seq[String], shouldReadInMemory: Boolean): DataFrame = {
+    val colStatsDF = {
+      val colStatsRecords: HoodieData[HoodieMetadataColumnStats] = 
loadColumnStatsIndexRecords(targetColumns, shouldReadInMemory)
+      // NOTE: Explicit conversion is required for Scala 2.11
+      val catalystRows: HoodieData[InternalRow] = 
colStatsRecords.mapPartitions(JFunction.toJavaSerializableFunction(it => {
+        val converter = 
AvroConversionUtils.createAvroToInternalRowConverter(HoodieMetadataColumnStats.SCHEMA$,
 columnStatsRecordStructType)
+        it.asScala.map(r => converter(r).orNull).asJava
+      }), false)
+
+      if (shouldReadInMemory) {
+        // NOTE: This will instantiate a [[Dataset]] backed by 
[[LocalRelation]] holding all of the rows
+        //       of the transposed table in memory, facilitating execution of 
the subsequently chained operations
+        //       on it locally (on the driver; all such operations are 
actually going to be performed by Spark's
+        //       Optimizer)
+        createDataFrameFromInternalRows(spark, 
catalystRows.collectAsList().asScala, columnStatsRecordStructType)

Review Comment:
   Yes.  At first, I thought the in-memory implementation was independent of 
Spark, but it actually leverages Spark's in-memory processing.  I think the 
mode naming is still fine; it's just that the `in-memory` mode can currently 
only be applied to Spark.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to