This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c798842003c [HUDI-8185] Fix SPARK record for Colstats (#11969)
c798842003c is described below

commit c798842003cf79d128835f27d897e6647f7e6ba7
Author: Sagar Sumit <[email protected]>
AuthorDate: Thu Sep 26 19:57:05 2024 +0530

    [HUDI-8185] Fix SPARK record for Colstats (#11969)
---
 .../hudi/metadata/HoodieTableMetadataUtil.java     |  8 ++++--
 .../apache/hudi/functional/TestMORDataSource.scala | 29 ++++++++++++++++++----
 2 files changed, 30 insertions(+), 7 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 8ec48326552..e85aebc9da1 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -233,7 +233,7 @@ public class HoodieTableMetadataUtil {
         }
 
         colStats.valueCount++;
-        if (fieldValue != null && canCompare(fieldSchema)) {
+        if (fieldValue != null && canCompare(fieldSchema, 
record.getRecordType())) {
           // Set the min value of the field
           if (colStats.minValue == null
               || ConvertingGenericData.INSTANCE.compare(fieldValue, 
colStats.minValue, fieldSchema) < 0) {
@@ -1354,7 +1354,11 @@ public class HoodieTableMetadataUtil {
     }
   }
 
-  private static boolean canCompare(Schema schema) {
+  private static boolean canCompare(Schema schema, HoodieRecordType 
recordType) {
+    // if the record type is SPARK, then RECORD and ARRAY types cannot be compared, in addition to the MAP type
+    if (recordType == HoodieRecordType.SPARK) {
+      return schema.getType() != Schema.Type.RECORD && schema.getType() != 
Schema.Type.ARRAY && schema.getType() != Schema.Type.MAP;
+    }
     return schema.getType() != Schema.Type.MAP;
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index a78b216cc38..f9da68efdb5 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -35,7 +35,7 @@ import org.apache.hudi.storage.StoragePath
 import org.apache.hudi.table.action.compact.CompactionTriggerStrategy
 import org.apache.hudi.testutils.{DataSourceTestUtils, 
HoodieSparkClientTestBase}
 import org.apache.hudi.util.JFunction
-import org.apache.hudi.{DataSourceReadOptions, DataSourceUtils, 
DataSourceWriteOptions, DefaultSparkRecordMerger, HoodieDataSourceHelpers, 
SparkDatasetMixin}
+import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceReadOptions, 
DataSourceUtils, DataSourceWriteOptions, DefaultSparkRecordMerger, 
HoodieDataSourceHelpers, SparkDatasetMixin}
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.QuickstartUtils.convertToStringList
 import org.apache.spark.sql._
@@ -46,7 +46,6 @@ import org.junit.jupiter.api.Assertions.{assertEquals, 
assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
-import org.slf4j.LoggerFactory
 
 import java.util.function.Consumer
 import scala.collection.JavaConverters._
@@ -57,7 +56,6 @@ import scala.collection.JavaConverters._
 class TestMORDataSource extends HoodieSparkClientTestBase with 
SparkDatasetMixin {
 
   var spark: SparkSession = null
-  private val log = LoggerFactory.getLogger(classOf[TestMORDataSource])
   val commonOpts = Map(
     "hoodie.insert.shuffle.parallelism" -> "4",
     "hoodie.upsert.shuffle.parallelism" -> "4",
@@ -886,12 +884,14 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
   @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", 
"SPARK"))
   def testReadPathsForOnlyLogFiles(recordType: HoodieRecordType): Unit = {
     val (writeOpts, readOpts) = getWriterReaderOpts(recordType)
+    // enable column stats
+    val hudiOpts = writeOpts ++ 
Map(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true")
 
     initMetaClient(HoodieTableType.MERGE_ON_READ)
     val records1 = dataGen.generateInsertsContainsAllPartitions("000", 20)
     val inputDF1 = 
spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1).asScala.toSeq,
 2))
     inputDF1.write.format("hudi")
-      .options(writeOpts)
+      .options(hudiOpts)
       .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
       // Use InMemoryIndex to generate log only mor table.
@@ -910,7 +910,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
     val records2 = dataGen.generateInsertsContainsAllPartitions("000", 20)
     val inputDF2 = 
spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records2).asScala.toSeq,
 2))
     inputDF2.write.format("hudi")
-      .options(writeOpts)
+      .options(hudiOpts)
       .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
       // Use InMemoryIndex to generate log only mor table.
@@ -928,6 +928,25 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
       .load()
 
     assertEquals(expectedCount1, hudiReadPathDF.count())
+
+    if (recordType == HoodieRecordType.SPARK) {
+      val metadataConfig = 
HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(true).build()
+      val columnStatsIndex = new ColumnStatsIndexSupport(spark, 
inputDF1.schema, metadataConfig, metaClient)
+      columnStatsIndex.loadTransposed(Seq("fare", "city_to_state", "rider"), 
shouldReadInMemory = true) { emptyTransposedColStatsDF =>
+        // fare is a nested column, so it should not have any min/max value as it is not comparable, but it should still have a nullCount
+        assertEquals(0, emptyTransposedColStatsDF.filter("fare_minValue IS NOT 
NULL").count())
+        assertEquals(0, emptyTransposedColStatsDF.filter("fare_maxValue IS NOT 
NULL").count())
+        assertTrue(emptyTransposedColStatsDF.filter("fare_nullCount IS NOT 
NULL").count() > 0)
+        // city_to_state is a map column, so it should not have any min/max value as it is not comparable, but it should still have a nullCount
+        assertEquals(0, 
emptyTransposedColStatsDF.filter("city_to_state_minValue IS NOT NULL").count())
+        assertEquals(0, 
emptyTransposedColStatsDF.filter("city_to_state_maxValue IS NOT NULL").count())
+        assertTrue(emptyTransposedColStatsDF.filter("city_to_state_nullCount 
IS NOT NULL").count() > 0)
+        // rider is a simple string field, so it should have a min/max value as well as a nullCount
+        assertTrue(emptyTransposedColStatsDF.filter("rider_minValue IS NOT 
NULL").count() > 0)
+        assertTrue(emptyTransposedColStatsDF.filter("rider_maxValue IS NOT 
NULL").count() > 0)
+        assertTrue(emptyTransposedColStatsDF.filter("rider_nullCount IS NOT 
NULL").count() > 0)
+      }
+    }
   }
 
   @ParameterizedTest

Reply via email to