This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.12.3
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 826385f9c49ff55cc364c974e39a3a7ede7a04e3
Author: RexAn <[email protected]>
AuthorDate: Wed Dec 28 14:54:20 2022 +0800

    [HUDI-5482] Nulls should be counted in the value count stats for mor table (#7482)

    * The behavior is kept in line with COW parquet file stats.
---
 .../hudi/metadata/HoodieTableMetadataUtil.java     |  4 +-
 .../hudi/functional/TestColumnStatsIndex.scala     | 50 +++++++++++++++++++++-
 2 files changed, 51 insertions(+), 3 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 374d6fb46e7..1d7481df76b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -159,6 +159,8 @@ public class HoodieTableMetadataUtil {
       final Object fieldVal = convertValueForSpecificDataTypes(field.schema(), genericRecord.get(field.name()), false);
       final Schema fieldSchema = getNestedFieldSchemaFromWriteSchema(genericRecord.getSchema(), field.name());
 
+      colStats.valueCount++;
+
       if (fieldVal != null && canCompare(fieldSchema)) {
         // Set the min value of the field
         if (colStats.minValue == null
@@ -170,8 +172,6 @@
         if (colStats.maxValue == null
             || ConvertingGenericData.INSTANCE.compare(fieldVal, colStats.maxValue, fieldSchema) > 0) {
           colStats.maxValue = fieldVal;
         }
-
-        colStats.valueCount++;
       } else {
         colStats.nullCount++;
       }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
index 3bf8d352808..1172b657f64 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
@@ -38,7 +38,7 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue
 import org.junit.jupiter.api._
 import org.junit.jupiter.api.condition.DisabledIf
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{Arguments, MethodSource, ValueSource}
+import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource, ValueSource}
 
 import java.math.BigInteger
 import java.sql.{Date, Timestamp}
@@ -129,6 +129,54 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
       saveMode = SaveMode.Append)
   }
 
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testMetadataColumnStatsIndexValueCount(tableType: HoodieTableType): Unit = {
+    val metadataOpts = Map(
+      HoodieMetadataConfig.ENABLE.key -> "true",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+    )
+
+    val commonOpts = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
+      RECORDKEY_FIELD.key -> "c1",
+      PRECOMBINE_FIELD.key -> "c1",
+      HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
+    ) ++ metadataOpts
+
+    val schema = StructType(StructField("c1", IntegerType, false) :: StructField("c2", StringType, true) :: Nil)
+    val inputDF = spark.createDataFrame(
+      spark.sparkContext.parallelize(Seq(Row(1, "v1"), Row(2, "v2"), Row(3, null), Row(4, "v4"))),
+      schema)
+
+    inputDF
+      .sort("c1", "c2")
+      .write
+      .format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+
+    val metadataConfig = HoodieMetadataConfig.newBuilder()
+      .fromProperties(toProperties(metadataOpts))
+      .build()
+
+    val columnStatsIndex = new ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient)
+    columnStatsIndex.loadTransposed(Seq("c2"), false) { transposedDF =>
+      val result = transposedDF.select("valueCount", "c2_nullCount")
+        .collect().head
+
+      assertTrue(result.getLong(0) == 4)
+      assertTrue(result.getLong(1) == 1)
+    }
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = Array(true, false))
   def testMetadataColumnStatsIndexPartialProjection(shouldReadInMemory: Boolean): Unit = {
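
The semantics this commit establishes: a column's valueCount in the metadata
table's column stats now counts every cell written, nulls included, with
nullCount tracking the null subset, so MOR log-file stats agree with COW
parquet footer stats. Below is a minimal standalone sketch of that
accumulation, run against the same c2 values the new test writes; the
ValueCountSketch class and its field names are illustrative shorthand, not
the commit's code.

    // Hypothetical sketch mirroring the patched accumulation in
    // HoodieTableMetadataUtil; ColStats stands in for the real stats holder.
    public class ValueCountSketch {
      static final class ColStats {
        long valueCount; // every observed cell, nulls included (post-patch)
        long nullCount;  // null cells only
      }

      public static void main(String[] args) {
        ColStats colStats = new ColStats();
        String[] c2 = {"v1", "v2", null, "v4"}; // same data as the new test
        for (String fieldVal : c2) {
          colStats.valueCount++;   // now incremented unconditionally
          if (fieldVal == null) {
            colStats.nullCount++;  // nulls still tracked separately
          }
        }
        System.out.println(colStats.valueCount); // 4, as asserted in the test
        System.out.println(colStats.nullCount);  // 1, as asserted in the test
      }
    }

Pre-patch, the MOR path incremented valueCount only in the non-null branch,
so it would have reported 3 here while COW parquet stats reported 4; moving
the increment above the null check removes that divergence.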
