This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new bd57282f248 [HUDI-5482] Nulls should be counted in the value count stats for mor table (#7482)
bd57282f248 is described below
commit bd57282f248e4a886c4fdb989789b8550dae6d60
Author: RexAn <[email protected]>
AuthorDate: Wed Dec 28 14:54:20 2022 +0800
[HUDI-5482] Nulls should be counted in the value count stats for mor table (#7482)
* The behavior is kept in line with COW parquet file stats.
---
.../hudi/metadata/HoodieTableMetadataUtil.java | 4 +-
.../hudi/functional/TestColumnStatsIndex.scala | 50 +++++++++++++++++++++-
2 files changed, 51 insertions(+), 3 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 9d6a41ea153..d37dbab3d82 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -160,6 +160,8 @@ public class HoodieTableMetadataUtil {
       final Object fieldVal = convertValueForSpecificDataTypes(field.schema(), genericRecord.get(field.name()), false);
       final Schema fieldSchema = getNestedFieldSchemaFromWriteSchema(genericRecord.getSchema(), field.name());
+ colStats.valueCount++;
+
if (fieldVal != null && canCompare(fieldSchema)) {
// Set the min value of the field
if (colStats.minValue == null
@@ -171,8 +173,6 @@ public class HoodieTableMetadataUtil {
         if (colStats.maxValue == null || ConvertingGenericData.INSTANCE.compare(fieldVal, colStats.maxValue, fieldSchema) > 0) {
colStats.maxValue = fieldVal;
}
-
- colStats.valueCount++;
} else {
colStats.nullCount++;
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
index 731548693f3..0d3b09b4c17 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.types._
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue}
import org.junit.jupiter.api._
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{Arguments, MethodSource, ValueSource}
+import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource, ValueSource}
import java.math.BigInteger
import java.sql.{Date, Timestamp}
@@ -128,6 +128,54 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
saveMode = SaveMode.Append)
}
+ @ParameterizedTest
+ @EnumSource(classOf[HoodieTableType])
+  def testMetadataColumnStatsIndexValueCount(tableType: HoodieTableType): Unit = {
+ val metadataOpts = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+ )
+
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
+ RECORDKEY_FIELD.key -> "c1",
+ PRECOMBINE_FIELD.key -> "c1",
+ HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
+ ) ++ metadataOpts
+
+    val schema = StructType(StructField("c1", IntegerType, false) :: StructField("c2", StringType, true) :: Nil)
+ val inputDF = spark.createDataFrame(
+      spark.sparkContext.parallelize(Seq(Row(1, "v1"), Row(2, "v2"), Row(3, null), Row(4, "v4"))),
+ schema)
+
+ inputDF
+ .sort("c1", "c2")
+ .write
+ .format("hudi")
+ .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+
+ val metadataConfig = HoodieMetadataConfig.newBuilder()
+ .fromProperties(toProperties(metadataOpts))
+ .build()
+
+    val columnStatsIndex = new ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient)
+ columnStatsIndex.loadTransposed(Seq("c2"), false) { transposedDF =>
+ val result = transposedDF.select("valueCount", "c2_nullCount")
+ .collect().head
+
+ assertTrue(result.getLong(0) == 4)
+ assertTrue(result.getLong(1) == 1)
+ }
+ }
+
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
  def testMetadataColumnStatsIndexPartialProjection(shouldReadInMemory: Boolean): Unit = {