codope commented on code in PR #9345:
URL: https://github.com/apache/hudi/pull/9345#discussion_r1285078625


##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala:
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hadoop.fs.{LocatedFileStatus, Path}
+import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema
+import org.apache.hudi.HoodieConversionUtils.toProperties
+import org.apache.hudi.common.config.{HoodieMetadataConfig, 
HoodieStorageConfig}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestCase
+import org.apache.hudi.testutils.HoodieSparkClientTestBase
+import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions}
+import org.apache.spark.sql._
+import org.apache.spark.sql.functions.typedLit
+import org.apache.spark.sql.types._
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api._
+import org.junit.jupiter.params.provider.Arguments
+
+import java.math.BigInteger
+import java.sql.{Date, Timestamp}
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+@Tag("functional")
+class ColumnStatIndexTestBase extends HoodieSparkClientTestBase {
+  // Shared Spark session; assigned in setUp() from the inherited sqlContext.
+  var spark: SparkSession = _
+  // Accumulates every input batch written so far (appended on each write).
+  var dfList: Seq[DataFrame] = Seq()
+
+  // Schema of the JSON source fixtures. Deliberately spans a spread of types
+  // (int, string, decimal, timestamp, short, date, binary, byte) so that
+  // column-stats handling of each type gets exercised.
+  val sourceTableSchema =
+    new StructType()
+      .add("c1", IntegerType)
+      .add("c2", StringType)
+      .add("c3", DecimalType(9, 3))
+      .add("c4", TimestampType)
+      .add("c5", ShortType)
+      .add("c6", DateType)
+      .add("c7", BinaryType)
+      .add("c8", ByteType)
+
+  /**
+   * Per-test initialization: sets up the test path, Spark contexts, file system
+   * and table meta client, then captures the SparkSession for the test body.
+   */
+  @BeforeEach
+  override def setUp(): Unit = {
+    initPath()
+    initSparkContexts()
+    initFileSystem()
+
+    setTableName("hoodie_test")
+    initMetaClient()
+
+    spark = sqlContext.sparkSession
+  }
+
+  /**
+   * Per-test cleanup: tears down the file system and Spark contexts created in
+   * [[setUp]].
+   */
+  @AfterEach
+  override def tearDown(): Unit = {
+    cleanupFileSystem()
+    cleanupSparkContexts()
+  }
+
+  /**
+   * Writes the JSON fixture at `dataSourcePath` into the Hudi table at `basePath`
+   * and, unless disabled, validates the resulting column-stats index against the
+   * expected JSON dump at `expectedColStatsSourcePath`.
+   *
+   * @param testCase                   table type + read-path flag for this run
+   * @param metadataOpts               metadata-table (column stats) configuration
+   * @param hudiOpts                   write options passed to the Hudi datasource
+   * @param dataSourcePath             classpath-relative path of the input JSON data
+   * @param expectedColStatsSourcePath classpath-relative path of the expected col-stats JSON
+   * @param operation                  Hudi write operation to perform
+   * @param saveMode                   Spark [[SaveMode]] for the write
+   * @param shouldValidate             when false, only performs the write (no validation)
+   */
+  protected def doWriteAndValidateColumnStats(testCase: ColumnStatsTestCase,
+                                            metadataOpts: Map[String, String],
+                                            hudiOpts: Map[String, String],
+                                            dataSourcePath: String,
+                                            expectedColStatsSourcePath: String,
+                                            operation: String,
+                                            saveMode: SaveMode,
+                                            shouldValidate: Boolean = true): 
Unit = {
+    val sourceJSONTablePath = 
getClass.getClassLoader.getResource(dataSourcePath).toString
+
+    // NOTE: Schema here is provided for validation that the input data is in
+    //       the appropriate format
+    val inputDF = 
spark.read.schema(sourceTableSchema).json(sourceJSONTablePath)
+
+    // Sort and repartition on c1, and cap the parquet max file size (10KB) so
+    // the write fans out into multiple small base files.
+    inputDF
+      .sort("c1")
+      .repartition(4, new Column("c1"))
+      .write
+      .format("hudi")
+      .options(hudiOpts)
+      .option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key, 10 * 1024)
+      .option(DataSourceWriteOptions.OPERATION.key, operation)
+      .mode(saveMode)
+      .save(basePath)
+    // Record the batch we just wrote (kept for use by subclasses/tests)
+    dfList = dfList :+ inputDF
+
+    // Reload so the meta client reflects the commit produced by the write above
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+
+    if (shouldValidate) {
+      // Currently, routine manually validating the column stats (by actually 
reading every column of every file)
+      // only supports parquet files. Therefore we skip such validation when 
delta-log files are present, and only
+      // validate in following cases: (1) COW: all operations; (2) MOR: insert 
only.
+      val shouldValidateColumnStatsManually = testCase.tableType == 
HoodieTableType.COPY_ON_WRITE ||
+        operation.equals(DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+
+      validateColumnStatsIndex(
+        testCase, metadataOpts, expectedColStatsSourcePath, 
shouldValidateColumnStatsManually)
+    }
+  }
+
+  /**
+   * Re-computes the expected column-stats table directly from the table's parquet
+   * (base) files: for every parquet file under `tablePath`, computes min/max/nullCount
+   * for each indexed column (and null-valued stats for included-but-not-indexed
+   * columns), so the result can be compared against the metadata-table col-stats index.
+   *
+   * @param tablePath    base path of the Hudi table on `fs`
+   * @param includedCols columns for which stats columns are emitted at all
+   * @param indexedCols  columns whose stats are actually computed (others get nulls)
+   * @param indexSchema  schema the resulting stats DataFrame must conform to
+   */
+  protected def buildColumnStatsTableManually(tablePath: String,
+                                              includedCols: Seq[String],
+                                              indexedCols: Seq[String],
+                                              indexSchema: StructType): DataFrame = {
+    val files = {
+      val it = fs.listFiles(new Path(tablePath), true)
+      var seq = Seq[LocatedFileStatus]()
+      while (it.hasNext) {
+        seq = seq :+ it.next()
+      }
+      // Only parquet base files are supported by this manual validation routine
+      seq.filter(fs => fs.getPath.getName.endsWith(".parquet"))
+    }
+
+    spark.createDataFrame(
+      files.flatMap(file => {
+        val df = spark.read.schema(sourceTableSchema).parquet(file.getPath.toString)
+        val exprs: Seq[String] =
+          s"'${typedLit(file.getPath.getName)}' AS file" +:
+            s"sum(1) AS valueCount" +:
+            df.columns
+              // NOTE: We must NOT additionally filter by `indexedCols` here: doing so
+              //       made the null-stats branch below unreachable and, whenever
+              //       `indexedCols` is a strict subset of `includedCols`, produced rows
+              //       with fewer columns than `indexSchema` expects
+              .filter(col => includedCols.contains(col))
+              .flatMap(col => {
+                val minColName = s"${col}_minValue"
+                val maxColName = s"${col}_maxValue"
+                if (indexedCols.contains(col)) {
+                  Seq(
+                    s"min($col) AS $minColName",
+                    s"max($col) AS $maxColName",
+                    s"sum(cast(isnull($col) AS long)) AS ${col}_nullCount"
+                  )
+                } else {
+                  // Included but not indexed: emit null stats so row arity stays
+                  // aligned with `indexSchema`
+                  Seq(
+                    s"null AS $minColName",
+                    s"null AS $maxColName",
+                    s"null AS ${col}_nullCount"
+                  )
+                }
+              })
+
+        df.selectExpr(exprs: _*)
+          .collect()
+      }).asJava,
+      indexSchema
+    )
+  }
+
+  protected def validateColumnStatsIndex(testCase: ColumnStatsTestCase,
+                                       metadataOpts: Map[String, String],
+                                       expectedColStatsSourcePath: String,
+                                       validateColumnStatsManually: Boolean): 
Unit = {
+    val metadataConfig = HoodieMetadataConfig.newBuilder()
+      .fromProperties(toProperties(metadataOpts))
+      .build()
+
+    val columnStatsIndex = new ColumnStatsIndexSupport(spark, 
sourceTableSchema, metadataConfig, metaClient)
+
+    val indexedColumns: Set[String] = {
+      val customIndexedColumns = 
metadataConfig.getColumnsEnabledForColumnStatsIndex
+      if (customIndexedColumns.isEmpty) {
+        sourceTableSchema.fieldNames.toSet
+      } else {
+        customIndexedColumns.asScala.toSet
+      }
+    }
+    val (expectedColStatsSchema, _) = 
composeIndexSchema(sourceTableSchema.fieldNames, indexedColumns, 
sourceTableSchema)
+    val validationSortColumns = Seq("c1_maxValue", "c1_minValue", 
"c2_maxValue", "c2_minValue")
+
+    columnStatsIndex.loadTransposed(sourceTableSchema.fieldNames, 
testCase.shouldReadInMemory) { transposedColStatsDF =>
+      // Match against expected column stats table
+      val expectedColStatsIndexTableDf =
+        spark.read
+          .schema(expectedColStatsSchema)
+          
.json(getClass.getClassLoader.getResource(expectedColStatsSourcePath).toString)
+
+      assertEquals(expectedColStatsIndexTableDf.schema, 
transposedColStatsDF.schema)
+      // NOTE: We have to drop the `fileName` column as it contains 
semi-random components
+      //       that we can't control in this test. Nevertheless, since we 
manually verify composition of the
+      //       ColStats Index by reading Parquet footers from individual 
Parquet files, this is not an issue
+      assertEquals(asJson(sort(expectedColStatsIndexTableDf, 
validationSortColumns)),
+        asJson(sort(transposedColStatsDF.drop("fileName"), 
validationSortColumns)))
+
+      if (validateColumnStatsManually) {
+        // TODO(HUDI-4557): support validation of column stats of avro log 
files

Review Comment:
I see that you have already added a log-files-only test using the in-memory 
index. That's fine. You can keep the TODO comment here (judging from the JIRA 
ID, it looks like an older ticket); we can follow it up in a separate PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to