yihua commented on code in PR #5746:
URL: https://github.com/apache/hudi/pull/5746#discussion_r929200613
##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -187,6 +187,26 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
.sinceVersion("0.11.0")
.withDocumentation("Comma-separated list of columns for which column
stats index will be built. If not set, all columns will be indexed");
+ public static final String COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY =
"in-memory";
+ public static final String COLUMN_STATS_INDEX_PROCESSING_MODE_ENGINE =
"engine";
+
+ public static final ConfigProperty<String>
COLUMN_STATS_INDEX_PROCESSING_MODE_OVERRIDE = ConfigProperty
+ .key(METADATA_PREFIX + ".index.column.stats.processing.mode.override")
+ .noDefaultValue()
+ .withValidValues(COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY,
COLUMN_STATS_INDEX_PROCESSING_MODE_ENGINE)
+ .sinceVersion("0.12.0")
+ .withDocumentation("By default Column Stats Index is automatically
determining whether it should be read and processed either"
+ + "'in-memory' (w/in executing process) or using Spark (on a
cluster), based on some factors like the size of the Index "
Review Comment:
Don't forget to update the docs as well
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala:
##########
@@ -112,154 +199,184 @@ trait ColumnStatsIndexSupport extends
SparkAdapterSupport {
* column references from the filtering expressions, and only
transpose records corresponding to the
* columns referenced in those
*
- * @param spark Spark session ref
- * @param colStatsDF [[DataFrame]] bearing raw Column Stats Index table
+ * @param colStatsRecords [[HoodieData[HoodieMetadataColumnStats]]] bearing
raw Column Stats Index records
* @param queryColumns target columns to be included into the final table
- * @param tableSchema schema of the source data table
* @return reshaped table according to the format outlined above
*/
- def transposeColumnStatsIndex(spark: SparkSession, colStatsDF: DataFrame,
queryColumns: Seq[String], tableSchema: StructType): DataFrame = {
- val colStatsSchema = colStatsDF.schema
- val colStatsSchemaOrdinalsMap = colStatsSchema.fields.zipWithIndex.map({
- case (field, ordinal) => (field.name, ordinal)
- }).toMap
-
+ private def transpose(colStatsRecords:
HoodieData[HoodieMetadataColumnStats], queryColumns: Seq[String]):
(HoodieData[Row], StructType) = {
val tableSchemaFieldMap = tableSchema.fields.map(f => (f.name, f)).toMap
-
- val colNameOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME)
- val minValueOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE)
- val maxValueOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE)
- val fileNameOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
- val nullCountOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT)
- val valueCountOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT)
-
- // NOTE: We have to collect list of indexed columns to make sure we
properly align the rows
- // w/in the transposed dataset: since some files might not have all
of the columns indexed
- // either due to the Column Stats Index config changes, schema
evolution, etc, we have
- // to make sure that all of the rows w/in transposed data-frame are
properly padded (with null
- // values) for such file-column combinations
- val indexedColumns: Seq[String] = colStatsDF.rdd.map(row =>
row.getString(colNameOrdinal)).distinct().collect()
-
// NOTE: We're sorting the columns to make sure final index schema matches
layout
// of the transposed table
- val sortedTargetColumns = TreeSet(queryColumns.intersect(indexedColumns):
_*)
-
- val transposedRDD = colStatsDF.rdd
- .filter(row =>
sortedTargetColumns.contains(row.getString(colNameOrdinal)))
- .map { row =>
- if (row.isNullAt(minValueOrdinal) && row.isNullAt(maxValueOrdinal)) {
+ val sortedTargetColumnsSet = TreeSet(queryColumns:_*)
+ val sortedTargetColumns = sortedTargetColumnsSet.toSeq
+
+ // NOTE: This is a trick to avoid pulling all of
[[ColumnStatsIndexSupport]] object into the lambdas'
+ // closures below
+ val indexedColumns = this.indexedColumns
+
+ // Here we perform complex transformation which requires us to modify the
layout of the rows
+ // of the dataset, and therefore we rely on low-level RDD API to avoid
incurring encoding/decoding
+ // penalty of the [[Dataset]], since it's required to adhere to its schema
at all times, while
+ // RDDs are not;
+ val transposedRows: HoodieData[Row] = colStatsRecords
+ // NOTE: Explicit conversion is required for Scala 2.11
+ .filter(JFunction.toJavaSerializableFunction(r =>
sortedTargetColumnsSet.contains(r.getColumnName)))
+ .mapToPair(JFunction.toJavaSerializablePairFunction(r => {
+ if (r.getMinValue == null && r.getMaxValue == null) {
// Corresponding row could be null in either of the 2 cases
// - Column contains only null values (in that case both min/max
have to be nulls)
// - This is a stubbed Column Stats record (used as a tombstone)
- row
+ collection.Pair.of(r.getFileName, r)
} else {
- val minValueStruct = row.getAs[Row](minValueOrdinal)
- val maxValueStruct = row.getAs[Row](maxValueOrdinal)
+ val minValueWrapper = r.getMinValue
+ val maxValueWrapper = r.getMaxValue
- checkState(minValueStruct != null && maxValueStruct != null,
"Invalid Column Stats record: either both min/max have to be null, or both have
to be non-null")
+ checkState(minValueWrapper != null && maxValueWrapper != null,
"Invalid Column Stats record: either both min/max have to be null, or both have
to be non-null")
- val colName = row.getString(colNameOrdinal)
+ val colName = r.getColumnName
val colType = tableSchemaFieldMap(colName).dataType
- val (minValue, _) = tryUnpackNonNullVal(minValueStruct)
- val (maxValue, _) = tryUnpackNonNullVal(maxValueStruct)
- val rowValsSeq = row.toSeq.toArray
+ val minValue = deserialize(tryUnpackValueWrapper(minValueWrapper),
colType)
+ val maxValue = deserialize(tryUnpackValueWrapper(maxValueWrapper),
colType)
+
// Update min-/max-value structs w/ unwrapped values in-place
- rowValsSeq(minValueOrdinal) = deserialize(minValue, colType)
- rowValsSeq(maxValueOrdinal) = deserialize(maxValue, colType)
+ r.setMinValue(minValue)
+ r.setMaxValue(maxValue)
- Row(rowValsSeq: _*)
+ collection.Pair.of(r.getFileName, r)
}
- }
- .groupBy(r => r.getString(fileNameOrdinal))
- .foldByKey(Seq[Row]()) {
- case (_, columnRowsSeq) =>
- // Rows seq is always non-empty (otherwise it won't be grouped into)
- val fileName = columnRowsSeq.head.get(fileNameOrdinal)
- val valueCount = columnRowsSeq.head.get(valueCountOrdinal)
-
- // To properly align individual rows (corresponding to a file) w/in
the transposed projection, we need
- // to align existing column-stats for individual file with the list
of expected ones for the
- // whole transposed projection (a superset of all files)
- val columnRowsMap = columnRowsSeq.map(row =>
(row.getString(colNameOrdinal), row)).toMap
- val alignedColumnRowsSeq =
sortedTargetColumns.toSeq.map(columnRowsMap.get)
-
- val coalescedRowValuesSeq =
- alignedColumnRowsSeq.foldLeft(Seq[Any](fileName, valueCount)) {
- case (acc, opt) =>
- opt match {
- case Some(columnStatsRow) =>
- acc ++ Seq(minValueOrdinal, maxValueOrdinal,
nullCountOrdinal).map(ord => columnStatsRow.get(ord))
- case None =>
- // NOTE: Since we're assuming missing column to
essentially contain exclusively
- // null values, we set null-count to be equal to
value-count (this behavior is
- // consistent with reading non-existent columns from
Parquet)
- acc ++ Seq(null, null, valueCount)
- }
- }
-
- Seq(Row(coalescedRowValuesSeq:_*))
- }
- .values
- .flatMap(it => it)
+ }))
+ .groupByKey()
+ .map(JFunction.toJavaSerializableFunction(p => {
+ val columnRecordsSeq: Seq[HoodieMetadataColumnStats] =
p.getValue.asScala.toSeq
+ val fileName: String = p.getKey
+ val valueCount: Long = columnRecordsSeq.head.getValueCount
+
+ // To properly align individual rows (corresponding to a file) w/in
the transposed projection, we need
+ // to align existing column-stats for individual file with the list of
expected ones for the
+ // whole transposed projection (a superset of all files)
+ val columnRecordsMap = columnRecordsSeq.map(r => (r.getColumnName,
r)).toMap
+ val alignedColStatRecordsSeq =
sortedTargetColumns.map(columnRecordsMap.get)
+
+ val coalescedRowValuesSeq =
+ alignedColStatRecordsSeq.foldLeft(ListBuffer[Any](fileName,
valueCount)) {
+ case (acc, opt) =>
+ opt match {
+ case Some(colStatRecord) =>
+ acc ++= Seq(colStatRecord.getMinValue,
colStatRecord.getMaxValue, colStatRecord.getNullCount)
+ case None =>
+ // NOTE: This could occur in either of the following cases:
+ // 1. Column is not indexed in Column Stats Index: in
this case we won't be returning
+ // any statistics for such column (ie all stats will
be null)
+ // 2. Particular file does not have this particular
column (which is indexed by Column Stats Index):
+ // in this case we're assuming missing column to
essentially contain exclusively
+ // null values, we set min/max values as null and
null-count to be equal to value-count (this
+ // behavior is consistent with reading non-existent
columns from Parquet)
+ //
+ // This is a way to determine current column's index without
explicit iteration (we're adding 3 stats / column)
+ val idx = acc.length / 3
+ val colName = sortedTargetColumns(idx)
+ val indexed = indexedColumns.contains(colName)
+
+ val nullCount = if (indexed) valueCount else null
+
+ acc ++= Seq(null, null, nullCount)
+ }
+ }
+
+ Row(coalescedRowValuesSeq:_*)
+ }))
// NOTE: It's crucial to maintain appropriate ordering of the columns
// matching table layout: hence, we cherry-pick individual columns
// instead of simply filtering in the ones we're interested in the
schema
- val indexSchema = composeIndexSchema(sortedTargetColumns.toSeq,
tableSchema)
+ val indexSchema = composeIndexSchema(sortedTargetColumns, tableSchema)
+ (transposedRows, indexSchema)
+ }
- spark.createDataFrame(transposedRDD, indexSchema)
+ private def loadColumnStatsIndexForColumnsInternal(targetColumns:
Seq[String], shouldReadInMemory: Boolean): DataFrame = {
+ val colStatsDF = {
+ val colStatsRecords: HoodieData[HoodieMetadataColumnStats] =
loadColumnStatsIndexRecords(targetColumns, shouldReadInMemory)
+ // NOTE: Explicit conversion is required for Scala 2.11
+ val catalystRows: HoodieData[InternalRow] =
colStatsRecords.mapPartitions(JFunction.toJavaSerializableFunction(it => {
Review Comment:
Does this work for Scala 2.12 as well?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCatalystUtils.scala:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.data.HoodieData
+import org.apache.spark.sql.Dataset
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.storage.StorageLevel._
+
+object HoodieCatalystUtils extends SparkAdapterSupport {
+
+ /**
+ * Executes provided function while keeping provided [[Dataset]] instance
persisted for the
+ * duration of the execution
+ *
+ * @param df target [[Dataset]] to be persisted
+ * @param level desired [[StorageLevel]] of the persistence
+ * @param f target function to be executed while [[Dataset]] is kept
persisted
+ * @tparam T return value of the target function
+ * @return execution outcome of the [[f]] function
+ */
+ def withPersistedDataset[T](df: Dataset[_], level: StorageLevel =
MEMORY_AND_DISK)(f: => T): T = {
+ df.persist(level)
+ try {
+ f
+ } finally {
+ df.unpersist()
+ }
Review Comment:
I remember you added similar util methods before. Are these the same?
##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -187,6 +187,26 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
.sinceVersion("0.11.0")
.withDocumentation("Comma-separated list of columns for which column
stats index will be built. If not set, all columns will be indexed");
+ public static final String COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY =
"in-memory";
+ public static final String COLUMN_STATS_INDEX_PROCESSING_MODE_ENGINE =
"engine";
+
+ public static final ConfigProperty<String>
COLUMN_STATS_INDEX_PROCESSING_MODE_OVERRIDE = ConfigProperty
+ .key(METADATA_PREFIX + ".index.column.stats.processing.mode.override")
+ .noDefaultValue()
+ .withValidValues(COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY,
COLUMN_STATS_INDEX_PROCESSING_MODE_ENGINE)
+ .sinceVersion("0.12.0")
+ .withDocumentation("By default Column Stats Index is automatically
determining whether it should be read and processed either"
+ + "'in-memory' (w/in executing process) or using Spark (on a
cluster), based on some factors like the size of the Index "
+ + "and how many columns are read. This config allows to override
this behavior.");
+
+ public static final ConfigProperty<Integer>
COLUMN_STATS_INDEX_IN_MEMORY_PROJECTION_THRESHOLD = ConfigProperty
+ .key(METADATA_PREFIX +
".index.column.stats.inMemory.projection.threshold")
+ .defaultValue(100000)
Review Comment:
Any perf numbers to support this default threshold?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala:
##########
@@ -112,154 +199,184 @@ trait ColumnStatsIndexSupport extends
SparkAdapterSupport {
* column references from the filtering expressions, and only
transpose records corresponding to the
* columns referenced in those
*
- * @param spark Spark session ref
- * @param colStatsDF [[DataFrame]] bearing raw Column Stats Index table
+ * @param colStatsRecords [[HoodieData[HoodieMetadataColumnStats]]] bearing
raw Column Stats Index records
* @param queryColumns target columns to be included into the final table
- * @param tableSchema schema of the source data table
* @return reshaped table according to the format outlined above
*/
- def transposeColumnStatsIndex(spark: SparkSession, colStatsDF: DataFrame,
queryColumns: Seq[String], tableSchema: StructType): DataFrame = {
- val colStatsSchema = colStatsDF.schema
- val colStatsSchemaOrdinalsMap = colStatsSchema.fields.zipWithIndex.map({
- case (field, ordinal) => (field.name, ordinal)
- }).toMap
-
+ private def transpose(colStatsRecords:
HoodieData[HoodieMetadataColumnStats], queryColumns: Seq[String]):
(HoodieData[Row], StructType) = {
val tableSchemaFieldMap = tableSchema.fields.map(f => (f.name, f)).toMap
-
- val colNameOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME)
- val minValueOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE)
- val maxValueOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE)
- val fileNameOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
- val nullCountOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT)
- val valueCountOrdinal =
colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT)
-
- // NOTE: We have to collect list of indexed columns to make sure we
properly align the rows
- // w/in the transposed dataset: since some files might not have all
of the columns indexed
- // either due to the Column Stats Index config changes, schema
evolution, etc, we have
- // to make sure that all of the rows w/in transposed data-frame are
properly padded (with null
- // values) for such file-column combinations
- val indexedColumns: Seq[String] = colStatsDF.rdd.map(row =>
row.getString(colNameOrdinal)).distinct().collect()
-
// NOTE: We're sorting the columns to make sure final index schema matches
layout
// of the transposed table
- val sortedTargetColumns = TreeSet(queryColumns.intersect(indexedColumns):
_*)
-
- val transposedRDD = colStatsDF.rdd
- .filter(row =>
sortedTargetColumns.contains(row.getString(colNameOrdinal)))
- .map { row =>
- if (row.isNullAt(minValueOrdinal) && row.isNullAt(maxValueOrdinal)) {
+ val sortedTargetColumnsSet = TreeSet(queryColumns:_*)
+ val sortedTargetColumns = sortedTargetColumnsSet.toSeq
+
+ // NOTE: This is a trick to avoid pulling all of
[[ColumnStatsIndexSupport]] object into the lambdas'
+ // closures below
+ val indexedColumns = this.indexedColumns
+
+ // Here we perform complex transformation which requires us to modify the
layout of the rows
+ // of the dataset, and therefore we rely on low-level RDD API to avoid
incurring encoding/decoding
+ // penalty of the [[Dataset]], since it's required to adhere to its schema
at all times, while
+ // RDDs are not;
+ val transposedRows: HoodieData[Row] = colStatsRecords
+ // NOTE: Explicit conversion is required for Scala 2.11
+ .filter(JFunction.toJavaSerializableFunction(r =>
sortedTargetColumnsSet.contains(r.getColumnName)))
+ .mapToPair(JFunction.toJavaSerializablePairFunction(r => {
+ if (r.getMinValue == null && r.getMaxValue == null) {
// Corresponding row could be null in either of the 2 cases
// - Column contains only null values (in that case both min/max
have to be nulls)
// - This is a stubbed Column Stats record (used as a tombstone)
- row
+ collection.Pair.of(r.getFileName, r)
} else {
- val minValueStruct = row.getAs[Row](minValueOrdinal)
- val maxValueStruct = row.getAs[Row](maxValueOrdinal)
+ val minValueWrapper = r.getMinValue
+ val maxValueWrapper = r.getMaxValue
- checkState(minValueStruct != null && maxValueStruct != null,
"Invalid Column Stats record: either both min/max have to be null, or both have
to be non-null")
+ checkState(minValueWrapper != null && maxValueWrapper != null,
"Invalid Column Stats record: either both min/max have to be null, or both have
to be non-null")
- val colName = row.getString(colNameOrdinal)
+ val colName = r.getColumnName
val colType = tableSchemaFieldMap(colName).dataType
- val (minValue, _) = tryUnpackNonNullVal(minValueStruct)
- val (maxValue, _) = tryUnpackNonNullVal(maxValueStruct)
- val rowValsSeq = row.toSeq.toArray
+ val minValue = deserialize(tryUnpackValueWrapper(minValueWrapper),
colType)
+ val maxValue = deserialize(tryUnpackValueWrapper(maxValueWrapper),
colType)
+
// Update min-/max-value structs w/ unwrapped values in-place
- rowValsSeq(minValueOrdinal) = deserialize(minValue, colType)
- rowValsSeq(maxValueOrdinal) = deserialize(maxValue, colType)
+ r.setMinValue(minValue)
+ r.setMaxValue(maxValue)
- Row(rowValsSeq: _*)
+ collection.Pair.of(r.getFileName, r)
}
- }
- .groupBy(r => r.getString(fileNameOrdinal))
- .foldByKey(Seq[Row]()) {
- case (_, columnRowsSeq) =>
- // Rows seq is always non-empty (otherwise it won't be grouped into)
- val fileName = columnRowsSeq.head.get(fileNameOrdinal)
- val valueCount = columnRowsSeq.head.get(valueCountOrdinal)
-
- // To properly align individual rows (corresponding to a file) w/in
the transposed projection, we need
- // to align existing column-stats for individual file with the list
of expected ones for the
- // whole transposed projection (a superset of all files)
- val columnRowsMap = columnRowsSeq.map(row =>
(row.getString(colNameOrdinal), row)).toMap
- val alignedColumnRowsSeq =
sortedTargetColumns.toSeq.map(columnRowsMap.get)
-
- val coalescedRowValuesSeq =
- alignedColumnRowsSeq.foldLeft(Seq[Any](fileName, valueCount)) {
- case (acc, opt) =>
- opt match {
- case Some(columnStatsRow) =>
- acc ++ Seq(minValueOrdinal, maxValueOrdinal,
nullCountOrdinal).map(ord => columnStatsRow.get(ord))
- case None =>
- // NOTE: Since we're assuming missing column to
essentially contain exclusively
- // null values, we set null-count to be equal to
value-count (this behavior is
- // consistent with reading non-existent columns from
Parquet)
- acc ++ Seq(null, null, valueCount)
- }
- }
-
- Seq(Row(coalescedRowValuesSeq:_*))
- }
- .values
- .flatMap(it => it)
+ }))
+ .groupByKey()
+ .map(JFunction.toJavaSerializableFunction(p => {
+ val columnRecordsSeq: Seq[HoodieMetadataColumnStats] =
p.getValue.asScala.toSeq
+ val fileName: String = p.getKey
+ val valueCount: Long = columnRecordsSeq.head.getValueCount
+
+ // To properly align individual rows (corresponding to a file) w/in
the transposed projection, we need
+ // to align existing column-stats for individual file with the list of
expected ones for the
+ // whole transposed projection (a superset of all files)
+ val columnRecordsMap = columnRecordsSeq.map(r => (r.getColumnName,
r)).toMap
+ val alignedColStatRecordsSeq =
sortedTargetColumns.map(columnRecordsMap.get)
+
+ val coalescedRowValuesSeq =
+ alignedColStatRecordsSeq.foldLeft(ListBuffer[Any](fileName,
valueCount)) {
+ case (acc, opt) =>
+ opt match {
+ case Some(colStatRecord) =>
+ acc ++= Seq(colStatRecord.getMinValue,
colStatRecord.getMaxValue, colStatRecord.getNullCount)
+ case None =>
+ // NOTE: This could occur in either of the following cases:
+ // 1. Column is not indexed in Column Stats Index: in
this case we won't be returning
+ // any statistics for such column (ie all stats will
be null)
+ // 2. Particular file does not have this particular
column (which is indexed by Column Stats Index):
+ // in this case we're assuming missing column to
essentially contain exclusively
+ // null values, we set min/max values as null and
null-count to be equal to value-count (this
+ // behavior is consistent with reading non-existent
columns from Parquet)
+ //
+ // This is a way to determine current column's index without
explicit iteration (we're adding 3 stats / column)
+ val idx = acc.length / 3
+ val colName = sortedTargetColumns(idx)
+ val indexed = indexedColumns.contains(colName)
+
+ val nullCount = if (indexed) valueCount else null
+
+ acc ++= Seq(null, null, nullCount)
+ }
+ }
+
+ Row(coalescedRowValuesSeq:_*)
+ }))
// NOTE: It's crucial to maintain appropriate ordering of the columns
// matching table layout: hence, we cherry-pick individual columns
// instead of simply filtering in the ones we're interested in the
schema
- val indexSchema = composeIndexSchema(sortedTargetColumns.toSeq,
tableSchema)
+ val indexSchema = composeIndexSchema(sortedTargetColumns, tableSchema)
+ (transposedRows, indexSchema)
+ }
- spark.createDataFrame(transposedRDD, indexSchema)
+ private def loadColumnStatsIndexForColumnsInternal(targetColumns:
Seq[String], shouldReadInMemory: Boolean): DataFrame = {
+ val colStatsDF = {
+ val colStatsRecords: HoodieData[HoodieMetadataColumnStats] =
loadColumnStatsIndexRecords(targetColumns, shouldReadInMemory)
+ // NOTE: Explicit conversion is required for Scala 2.11
+ val catalystRows: HoodieData[InternalRow] =
colStatsRecords.mapPartitions(JFunction.toJavaSerializableFunction(it => {
+ val converter =
AvroConversionUtils.createAvroToInternalRowConverter(HoodieMetadataColumnStats.SCHEMA$,
columnStatsRecordStructType)
+ it.asScala.map(r => converter(r).orNull).asJava
+ }), false)
+
+ if (shouldReadInMemory) {
+ // NOTE: This will instantiate a [[Dataset]] backed by
[[LocalRelation]] holding all of the rows
+ // of the transposed table in memory, facilitating execution of
the subsequently chained operations
+ // on it locally (on the driver; all such operations are
actually going to be performed by Spark's
+ // Optimizer)
+ createDataFrameFromInternalRows(spark,
catalystRows.collectAsList().asScala, columnStatsRecordStructType)
Review Comment:
Basically, this is still Spark-specific. Later on, we need to think about
how to generalize this to non-Spark environments.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]