This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c798842003c [HUDI-8185] Fix SPARK record for Colstats (#11969)
c798842003c is described below
commit c798842003cf79d128835f27d897e6647f7e6ba7
Author: Sagar Sumit <[email protected]>
AuthorDate: Thu Sep 26 19:57:05 2024 +0530
[HUDI-8185] Fix SPARK record for Colstats (#11969)
---
.../hudi/metadata/HoodieTableMetadataUtil.java | 8 ++++--
.../apache/hudi/functional/TestMORDataSource.scala | 29 ++++++++++++++++++----
2 files changed, 30 insertions(+), 7 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 8ec48326552..e85aebc9da1 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -233,7 +233,7 @@ public class HoodieTableMetadataUtil {
}
colStats.valueCount++;
- if (fieldValue != null && canCompare(fieldSchema)) {
+ if (fieldValue != null && canCompare(fieldSchema,
record.getRecordType())) {
// Set the min value of the field
if (colStats.minValue == null
|| ConvertingGenericData.INSTANCE.compare(fieldValue,
colStats.minValue, fieldSchema) < 0) {
@@ -1354,7 +1354,11 @@ public class HoodieTableMetadataUtil {
}
}
- private static boolean canCompare(Schema schema) {
+ private static boolean canCompare(Schema schema, HoodieRecordType
recordType) {
+    // for SPARK record type, RECORD and ARRAY types are not comparable (in
addition to the MAP type)
+ if (recordType == HoodieRecordType.SPARK) {
+ return schema.getType() != Schema.Type.RECORD && schema.getType() !=
Schema.Type.ARRAY && schema.getType() != Schema.Type.MAP;
+ }
return schema.getType() != Schema.Type.MAP;
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index a78b216cc38..f9da68efdb5 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -35,7 +35,7 @@ import org.apache.hudi.storage.StoragePath
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy
import org.apache.hudi.testutils.{DataSourceTestUtils,
HoodieSparkClientTestBase}
import org.apache.hudi.util.JFunction
-import org.apache.hudi.{DataSourceReadOptions, DataSourceUtils,
DataSourceWriteOptions, DefaultSparkRecordMerger, HoodieDataSourceHelpers,
SparkDatasetMixin}
+import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceReadOptions,
DataSourceUtils, DataSourceWriteOptions, DefaultSparkRecordMerger,
HoodieDataSourceHelpers, SparkDatasetMixin}
import org.apache.hadoop.fs.Path
import org.apache.hudi.QuickstartUtils.convertToStringList
import org.apache.spark.sql._
@@ -46,7 +46,6 @@ import org.junit.jupiter.api.Assertions.{assertEquals,
assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
-import org.slf4j.LoggerFactory
import java.util.function.Consumer
import scala.collection.JavaConverters._
@@ -57,7 +56,6 @@ import scala.collection.JavaConverters._
class TestMORDataSource extends HoodieSparkClientTestBase with
SparkDatasetMixin {
var spark: SparkSession = null
- private val log = LoggerFactory.getLogger(classOf[TestMORDataSource])
val commonOpts = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
@@ -886,12 +884,14 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
@EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO",
"SPARK"))
def testReadPathsForOnlyLogFiles(recordType: HoodieRecordType): Unit = {
val (writeOpts, readOpts) = getWriterReaderOpts(recordType)
+ // enable column stats
+ val hudiOpts = writeOpts ++
Map(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true")
initMetaClient(HoodieTableType.MERGE_ON_READ)
val records1 = dataGen.generateInsertsContainsAllPartitions("000", 20)
val inputDF1 =
spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1).asScala.toSeq,
2))
inputDF1.write.format("hudi")
- .options(writeOpts)
+ .options(hudiOpts)
.option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE.key,
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
// Use InMemoryIndex to generate log only mor table.
@@ -910,7 +910,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
val records2 = dataGen.generateInsertsContainsAllPartitions("000", 20)
val inputDF2 =
spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records2).asScala.toSeq,
2))
inputDF2.write.format("hudi")
- .options(writeOpts)
+ .options(hudiOpts)
.option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE.key,
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
// Use InMemoryIndex to generate log only mor table.
@@ -928,6 +928,25 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
.load()
assertEquals(expectedCount1, hudiReadPathDF.count())
+
+ if (recordType == HoodieRecordType.SPARK) {
+ val metadataConfig =
HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(true).build()
+ val columnStatsIndex = new ColumnStatsIndexSupport(spark,
inputDF1.schema, metadataConfig, metaClient)
+ columnStatsIndex.loadTransposed(Seq("fare", "city_to_state", "rider"),
shouldReadInMemory = true) { emptyTransposedColStatsDF =>
+ // fare is a nested column, so it should not have any min/max value as
it is not comparable, but still have nullCount
+ assertEquals(0, emptyTransposedColStatsDF.filter("fare_minValue IS NOT
NULL").count())
+ assertEquals(0, emptyTransposedColStatsDF.filter("fare_maxValue IS NOT
NULL").count())
+ assertTrue(emptyTransposedColStatsDF.filter("fare_nullCount IS NOT
NULL").count() > 0)
+ // city_to_state is a map column, so it should not have any min/max
value as it is not comparable, but still have nullCount
+ assertEquals(0,
emptyTransposedColStatsDF.filter("city_to_state_minValue IS NOT NULL").count())
+ assertEquals(0,
emptyTransposedColStatsDF.filter("city_to_state_maxValue IS NOT NULL").count())
+ assertTrue(emptyTransposedColStatsDF.filter("city_to_state_nullCount
IS NOT NULL").count() > 0)
+ // rider is a simple string field, so it should have a min/max value
as well as nullCount
+ assertTrue(emptyTransposedColStatsDF.filter("rider_minValue IS NOT
NULL").count() > 0)
+ assertTrue(emptyTransposedColStatsDF.filter("rider_maxValue IS NOT
NULL").count() > 0)
+ assertTrue(emptyTransposedColStatsDF.filter("rider_nullCount IS NOT
NULL").count() > 0)
+ }
+ }
}
@ParameterizedTest