codope commented on code in PR #9345: URL: https://github.com/apache/hudi/pull/9345#discussion_r1285073359
########## hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala: ########## @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.functional + +import org.apache.hadoop.fs.{LocatedFileStatus, Path} +import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema +import org.apache.hudi.HoodieConversionUtils.toProperties +import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieStorageConfig} +import org.apache.hudi.common.model.HoodieTableType +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestCase +import org.apache.hudi.testutils.HoodieSparkClientTestBase +import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions} +import org.apache.spark.sql._ +import org.apache.spark.sql.functions.typedLit +import org.apache.spark.sql.types._ +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api._ +import org.junit.jupiter.params.provider.Arguments + +import java.math.BigInteger +import java.sql.{Date, Timestamp} +import scala.collection.JavaConverters._ +import scala.util.Random + +@Tag("functional") 
+class ColumnStatIndexTestBase extends HoodieSparkClientTestBase { + var spark: SparkSession = _ + var dfList: Seq[DataFrame] = Seq() + + val sourceTableSchema = + new StructType() + .add("c1", IntegerType) + .add("c2", StringType) + .add("c3", DecimalType(9, 3)) + .add("c4", TimestampType) + .add("c5", ShortType) + .add("c6", DateType) + .add("c7", BinaryType) + .add("c8", ByteType) + + @BeforeEach + override def setUp() { + initPath() + initSparkContexts() + initFileSystem() + + setTableName("hoodie_test") + initMetaClient() + + spark = sqlContext.sparkSession + } + + @AfterEach + override def tearDown() = { + cleanupFileSystem() + cleanupSparkContexts() + } + + protected def doWriteAndValidateColumnStats(testCase: ColumnStatsTestCase, + metadataOpts: Map[String, String], + hudiOpts: Map[String, String], + dataSourcePath: String, + expectedColStatsSourcePath: String, + operation: String, + saveMode: SaveMode, + shouldValidate: Boolean = true): Unit = { + val sourceJSONTablePath = getClass.getClassLoader.getResource(dataSourcePath).toString + + // NOTE: Schema here is provided for validation that the input date is in the appropriate format + val inputDF = spark.read.schema(sourceTableSchema).json(sourceJSONTablePath) + + inputDF + .sort("c1") + .repartition(4, new Column("c1")) + .write + .format("hudi") + .options(hudiOpts) + .option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key, 10 * 1024) + .option(DataSourceWriteOptions.OPERATION.key, operation) + .mode(saveMode) + .save(basePath) + dfList = dfList :+ inputDF + + metaClient = HoodieTableMetaClient.reload(metaClient) + + if (shouldValidate) { + // Currently, routine manually validating the column stats (by actually reading every column of every file) + // only supports parquet files. Therefore we skip such validation when delta-log files are present, and only + // validate in following cases: (1) COW: all operations; (2) MOR: insert only. 
+ val shouldValidateColumnStatsManually = testCase.tableType == HoodieTableType.COPY_ON_WRITE || + operation.equals(DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + + validateColumnStatsIndex( + testCase, metadataOpts, expectedColStatsSourcePath, shouldValidateColumnStatsManually) + } + } + + protected def buildColumnStatsTableManually(tablePath: String, + includedCols: Seq[String], + indexedCols: Seq[String], + indexSchema: StructType): DataFrame = { + val files = { + val it = fs.listFiles(new Path(tablePath), true) + var seq = Seq[LocatedFileStatus]() + while (it.hasNext) { + seq = seq :+ it.next() + } + seq.filter(fs => fs.getPath.getName.endsWith(".parquet")) + } + + spark.createDataFrame( + files.flatMap(file => { + val df = spark.read.schema(sourceTableSchema).parquet(file.getPath.toString) + val exprs: Seq[String] = + s"'${typedLit(file.getPath.getName)}' AS file" +: + s"sum(1) AS valueCount" +: + df.columns + .filter(col => includedCols.contains(col)) + .filter(col => indexedCols.contains(col)) + .flatMap(col => { + val minColName = s"${col}_minValue" + val maxColName = s"${col}_maxValue" + if (indexedCols.contains(col)) { + Seq( + s"min($col) AS $minColName", + s"max($col) AS $maxColName", + s"sum(cast(isnull($col) AS long)) AS ${col}_nullCount" + ) + } else { + Seq( + s"null AS $minColName", + s"null AS $maxColName", + s"null AS ${col}_nullCount" + ) + } + }) + + df.selectExpr(exprs: _*) + .collect() + }).asJava, + indexSchema + ) + } + + protected def validateColumnStatsIndex(testCase: ColumnStatsTestCase, + metadataOpts: Map[String, String], + expectedColStatsSourcePath: String, + validateColumnStatsManually: Boolean): Unit = { + val metadataConfig = HoodieMetadataConfig.newBuilder() + .fromProperties(toProperties(metadataOpts)) + .build() + + val columnStatsIndex = new ColumnStatsIndexSupport(spark, sourceTableSchema, metadataConfig, metaClient) + + val indexedColumns: Set[String] = { + val customIndexedColumns = 
metadataConfig.getColumnsEnabledForColumnStatsIndex + if (customIndexedColumns.isEmpty) { + sourceTableSchema.fieldNames.toSet + } else { + customIndexedColumns.asScala.toSet + } + } + val (expectedColStatsSchema, _) = composeIndexSchema(sourceTableSchema.fieldNames, indexedColumns, sourceTableSchema) + val validationSortColumns = Seq("c1_maxValue", "c1_minValue", "c2_maxValue", "c2_minValue") + + columnStatsIndex.loadTransposed(sourceTableSchema.fieldNames, testCase.shouldReadInMemory) { transposedColStatsDF => + // Match against expected column stats table + val expectedColStatsIndexTableDf = + spark.read + .schema(expectedColStatsSchema) + .json(getClass.getClassLoader.getResource(expectedColStatsSourcePath).toString) + + assertEquals(expectedColStatsIndexTableDf.schema, transposedColStatsDF.schema) + // NOTE: We have to drop the `fileName` column as it contains semi-random components + // that we can't control in this test. Nevertheless, since we manually verify composition of the + // ColStats Index by reading Parquet footers from individual Parquet files, this is not an issue + assertEquals(asJson(sort(expectedColStatsIndexTableDf, validationSortColumns)), + asJson(sort(transposedColStatsDF.drop("fileName"), validationSortColumns))) + + if (validateColumnStatsManually) { + // TODO(HUDI-4557): support validation of column stats of avro log files Review Comment: Why is this still a TODO? This PR should validate column stats for log files as well right? ########## hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala: ########## @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.functional + +import org.apache.hadoop.fs.{LocatedFileStatus, Path} +import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema +import org.apache.hudi.HoodieConversionUtils.toProperties +import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieStorageConfig} +import org.apache.hudi.common.model.HoodieTableType +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestCase +import org.apache.hudi.testutils.HoodieSparkClientTestBase +import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions} +import org.apache.spark.sql._ +import org.apache.spark.sql.functions.typedLit +import org.apache.spark.sql.types._ +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api._ +import org.junit.jupiter.params.provider.Arguments + +import java.math.BigInteger +import java.sql.{Date, Timestamp} +import scala.collection.JavaConverters._ +import scala.util.Random + +@Tag("functional") +class ColumnStatIndexTestBase extends HoodieSparkClientTestBase { + var spark: SparkSession = _ + var dfList: Seq[DataFrame] = Seq() + + val sourceTableSchema = + new StructType() + .add("c1", IntegerType) + .add("c2", StringType) + .add("c3", DecimalType(9, 3)) + .add("c4", TimestampType) + .add("c5", ShortType) + .add("c6", DateType) + .add("c7", BinaryType) + 
.add("c8", ByteType) + + @BeforeEach + override def setUp() { + initPath() + initSparkContexts() + initFileSystem() + + setTableName("hoodie_test") + initMetaClient() + + spark = sqlContext.sparkSession + } + + @AfterEach + override def tearDown() = { + cleanupFileSystem() + cleanupSparkContexts() + } + + protected def doWriteAndValidateColumnStats(testCase: ColumnStatsTestCase, + metadataOpts: Map[String, String], + hudiOpts: Map[String, String], + dataSourcePath: String, + expectedColStatsSourcePath: String, + operation: String, + saveMode: SaveMode, + shouldValidate: Boolean = true): Unit = { + val sourceJSONTablePath = getClass.getClassLoader.getResource(dataSourcePath).toString + + // NOTE: Schema here is provided for validation that the input date is in the appropriate format + val inputDF = spark.read.schema(sourceTableSchema).json(sourceJSONTablePath) + + inputDF + .sort("c1") + .repartition(4, new Column("c1")) + .write + .format("hudi") + .options(hudiOpts) + .option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key, 10 * 1024) + .option(DataSourceWriteOptions.OPERATION.key, operation) + .mode(saveMode) + .save(basePath) + dfList = dfList :+ inputDF + + metaClient = HoodieTableMetaClient.reload(metaClient) + + if (shouldValidate) { + // Currently, routine manually validating the column stats (by actually reading every column of every file) + // only supports parquet files. Therefore we skip such validation when delta-log files are present, and only + // validate in following cases: (1) COW: all operations; (2) MOR: insert only. 
+ val shouldValidateColumnStatsManually = testCase.tableType == HoodieTableType.COPY_ON_WRITE || + operation.equals(DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + + validateColumnStatsIndex( + testCase, metadataOpts, expectedColStatsSourcePath, shouldValidateColumnStatsManually) + } + } + + protected def buildColumnStatsTableManually(tablePath: String, + includedCols: Seq[String], + indexedCols: Seq[String], + indexSchema: StructType): DataFrame = { + val files = { + val it = fs.listFiles(new Path(tablePath), true) + var seq = Seq[LocatedFileStatus]() + while (it.hasNext) { + seq = seq :+ it.next() + } + seq.filter(fs => fs.getPath.getName.endsWith(".parquet")) + } + + spark.createDataFrame( + files.flatMap(file => { + val df = spark.read.schema(sourceTableSchema).parquet(file.getPath.toString) + val exprs: Seq[String] = + s"'${typedLit(file.getPath.getName)}' AS file" +: + s"sum(1) AS valueCount" +: + df.columns + .filter(col => includedCols.contains(col)) + .filter(col => indexedCols.contains(col)) + .flatMap(col => { + val minColName = s"${col}_minValue" + val maxColName = s"${col}_maxValue" + if (indexedCols.contains(col)) { + Seq( + s"min($col) AS $minColName", + s"max($col) AS $maxColName", + s"sum(cast(isnull($col) AS long)) AS ${col}_nullCount" + ) + } else { + Seq( + s"null AS $minColName", + s"null AS $maxColName", + s"null AS ${col}_nullCount" + ) + } + }) + + df.selectExpr(exprs: _*) + .collect() + }).asJava, + indexSchema + ) + } + + protected def validateColumnStatsIndex(testCase: ColumnStatsTestCase, + metadataOpts: Map[String, String], + expectedColStatsSourcePath: String, + validateColumnStatsManually: Boolean): Unit = { + val metadataConfig = HoodieMetadataConfig.newBuilder() + .fromProperties(toProperties(metadataOpts)) + .build() + + val columnStatsIndex = new ColumnStatsIndexSupport(spark, sourceTableSchema, metadataConfig, metaClient) + + val indexedColumns: Set[String] = { + val customIndexedColumns = 
metadataConfig.getColumnsEnabledForColumnStatsIndex + if (customIndexedColumns.isEmpty) { + sourceTableSchema.fieldNames.toSet + } else { + customIndexedColumns.asScala.toSet + } + } + val (expectedColStatsSchema, _) = composeIndexSchema(sourceTableSchema.fieldNames, indexedColumns, sourceTableSchema) + val validationSortColumns = Seq("c1_maxValue", "c1_minValue", "c2_maxValue", "c2_minValue") + + columnStatsIndex.loadTransposed(sourceTableSchema.fieldNames, testCase.shouldReadInMemory) { transposedColStatsDF => + // Match against expected column stats table + val expectedColStatsIndexTableDf = + spark.read + .schema(expectedColStatsSchema) + .json(getClass.getClassLoader.getResource(expectedColStatsSourcePath).toString) + + assertEquals(expectedColStatsIndexTableDf.schema, transposedColStatsDF.schema) + // NOTE: We have to drop the `fileName` column as it contains semi-random components + // that we can't control in this test. Nevertheless, since we manually verify composition of the + // ColStats Index by reading Parquet footers from individual Parquet files, this is not an issue + assertEquals(asJson(sort(expectedColStatsIndexTableDf, validationSortColumns)), + asJson(sort(transposedColStatsDF.drop("fileName"), validationSortColumns))) + + if (validateColumnStatsManually) { + // TODO(HUDI-4557): support validation of column stats of avro log files Review Comment: Perhaps you can generate a column stats test fixture for log files only. Per my understanding, only filename will change right? Another option, if we can't validate here, is to do an e2e test to validate col stats for MOR with: a) only base file, b) base and log files, and c) only log files. ########## hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala: ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.functional + +import org.apache.hudi.DataSourceWriteOptions.{DELETE_OPERATION_OPT_VAL, PRECOMBINE_FIELD, RECORDKEY_FIELD} +import org.apache.hudi.async.SparkAsyncCompactService +import org.apache.hudi.client.SparkRDDWriteClient +import org.apache.hudi.client.common.HoodieSparkEngineContext +import org.apache.hudi.client.utils.MetadataConversionUtils +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.model.{FileSlice, HoodieCommitMetadata, HoodieTableType, WriteOperationType} +import org.apache.hudi.common.table.HoodieTableConfig +import org.apache.hudi.common.table.timeline.HoodieInstant +import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig} +import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestCase +import org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY +import org.apache.hudi.metadata.HoodieMetadataFileSystemView +import org.apache.hudi.util.{JFunction, JavaConversions} +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex} +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, GreaterThan, Literal} +import org.apache.spark.sql.types.StringType +import 
org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} +import org.junit.jupiter.api.Disabled +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.MethodSource + +import java.util.Properties +import scala.collection.JavaConverters + +class TestColumnStatsIndexWithSQL extends ColumnStatIndexTestBase { + + @ParameterizedTest + @MethodSource(Array("testMetadataColumnStatsIndexParams")) + def testMetadataColumnStatsIndexWithSQL(testCase: ColumnStatsTestCase): Unit = { + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + ) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", + DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL + ) ++ metadataOpts + setupTable(testCase, metadataOpts, commonOpts, shouldValidate = true) + verifyFileIndexAndSQLQueries(commonOpts) + } + + @ParameterizedTest + @MethodSource(Array("testMetadataColumnStatsIndexParamsForMOR")) + def testMetadataColumnStatsIndexSQLWithInMemoryIndex(testCase: ColumnStatsTestCase): Unit = { + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + ) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + 
HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", + DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, + HoodieIndexConfig.INDEX_TYPE.key() -> INMEMORY.name() + ) ++ metadataOpts + + doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/input-table-json", + expectedColStatsSourcePath = "index/colstats/column-stats-index-table.json", + operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Overwrite, + shouldValidate = false) + + assertEquals(4, getLatestDataFilesCount(commonOpts)) + assertEquals(0, getLatestDataFilesCount(commonOpts, includeLogFiles = false)) + var dataFilter = GreaterThan(attribute("c5"), literal("90")) + verifyPruningFileCount(commonOpts, dataFilter, 3) + dataFilter = GreaterThan(attribute("c5"), literal("95")) + verifyPruningFileCount(commonOpts, dataFilter, 1) + } + + @ParameterizedTest + @MethodSource(Array("testMetadataColumnStatsIndexParams")) + def testMetadataColumnStatsIndexDeletionWithSQL(testCase: ColumnStatsTestCase): Unit = { + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + ) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", + DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL + ) ++ metadataOpts + setupTable(testCase, metadataOpts, commonOpts, shouldValidate = true) + val lastDf = dfList.last + + lastDf.write.format("org.apache.hudi") + .options(commonOpts) 
+ .option(DataSourceWriteOptions.OPERATION.key, DELETE_OPERATION_OPT_VAL) + .mode(SaveMode.Append) + .save(basePath) + verifyFileIndexAndSQLQueries(commonOpts, isTableDataSameAsAfterSecondInstant = true) + + // Add the last df back and verify the queries + doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/update-input-table-json", + expectedColStatsSourcePath = "", + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append, + shouldValidate = false) + verifyFileIndexAndSQLQueries(commonOpts, verifyFileCount = false) + } + + @ParameterizedTest + @MethodSource(Array("testMetadataColumnStatsIndexParamsForMOR")) + def testMetadataColumnStatsIndexCompactionWithSQL(testCase: ColumnStatsTestCase): Unit = { + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + ) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", + DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, + HoodieCompactionConfig.INLINE_COMPACT.key() -> "true", + HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "1" + ) ++ metadataOpts + setupTable(testCase, metadataOpts, commonOpts, shouldValidate = false) + + assertFalse(hasLogFiles()) + verifyFileIndexAndSQLQueries(commonOpts) + } + + @Disabled("Needs more work") + @ParameterizedTest + @MethodSource(Array("testMetadataColumnStatsIndexParamsForMOR")) + def testMetadataColumnStatsIndexAsyncCompactionWithSQL(testCase: ColumnStatsTestCase): Unit = { + val metadataOpts = Map( 
+ HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + ) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", + DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, + HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "1" + ) ++ metadataOpts + setupTable(testCase, metadataOpts, commonOpts, shouldValidate = false) + + val writeClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), getWriteConfig(commonOpts)) + writeClient.scheduleCompaction(org.apache.hudi.common.util.Option.empty()) + val compactionService = new SparkAsyncCompactService(new HoodieSparkEngineContext(jsc), writeClient) + compactionService.enqueuePendingAsyncServiceInstant(metaClient.reloadActiveTimeline().lastInstant().get()) + compactionService.start(JFunction.toJavaFunction(b => true)) + compactionService.waitTillPendingAsyncServiceInstantsReducesTo(0) + assertFalse(hasLogFiles()) + verifyFileIndexAndSQLQueries(commonOpts) + } + + private def setupTable(testCase: ColumnStatsTestCase, metadataOpts: Map[String, String], commonOpts: Map[String, String], shouldValidate: Boolean): Unit = { + doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/input-table-json", + expectedColStatsSourcePath = "index/colstats/column-stats-index-table.json", + operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Overwrite) + + doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/another-input-table-json", + 
expectedColStatsSourcePath = "index/colstats/updated-column-stats-index-table.json", + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append) + + // NOTE: MOR and COW have different fixtures since MOR is bearing delta-log files (holding + // deferred updates), diverging from COW + val expectedColStatsSourcePath = if (testCase.tableType == HoodieTableType.COPY_ON_WRITE) { + "index/colstats/cow-updated2-column-stats-index-table.json" + } else { + "index/colstats/mor-updated2-column-stats-index-table.json" + } + + doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/update-input-table-json", + expectedColStatsSourcePath = expectedColStatsSourcePath, + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append, + shouldValidate) + } + + def verifyFileIndexAndSQLQueries(opts: Map[String, String], isTableDataSameAsAfterSecondInstant: Boolean = false, verifyFileCount: Boolean = true): Unit = { + var commonOpts = opts + val inputDF1 = spark.read.format("hudi") + .options(commonOpts) + .option("as.of.instant", metaClient.getActiveTimeline.getInstants.get(1).getTimestamp) + .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) + .option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key, "false") + .load(basePath) + inputDF1.createOrReplaceTempView("tbl") + val numRecordsWithC5ColumnGreaterThan70 = spark.sql("select * from tbl where c5 > 70").count() + // verify snapshot query + verifySQLQueries(numRecordsWithC5ColumnGreaterThan70, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, commonOpts, isTableDataSameAsAfterSecondInstant) + + // verify read_optimized query + verifySQLQueries(numRecordsWithC5ColumnGreaterThan70, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, commonOpts, isTableDataSameAsAfterSecondInstant) + + // verify incremental query + verifySQLQueries(numRecordsWithC5ColumnGreaterThan70, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, commonOpts, isTableDataSameAsAfterSecondInstant) + commonOpts = commonOpts + (DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key -> "true") + verifySQLQueries(numRecordsWithC5ColumnGreaterThan70, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, commonOpts, isTableDataSameAsAfterSecondInstant) + + if (verifyFileCount) { + // First commit creates 4 parquet files + // Second commit creates 4 parquet files + // Last commit creates one parquet file and 4 log files - total 5 files (for MOR) + // and 4 parquet files and no log file (for COW) + // therefore on deletion all those 5 files create a new log file whereas COW file count remains same + var numFiles = if (isTableMOR && isTableDataSameAsAfterSecondInstant) 17 else if (hasLogFiles()) 12 else 8 + var dataFilter = GreaterThan(attribute("c5"), literal("70")) + verifyPruningFileCount(commonOpts, dataFilter, numFiles) + + dataFilter = GreaterThan(attribute("c5"), literal("90")) + numFiles = if (isTableMOR && isTableDataSameAsAfterSecondInstant) 11 else if (hasLogFiles()) 7 else 4 + verifyPruningFileCount(commonOpts, dataFilter, numFiles) + } + } + + private def verifyPruningFileCount(opts: Map[String, String], dataFilter: Expression, numFiles: Int): Unit = { + val fileIndex = HoodieFileIndex(spark, metaClient, None, opts + ("path" -> basePath)) Review Comment: See if you can validate without using file index e.g. using physical plan. Instantiating file index again after running query would add to the total test time. ########## hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala: ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.functional + +import org.apache.hudi.DataSourceWriteOptions.{DELETE_OPERATION_OPT_VAL, PRECOMBINE_FIELD, RECORDKEY_FIELD} +import org.apache.hudi.async.SparkAsyncCompactService +import org.apache.hudi.client.SparkRDDWriteClient +import org.apache.hudi.client.common.HoodieSparkEngineContext +import org.apache.hudi.client.utils.MetadataConversionUtils +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.model.{FileSlice, HoodieCommitMetadata, HoodieTableType, WriteOperationType} +import org.apache.hudi.common.table.HoodieTableConfig +import org.apache.hudi.common.table.timeline.HoodieInstant +import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig} +import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestCase +import org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY +import org.apache.hudi.metadata.HoodieMetadataFileSystemView +import org.apache.hudi.util.{JFunction, JavaConversions} +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex} +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, GreaterThan, Literal} +import org.apache.spark.sql.types.StringType +import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} +import org.junit.jupiter.api.Disabled 
+import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.MethodSource + +import java.util.Properties +import scala.collection.JavaConverters + +class TestColumnStatsIndexWithSQL extends ColumnStatIndexTestBase { + + @ParameterizedTest + @MethodSource(Array("testMetadataColumnStatsIndexParams")) + def testMetadataColumnStatsIndexWithSQL(testCase: ColumnStatsTestCase): Unit = { + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + ) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", + DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL + ) ++ metadataOpts + setupTable(testCase, metadataOpts, commonOpts, shouldValidate = true) + verifyFileIndexAndSQLQueries(commonOpts) + } + + @ParameterizedTest + @MethodSource(Array("testMetadataColumnStatsIndexParamsForMOR")) + def testMetadataColumnStatsIndexSQLWithInMemoryIndex(testCase: ColumnStatsTestCase): Unit = { + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + ) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", + 
DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, + HoodieIndexConfig.INDEX_TYPE.key() -> INMEMORY.name() + ) ++ metadataOpts + + doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/input-table-json", + expectedColStatsSourcePath = "index/colstats/column-stats-index-table.json", + operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Overwrite, + shouldValidate = false) + + assertEquals(4, getLatestDataFilesCount(commonOpts)) + assertEquals(0, getLatestDataFilesCount(commonOpts, includeLogFiles = false)) + var dataFilter = GreaterThan(attribute("c5"), literal("90")) + verifyPruningFileCount(commonOpts, dataFilter, 3) + dataFilter = GreaterThan(attribute("c5"), literal("95")) + verifyPruningFileCount(commonOpts, dataFilter, 1) + } + + @ParameterizedTest + @MethodSource(Array("testMetadataColumnStatsIndexParams")) + def testMetadataColumnStatsIndexDeletionWithSQL(testCase: ColumnStatsTestCase): Unit = { + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + ) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", + DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL + ) ++ metadataOpts + setupTable(testCase, metadataOpts, commonOpts, shouldValidate = true) + val lastDf = dfList.last + + lastDf.write.format("org.apache.hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION.key, DELETE_OPERATION_OPT_VAL) + .mode(SaveMode.Append) + 
.save(basePath) + verifyFileIndexAndSQLQueries(commonOpts, isTableDataSameAsAfterSecondInstant = true) + + // Add the last df back and verify the queries + doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/update-input-table-json", + expectedColStatsSourcePath = "", + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append, + shouldValidate = false) + verifyFileIndexAndSQLQueries(commonOpts, verifyFileCount = false) + } + + @ParameterizedTest + @MethodSource(Array("testMetadataColumnStatsIndexParamsForMOR")) + def testMetadataColumnStatsIndexCompactionWithSQL(testCase: ColumnStatsTestCase): Unit = { + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + ) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", + DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, + HoodieCompactionConfig.INLINE_COMPACT.key() -> "true", + HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "1" + ) ++ metadataOpts + setupTable(testCase, metadataOpts, commonOpts, shouldValidate = false) + + assertFalse(hasLogFiles()) + verifyFileIndexAndSQLQueries(commonOpts) + } + + @Disabled("Needs more work") + @ParameterizedTest + @MethodSource(Array("testMetadataColumnStatsIndexParamsForMOR")) + def testMetadataColumnStatsIndexAsyncCompactionWithSQL(testCase: ColumnStatsTestCase): Unit = { + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + 
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + ) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", + DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, + HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "1" Review Comment: This is inline config, shouldn't the corresponding async config be used? Also, I don't see async compaction config being enabled. ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala: ########## @@ -108,7 +110,9 @@ case class MergeOnReadIncrementalRelation(override val sqlContext: SQLContext, }.toSeq } - buildSplits(filterFileSlices(fileSlices, globPattern)) + var filteredFileSlices = filterFileSlices(fileSlices, globPattern) Review Comment: But why is the test failing after revert? Does it fail even after setting `fileIndex.setIncludeLogFiles(false)`? ########## hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala: ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.functional + +import org.apache.hudi.DataSourceWriteOptions.{DELETE_OPERATION_OPT_VAL, PRECOMBINE_FIELD, RECORDKEY_FIELD} +import org.apache.hudi.async.SparkAsyncCompactService +import org.apache.hudi.client.SparkRDDWriteClient +import org.apache.hudi.client.common.HoodieSparkEngineContext +import org.apache.hudi.client.utils.MetadataConversionUtils +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.model.{FileSlice, HoodieCommitMetadata, HoodieTableType, WriteOperationType} +import org.apache.hudi.common.table.HoodieTableConfig +import org.apache.hudi.common.table.timeline.HoodieInstant +import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig} +import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestCase +import org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY +import org.apache.hudi.metadata.HoodieMetadataFileSystemView +import org.apache.hudi.util.{JFunction, JavaConversions} +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex} +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, GreaterThan, Literal} +import org.apache.spark.sql.types.StringType +import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} +import org.junit.jupiter.api.Disabled +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.MethodSource + +import java.util.Properties +import 
scala.collection.JavaConverters + +class TestColumnStatsIndexWithSQL extends ColumnStatIndexTestBase { + + @ParameterizedTest + @MethodSource(Array("testMetadataColumnStatsIndexParams")) + def testMetadataColumnStatsIndexWithSQL(testCase: ColumnStatsTestCase): Unit = { + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + ) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", + DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL + ) ++ metadataOpts + setupTable(testCase, metadataOpts, commonOpts, shouldValidate = true) + verifyFileIndexAndSQLQueries(commonOpts) + } + + @ParameterizedTest + @MethodSource(Array("testMetadataColumnStatsIndexParamsForMOR")) + def testMetadataColumnStatsIndexSQLWithInMemoryIndex(testCase: ColumnStatsTestCase): Unit = { + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + ) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", + DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, + HoodieIndexConfig.INDEX_TYPE.key() -> INMEMORY.name() + ) ++ metadataOpts + + 
doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/input-table-json", + expectedColStatsSourcePath = "index/colstats/column-stats-index-table.json", + operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Overwrite, + shouldValidate = false) + + assertEquals(4, getLatestDataFilesCount(commonOpts)) + assertEquals(0, getLatestDataFilesCount(commonOpts, includeLogFiles = false)) + var dataFilter = GreaterThan(attribute("c5"), literal("90")) + verifyPruningFileCount(commonOpts, dataFilter, 3) + dataFilter = GreaterThan(attribute("c5"), literal("95")) + verifyPruningFileCount(commonOpts, dataFilter, 1) + } + + @ParameterizedTest + @MethodSource(Array("testMetadataColumnStatsIndexParams")) + def testMetadataColumnStatsIndexDeletionWithSQL(testCase: ColumnStatsTestCase): Unit = { + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + ) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", + DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL + ) ++ metadataOpts + setupTable(testCase, metadataOpts, commonOpts, shouldValidate = true) + val lastDf = dfList.last + + lastDf.write.format("org.apache.hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION.key, DELETE_OPERATION_OPT_VAL) + .mode(SaveMode.Append) + .save(basePath) + verifyFileIndexAndSQLQueries(commonOpts, isTableDataSameAsAfterSecondInstant = true) + + // Add the last df back and verify the queries + 
doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/update-input-table-json", + expectedColStatsSourcePath = "", + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append, + shouldValidate = false) + verifyFileIndexAndSQLQueries(commonOpts, verifyFileCount = false) Review Comment: Why is `verifyFileCount` false here? ########## hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala: ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.functional + +import org.apache.hudi.DataSourceWriteOptions.{DELETE_OPERATION_OPT_VAL, PRECOMBINE_FIELD, RECORDKEY_FIELD} +import org.apache.hudi.async.SparkAsyncCompactService +import org.apache.hudi.client.SparkRDDWriteClient +import org.apache.hudi.client.common.HoodieSparkEngineContext +import org.apache.hudi.client.utils.MetadataConversionUtils +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.model.{FileSlice, HoodieCommitMetadata, HoodieTableType, WriteOperationType} +import org.apache.hudi.common.table.HoodieTableConfig +import org.apache.hudi.common.table.timeline.HoodieInstant +import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig} +import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestCase +import org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY +import org.apache.hudi.metadata.HoodieMetadataFileSystemView +import org.apache.hudi.util.{JFunction, JavaConversions} +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex} +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, GreaterThan, Literal} +import org.apache.spark.sql.types.StringType +import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} +import org.junit.jupiter.api.Disabled +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.MethodSource + +import java.util.Properties +import scala.collection.JavaConverters + +class TestColumnStatsIndexWithSQL extends ColumnStatIndexTestBase { + + @ParameterizedTest + @MethodSource(Array("testMetadataColumnStatsIndexParams")) + def testMetadataColumnStatsIndexWithSQL(testCase: ColumnStatsTestCase): Unit = { + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + ) + + val commonOpts = Map( 
+ "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", + DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL + ) ++ metadataOpts + setupTable(testCase, metadataOpts, commonOpts, shouldValidate = true) + verifyFileIndexAndSQLQueries(commonOpts) + } + + @ParameterizedTest + @MethodSource(Array("testMetadataColumnStatsIndexParamsForMOR")) + def testMetadataColumnStatsIndexSQLWithInMemoryIndex(testCase: ColumnStatsTestCase): Unit = { + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + ) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", + DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, + HoodieIndexConfig.INDEX_TYPE.key() -> INMEMORY.name() + ) ++ metadataOpts + + doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/input-table-json", + expectedColStatsSourcePath = "index/colstats/column-stats-index-table.json", + operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Overwrite, + shouldValidate = false) + + assertEquals(4, getLatestDataFilesCount(commonOpts)) + assertEquals(0, getLatestDataFilesCount(commonOpts, 
includeLogFiles = false)) + var dataFilter = GreaterThan(attribute("c5"), literal("90")) + verifyPruningFileCount(commonOpts, dataFilter, 3) + dataFilter = GreaterThan(attribute("c5"), literal("95")) + verifyPruningFileCount(commonOpts, dataFilter, 1) + } + + @ParameterizedTest + @MethodSource(Array("testMetadataColumnStatsIndexParams")) + def testMetadataColumnStatsIndexDeletionWithSQL(testCase: ColumnStatsTestCase): Unit = { + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + ) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", + DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL + ) ++ metadataOpts + setupTable(testCase, metadataOpts, commonOpts, shouldValidate = true) + val lastDf = dfList.last + + lastDf.write.format("org.apache.hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION.key, DELETE_OPERATION_OPT_VAL) + .mode(SaveMode.Append) + .save(basePath) + verifyFileIndexAndSQLQueries(commonOpts, isTableDataSameAsAfterSecondInstant = true) + + // Add the last df back and verify the queries + doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/update-input-table-json", + expectedColStatsSourcePath = "", + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append, + shouldValidate = false) + verifyFileIndexAndSQLQueries(commonOpts, verifyFileCount = false) + } + + @ParameterizedTest + @MethodSource(Array("testMetadataColumnStatsIndexParamsForMOR")) + def 
testMetadataColumnStatsIndexCompactionWithSQL(testCase: ColumnStatsTestCase): Unit = { + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + ) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", + DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, + HoodieCompactionConfig.INLINE_COMPACT.key() -> "true", + HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "1" + ) ++ metadataOpts + setupTable(testCase, metadataOpts, commonOpts, shouldValidate = false) + + assertFalse(hasLogFiles()) + verifyFileIndexAndSQLQueries(commonOpts) + } + + @Disabled("Needs more work") + @ParameterizedTest + @MethodSource(Array("testMetadataColumnStatsIndexParamsForMOR")) + def testMetadataColumnStatsIndexAsyncCompactionWithSQL(testCase: ColumnStatsTestCase): Unit = { + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + ) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", + DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, + HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> 
"1" + ) ++ metadataOpts + setupTable(testCase, metadataOpts, commonOpts, shouldValidate = false) + + val writeClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), getWriteConfig(commonOpts)) + writeClient.scheduleCompaction(org.apache.hudi.common.util.Option.empty()) + val compactionService = new SparkAsyncCompactService(new HoodieSparkEngineContext(jsc), writeClient) + compactionService.enqueuePendingAsyncServiceInstant(metaClient.reloadActiveTimeline().lastInstant().get()) + compactionService.start(JFunction.toJavaFunction(b => true)) + compactionService.waitTillPendingAsyncServiceInstantsReducesTo(0) + assertFalse(hasLogFiles()) + verifyFileIndexAndSQLQueries(commonOpts) + } + + private def setupTable(testCase: ColumnStatsTestCase, metadataOpts: Map[String, String], commonOpts: Map[String, String], shouldValidate: Boolean): Unit = { + doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/input-table-json", + expectedColStatsSourcePath = "index/colstats/column-stats-index-table.json", + operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Overwrite) + + doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/another-input-table-json", + expectedColStatsSourcePath = "index/colstats/updated-column-stats-index-table.json", + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append) + + // NOTE: MOR and COW have different fixtures since MOR is bearing delta-log files (holding + // deferred updates), diverging from COW + val expectedColStatsSourcePath = if (testCase.tableType == HoodieTableType.COPY_ON_WRITE) { + "index/colstats/cow-updated2-column-stats-index-table.json" + } else { + "index/colstats/mor-updated2-column-stats-index-table.json" + } + + doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/update-input-table-json", + 
expectedColStatsSourcePath = expectedColStatsSourcePath, + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append, + shouldValidate) + } + + def verifyFileIndexAndSQLQueries(opts: Map[String, String], isTableDataSameAsAfterSecondInstant: Boolean = false, verifyFileCount: Boolean = true): Unit = { + var commonOpts = opts + val inputDF1 = spark.read.format("hudi") + .options(commonOpts) + .option("as.of.instant", metaClient.getActiveTimeline.getInstants.get(1).getTimestamp) + .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) + .option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key, "false") + .load(basePath) + inputDF1.createOrReplaceTempView("tbl") + val numRecordsWithC5ColumnGreaterThan70 = spark.sql("select * from tbl where c5 > 70").count() + // verify snapshot query + verifySQLQueries(numRecordsWithC5ColumnGreaterThan70, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, commonOpts, isTableDataSameAsAfterSecondInstant) + + // verify read_optimized query + verifySQLQueries(numRecordsWithC5ColumnGreaterThan70, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, commonOpts, isTableDataSameAsAfterSecondInstant) + + // verify incremental query + verifySQLQueries(numRecordsWithC5ColumnGreaterThan70, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, commonOpts, isTableDataSameAsAfterSecondInstant) + commonOpts = commonOpts + (DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key -> "true") + verifySQLQueries(numRecordsWithC5ColumnGreaterThan70, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, commonOpts, isTableDataSameAsAfterSecondInstant) + + if (verifyFileCount) { + // First commit creates 4 parquet files + // Second commit creates 4 parquet files + // Last commit creates one parquet file and 4 log files - total 5 files (for MOR) + // and 4 parquet files and no log file (for COW) + // therefore on deletion all those 5 files create a new 
log file whereas COW file count remains same + var numFiles = if (isTableMOR && isTableDataSameAsAfterSecondInstant) 17 else if (hasLogFiles()) 12 else 8 + var dataFilter = GreaterThan(attribute("c5"), literal("70")) + verifyPruningFileCount(commonOpts, dataFilter, numFiles) + + dataFilter = GreaterThan(attribute("c5"), literal("90")) + numFiles = if (isTableMOR && isTableDataSameAsAfterSecondInstant) 11 else if (hasLogFiles()) 7 else 4 + verifyPruningFileCount(commonOpts, dataFilter, numFiles) + } + } + + private def verifyPruningFileCount(opts: Map[String, String], dataFilter: Expression, numFiles: Int): Unit = { + val fileIndex = HoodieFileIndex(spark, metaClient, None, opts + ("path" -> basePath)) + fileIndex.setIncludeLogFiles(isTableMOR()) + val filteredPartitionDirectories = fileIndex.listFiles(Seq(), Seq(dataFilter)) + val filteredFilesCount = filteredPartitionDirectories.flatMap(s => s.files).size + assertTrue(filteredFilesCount < getLatestDataFilesCount(opts)) + assertEquals(filteredFilesCount, numFiles) + } + + private def getLatestDataFilesCount(opts: Map[String, String], includeLogFiles: Boolean = true) = { + var totalLatestDataFiles = 0L + getTableFileSystenView(opts).getAllLatestFileSlicesBeforeOrOn(metaClient.getActiveTimeline.lastInstant().get().getTimestamp) + .values() + .forEach(JFunction.toJavaConsumer[java.util.stream.Stream[FileSlice]] + (slices => slices.forEach(JFunction.toJavaConsumer[FileSlice]( + slice => totalLatestDataFiles += (if (includeLogFiles) slice.getLogFiles.count() else 0) + + (if (slice.getBaseFile.isPresent) 1 else 0))))) + totalLatestDataFiles + } + + private def getTableFileSystenView(opts: Map[String, String]): HoodieMetadataFileSystemView = { + new HoodieMetadataFileSystemView(metaClient, metaClient.getActiveTimeline, metadataWriter(getWriteConfig(opts)).getTableMetadata) + } + + protected def getWriteConfig(hudiOpts: Map[String, String]): HoodieWriteConfig = { + val props = new Properties() + 
props.putAll(JavaConverters.mapAsJavaMapConverter(hudiOpts).asJava) + HoodieWriteConfig.newBuilder() + .withProps(props) + .withPath(basePath) + .build() + } + + private def attribute(partition: String): AttributeReference = { + AttributeReference(partition, StringType, true)() + } + + private def literal(value: String): Literal = { + Literal.create(value) + } + + private def verifySQLQueries(numRecordsWithC5ColumnGreaterThan70AtPrevInstant: Long, queryType: String, opts: Map[String, String], isLastOperationDelete: Boolean): Unit = { Review Comment: please rename the variable `numRecordsWithC5ColumnGreaterThan70AtPrevInstant`. It should not embed the exact column name and predicate value. If needed, introduce two separate variables for the column name and the predicate literal. ########## hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala: ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hudi.functional + +import org.apache.hudi.DataSourceWriteOptions.{DELETE_OPERATION_OPT_VAL, PRECOMBINE_FIELD, RECORDKEY_FIELD} +import org.apache.hudi.async.SparkAsyncCompactService +import org.apache.hudi.client.SparkRDDWriteClient +import org.apache.hudi.client.common.HoodieSparkEngineContext +import org.apache.hudi.client.utils.MetadataConversionUtils +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.model.{FileSlice, HoodieCommitMetadata, HoodieTableType, WriteOperationType} +import org.apache.hudi.common.table.HoodieTableConfig +import org.apache.hudi.common.table.timeline.HoodieInstant +import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig} +import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestCase +import org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY +import org.apache.hudi.metadata.HoodieMetadataFileSystemView +import org.apache.hudi.util.{JFunction, JavaConversions} +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex} +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, GreaterThan, Literal} +import org.apache.spark.sql.types.StringType +import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} +import org.junit.jupiter.api.Disabled +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.MethodSource + +import java.util.Properties +import scala.collection.JavaConverters + +class TestColumnStatsIndexWithSQL extends ColumnStatIndexTestBase { + + @ParameterizedTest + @MethodSource(Array("testMetadataColumnStatsIndexParams")) + def testMetadataColumnStatsIndexWithSQL(testCase: ColumnStatsTestCase): Unit = { + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + ) + + val commonOpts = Map( 
+ "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", + DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL + ) ++ metadataOpts + setupTable(testCase, metadataOpts, commonOpts, shouldValidate = true) + verifyFileIndexAndSQLQueries(commonOpts) + } + + @ParameterizedTest + @MethodSource(Array("testMetadataColumnStatsIndexParamsForMOR")) + def testMetadataColumnStatsIndexSQLWithInMemoryIndex(testCase: ColumnStatsTestCase): Unit = { + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + ) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", + DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, + HoodieIndexConfig.INDEX_TYPE.key() -> INMEMORY.name() + ) ++ metadataOpts + + doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/input-table-json", + expectedColStatsSourcePath = "index/colstats/column-stats-index-table.json", + operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Overwrite, + shouldValidate = false) + + assertEquals(4, getLatestDataFilesCount(commonOpts)) + assertEquals(0, getLatestDataFilesCount(commonOpts, 
includeLogFiles = false)) + var dataFilter = GreaterThan(attribute("c5"), literal("90")) + verifyPruningFileCount(commonOpts, dataFilter, 3) + dataFilter = GreaterThan(attribute("c5"), literal("95")) + verifyPruningFileCount(commonOpts, dataFilter, 1) + } + + @ParameterizedTest + @MethodSource(Array("testMetadataColumnStatsIndexParams")) + def testMetadataColumnStatsIndexDeletionWithSQL(testCase: ColumnStatsTestCase): Unit = { + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + ) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", + DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL + ) ++ metadataOpts + setupTable(testCase, metadataOpts, commonOpts, shouldValidate = true) + val lastDf = dfList.last + + lastDf.write.format("org.apache.hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION.key, DELETE_OPERATION_OPT_VAL) + .mode(SaveMode.Append) + .save(basePath) + verifyFileIndexAndSQLQueries(commonOpts, isTableDataSameAsAfterSecondInstant = true) + + // Add the last df back and verify the queries + doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/update-input-table-json", + expectedColStatsSourcePath = "", + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append, + shouldValidate = false) + verifyFileIndexAndSQLQueries(commonOpts, verifyFileCount = false) + } + + @ParameterizedTest + @MethodSource(Array("testMetadataColumnStatsIndexParamsForMOR")) + def 
testMetadataColumnStatsIndexCompactionWithSQL(testCase: ColumnStatsTestCase): Unit = { + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + ) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", + DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, + HoodieCompactionConfig.INLINE_COMPACT.key() -> "true", + HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "1" + ) ++ metadataOpts + setupTable(testCase, metadataOpts, commonOpts, shouldValidate = false) + + assertFalse(hasLogFiles()) + verifyFileIndexAndSQLQueries(commonOpts) + } + + @Disabled("Needs more work") + @ParameterizedTest + @MethodSource(Array("testMetadataColumnStatsIndexParamsForMOR")) + def testMetadataColumnStatsIndexAsyncCompactionWithSQL(testCase: ColumnStatsTestCase): Unit = { + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + ) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", + DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, + HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> 
"1" + ) ++ metadataOpts + setupTable(testCase, metadataOpts, commonOpts, shouldValidate = false) + + val writeClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), getWriteConfig(commonOpts)) + writeClient.scheduleCompaction(org.apache.hudi.common.util.Option.empty()) + val compactionService = new SparkAsyncCompactService(new HoodieSparkEngineContext(jsc), writeClient) + compactionService.enqueuePendingAsyncServiceInstant(metaClient.reloadActiveTimeline().lastInstant().get()) + compactionService.start(JFunction.toJavaFunction(b => true)) + compactionService.waitTillPendingAsyncServiceInstantsReducesTo(0) + assertFalse(hasLogFiles()) + verifyFileIndexAndSQLQueries(commonOpts) + } + + private def setupTable(testCase: ColumnStatsTestCase, metadataOpts: Map[String, String], commonOpts: Map[String, String], shouldValidate: Boolean): Unit = { + doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/input-table-json", + expectedColStatsSourcePath = "index/colstats/column-stats-index-table.json", + operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Overwrite) + + doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/another-input-table-json", + expectedColStatsSourcePath = "index/colstats/updated-column-stats-index-table.json", + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append) + + // NOTE: MOR and COW have different fixtures since MOR is bearing delta-log files (holding + // deferred updates), diverging from COW + val expectedColStatsSourcePath = if (testCase.tableType == HoodieTableType.COPY_ON_WRITE) { + "index/colstats/cow-updated2-column-stats-index-table.json" + } else { + "index/colstats/mor-updated2-column-stats-index-table.json" + } + + doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/update-input-table-json", + 
expectedColStatsSourcePath = expectedColStatsSourcePath, + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append, + shouldValidate) + } + + def verifyFileIndexAndSQLQueries(opts: Map[String, String], isTableDataSameAsAfterSecondInstant: Boolean = false, verifyFileCount: Boolean = true): Unit = { + var commonOpts = opts + val inputDF1 = spark.read.format("hudi") + .options(commonOpts) + .option("as.of.instant", metaClient.getActiveTimeline.getInstants.get(1).getTimestamp) + .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) + .option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key, "false") + .load(basePath) + inputDF1.createOrReplaceTempView("tbl") + val numRecordsWithC5ColumnGreaterThan70 = spark.sql("select * from tbl where c5 > 70").count() + // verify snapshot query + verifySQLQueries(numRecordsWithC5ColumnGreaterThan70, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, commonOpts, isTableDataSameAsAfterSecondInstant) + + // verify read_optimized query + verifySQLQueries(numRecordsWithC5ColumnGreaterThan70, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, commonOpts, isTableDataSameAsAfterSecondInstant) + + // verify incremental query + verifySQLQueries(numRecordsWithC5ColumnGreaterThan70, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, commonOpts, isTableDataSameAsAfterSecondInstant) + commonOpts = commonOpts + (DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key -> "true") + verifySQLQueries(numRecordsWithC5ColumnGreaterThan70, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, commonOpts, isTableDataSameAsAfterSecondInstant) + + if (verifyFileCount) { + // First commit creates 4 parquet files + // Second commit creates 4 parquet files + // Last commit creates one parquet file and 4 log files - total 5 files (for MOR) + // and 4 parquet files and no log file (for COW) + // therefore on deletion all those 5 files create a new 
log file whereas COW file count remains same + var numFiles = if (isTableMOR && isTableDataSameAsAfterSecondInstant) 17 else if (hasLogFiles()) 12 else 8 + var dataFilter = GreaterThan(attribute("c5"), literal("70")) + verifyPruningFileCount(commonOpts, dataFilter, numFiles) + + dataFilter = GreaterThan(attribute("c5"), literal("90")) + numFiles = if (isTableMOR && isTableDataSameAsAfterSecondInstant) 11 else if (hasLogFiles()) 7 else 4 + verifyPruningFileCount(commonOpts, dataFilter, numFiles) + } + } + + private def verifyPruningFileCount(opts: Map[String, String], dataFilter: Expression, numFiles: Int): Unit = { + val fileIndex = HoodieFileIndex(spark, metaClient, None, opts + ("path" -> basePath)) + fileIndex.setIncludeLogFiles(isTableMOR()) + val filteredPartitionDirectories = fileIndex.listFiles(Seq(), Seq(dataFilter)) + val filteredFilesCount = filteredPartitionDirectories.flatMap(s => s.files).size + assertTrue(filteredFilesCount < getLatestDataFilesCount(opts)) + assertEquals(filteredFilesCount, numFiles) + } + + private def getLatestDataFilesCount(opts: Map[String, String], includeLogFiles: Boolean = true) = { + var totalLatestDataFiles = 0L + getTableFileSystenView(opts).getAllLatestFileSlicesBeforeOrOn(metaClient.getActiveTimeline.lastInstant().get().getTimestamp) + .values() + .forEach(JFunction.toJavaConsumer[java.util.stream.Stream[FileSlice]] + (slices => slices.forEach(JFunction.toJavaConsumer[FileSlice]( + slice => totalLatestDataFiles += (if (includeLogFiles) slice.getLogFiles.count() else 0) + + (if (slice.getBaseFile.isPresent) 1 else 0))))) + totalLatestDataFiles + } + + private def getTableFileSystenView(opts: Map[String, String]): HoodieMetadataFileSystemView = { Review Comment: ```suggestion private def getTableFileSystemView(opts: Map[String, String]): HoodieMetadataFileSystemView = { ``` ########## hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala: ########## @@ -0,0 +1,379 @@ +/* 
+ * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.functional + +import org.apache.hudi.DataSourceWriteOptions.{DELETE_OPERATION_OPT_VAL, PRECOMBINE_FIELD, RECORDKEY_FIELD} +import org.apache.hudi.async.SparkAsyncCompactService +import org.apache.hudi.client.SparkRDDWriteClient +import org.apache.hudi.client.common.HoodieSparkEngineContext +import org.apache.hudi.client.utils.MetadataConversionUtils +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.model.{FileSlice, HoodieCommitMetadata, HoodieTableType, WriteOperationType} +import org.apache.hudi.common.table.HoodieTableConfig +import org.apache.hudi.common.table.timeline.HoodieInstant +import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig} +import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestCase +import org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY +import org.apache.hudi.metadata.HoodieMetadataFileSystemView +import org.apache.hudi.util.{JFunction, JavaConversions} +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex} +import org.apache.spark.sql._ +import 
org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, GreaterThan, Literal} +import org.apache.spark.sql.types.StringType +import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} +import org.junit.jupiter.api.Disabled +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.MethodSource + +import java.util.Properties +import scala.collection.JavaConverters + +class TestColumnStatsIndexWithSQL extends ColumnStatIndexTestBase { + + @ParameterizedTest + @MethodSource(Array("testMetadataColumnStatsIndexParams")) + def testMetadataColumnStatsIndexWithSQL(testCase: ColumnStatsTestCase): Unit = { + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + ) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", + DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL + ) ++ metadataOpts + setupTable(testCase, metadataOpts, commonOpts, shouldValidate = true) + verifyFileIndexAndSQLQueries(commonOpts) + } + + @ParameterizedTest + @MethodSource(Array("testMetadataColumnStatsIndexParamsForMOR")) + def testMetadataColumnStatsIndexSQLWithInMemoryIndex(testCase: ColumnStatsTestCase): Unit = { + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + ) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + 
DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", + DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, + HoodieIndexConfig.INDEX_TYPE.key() -> INMEMORY.name() + ) ++ metadataOpts + + doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/input-table-json", + expectedColStatsSourcePath = "index/colstats/column-stats-index-table.json", + operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Overwrite, + shouldValidate = false) + + assertEquals(4, getLatestDataFilesCount(commonOpts)) + assertEquals(0, getLatestDataFilesCount(commonOpts, includeLogFiles = false)) + var dataFilter = GreaterThan(attribute("c5"), literal("90")) + verifyPruningFileCount(commonOpts, dataFilter, 3) + dataFilter = GreaterThan(attribute("c5"), literal("95")) + verifyPruningFileCount(commonOpts, dataFilter, 1) + } + + @ParameterizedTest + @MethodSource(Array("testMetadataColumnStatsIndexParams")) + def testMetadataColumnStatsIndexDeletionWithSQL(testCase: ColumnStatsTestCase): Unit = { + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + ) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", + DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL + ) ++ metadataOpts + setupTable(testCase, 
metadataOpts, commonOpts, shouldValidate = true) + val lastDf = dfList.last + + lastDf.write.format("org.apache.hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION.key, DELETE_OPERATION_OPT_VAL) + .mode(SaveMode.Append) + .save(basePath) + verifyFileIndexAndSQLQueries(commonOpts, isTableDataSameAsAfterSecondInstant = true) + + // Add the last df back and verify the queries + doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/update-input-table-json", + expectedColStatsSourcePath = "", + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append, + shouldValidate = false) + verifyFileIndexAndSQLQueries(commonOpts, verifyFileCount = false) + } + + @ParameterizedTest + @MethodSource(Array("testMetadataColumnStatsIndexParamsForMOR")) + def testMetadataColumnStatsIndexCompactionWithSQL(testCase: ColumnStatsTestCase): Unit = { + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + ) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", + DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, + HoodieCompactionConfig.INLINE_COMPACT.key() -> "true", + HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "1" + ) ++ metadataOpts + setupTable(testCase, metadataOpts, commonOpts, shouldValidate = false) + + assertFalse(hasLogFiles()) + verifyFileIndexAndSQLQueries(commonOpts) + } + + @Disabled("Needs more work") + @ParameterizedTest + 
@MethodSource(Array("testMetadataColumnStatsIndexParamsForMOR")) + def testMetadataColumnStatsIndexAsyncCompactionWithSQL(testCase: ColumnStatsTestCase): Unit = { + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + ) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", + DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, + HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "1" + ) ++ metadataOpts + setupTable(testCase, metadataOpts, commonOpts, shouldValidate = false) + + val writeClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), getWriteConfig(commonOpts)) + writeClient.scheduleCompaction(org.apache.hudi.common.util.Option.empty()) + val compactionService = new SparkAsyncCompactService(new HoodieSparkEngineContext(jsc), writeClient) + compactionService.enqueuePendingAsyncServiceInstant(metaClient.reloadActiveTimeline().lastInstant().get()) + compactionService.start(JFunction.toJavaFunction(b => true)) + compactionService.waitTillPendingAsyncServiceInstantsReducesTo(0) + assertFalse(hasLogFiles()) + verifyFileIndexAndSQLQueries(commonOpts) + } + + private def setupTable(testCase: ColumnStatsTestCase, metadataOpts: Map[String, String], commonOpts: Map[String, String], shouldValidate: Boolean): Unit = { + doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/input-table-json", + expectedColStatsSourcePath = "index/colstats/column-stats-index-table.json", + operation = 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Overwrite) + + doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/another-input-table-json", + expectedColStatsSourcePath = "index/colstats/updated-column-stats-index-table.json", + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append) + + // NOTE: MOR and COW have different fixtures since MOR is bearing delta-log files (holding + // deferred updates), diverging from COW + val expectedColStatsSourcePath = if (testCase.tableType == HoodieTableType.COPY_ON_WRITE) { + "index/colstats/cow-updated2-column-stats-index-table.json" + } else { + "index/colstats/mor-updated2-column-stats-index-table.json" + } + + doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/update-input-table-json", + expectedColStatsSourcePath = expectedColStatsSourcePath, + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append, + shouldValidate) + } + + def verifyFileIndexAndSQLQueries(opts: Map[String, String], isTableDataSameAsAfterSecondInstant: Boolean = false, verifyFileCount: Boolean = true): Unit = { + var commonOpts = opts + val inputDF1 = spark.read.format("hudi") + .options(commonOpts) + .option("as.of.instant", metaClient.getActiveTimeline.getInstants.get(1).getTimestamp) + .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) + .option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key, "false") + .load(basePath) + inputDF1.createOrReplaceTempView("tbl") + val numRecordsWithC5ColumnGreaterThan70 = spark.sql("select * from tbl where c5 > 70").count() + // verify snapshot query + verifySQLQueries(numRecordsWithC5ColumnGreaterThan70, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, commonOpts, isTableDataSameAsAfterSecondInstant) + + // verify read_optimized query + 
verifySQLQueries(numRecordsWithC5ColumnGreaterThan70, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, commonOpts, isTableDataSameAsAfterSecondInstant) + + // verify incremental query + verifySQLQueries(numRecordsWithC5ColumnGreaterThan70, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, commonOpts, isTableDataSameAsAfterSecondInstant) + commonOpts = commonOpts + (DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key -> "true") + verifySQLQueries(numRecordsWithC5ColumnGreaterThan70, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, commonOpts, isTableDataSameAsAfterSecondInstant) + + if (verifyFileCount) { + // First commit creates 4 parquet files + // Second commit creates 4 parquet files + // Last commit creates one parquet file and 4 log files - total 5 files (for MOR) + // and 4 parquet files and no log file (for COW) + // therefore on deletion all those 5 files create a new log file whereas COW file count remains same + var numFiles = if (isTableMOR && isTableDataSameAsAfterSecondInstant) 17 else if (hasLogFiles()) 12 else 8 + var dataFilter = GreaterThan(attribute("c5"), literal("70")) + verifyPruningFileCount(commonOpts, dataFilter, numFiles) + + dataFilter = GreaterThan(attribute("c5"), literal("90")) + numFiles = if (isTableMOR && isTableDataSameAsAfterSecondInstant) 11 else if (hasLogFiles()) 7 else 4 + verifyPruningFileCount(commonOpts, dataFilter, numFiles) + } + } + + private def verifyPruningFileCount(opts: Map[String, String], dataFilter: Expression, numFiles: Int): Unit = { + val fileIndex = HoodieFileIndex(spark, metaClient, None, opts + ("path" -> basePath)) + fileIndex.setIncludeLogFiles(isTableMOR()) + val filteredPartitionDirectories = fileIndex.listFiles(Seq(), Seq(dataFilter)) + val filteredFilesCount = filteredPartitionDirectories.flatMap(s => s.files).size + assertTrue(filteredFilesCount < getLatestDataFilesCount(opts)) + assertEquals(filteredFilesCount, numFiles) + } + + private def 
getLatestDataFilesCount(opts: Map[String, String], includeLogFiles: Boolean = true) = { + var totalLatestDataFiles = 0L + getTableFileSystemView(opts).getAllLatestFileSlicesBeforeOrOn(metaClient.getActiveTimeline.lastInstant().get().getTimestamp) + .values() + .forEach(JFunction.toJavaConsumer[java.util.stream.Stream[FileSlice]] + (slices => slices.forEach(JFunction.toJavaConsumer[FileSlice]( + slice => totalLatestDataFiles += (if (includeLogFiles) slice.getLogFiles.count() else 0) + + (if (slice.getBaseFile.isPresent) 1 else 0))))) + totalLatestDataFiles + } + + private def getTableFileSystemView(opts: Map[String, String]): HoodieMetadataFileSystemView = { + new HoodieMetadataFileSystemView(metaClient, metaClient.getActiveTimeline, metadataWriter(getWriteConfig(opts)).getTableMetadata) + } + + protected def getWriteConfig(hudiOpts: Map[String, String]): HoodieWriteConfig = { + val props = new Properties() + props.putAll(JavaConverters.mapAsJavaMapConverter(hudiOpts).asJava) + HoodieWriteConfig.newBuilder() + .withProps(props) + .withPath(basePath) + .build() + } + + private def attribute(partition: String): AttributeReference = { + AttributeReference(partition, StringType, true)() + } + + private def literal(value: String): Literal = { + Literal.create(value) + } + + private def verifySQLQueries(numRecordsWithC5ColumnGreaterThan70AtPrevInstant: Long, queryType: String, opts: Map[String, String], isLastOperationDelete: Boolean): Unit = { + // 2 records are updated with c5 greater than 70 and one record is inserted with c5 value greater than 70 + var commonOpts: Map[String, String] = opts + createSQLTable(commonOpts, queryType) + val increment = if (queryType.equals(DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) && hasLogFiles()) { + 1 // only one insert + } else if (isLastOperationDelete) { + 0 // no increment + } else { + 3 // one insert and two upserts + } + assertEquals(spark.sql("select * from tbl where c5 > 70").count(), 
numRecordsWithC5ColumnGreaterThan70AtPrevInstant + increment) + val numRecordsWithC5ColumnGreaterThan70WithDataSkipping = spark.sql("select * from tbl where c5 > 70").count() + + if (queryType.equals(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)) { + createIncrementalSQLTable(commonOpts, metaClient.reloadActiveTimeline().getInstants.get(1).getTimestamp) + assertEquals(spark.sql("select * from tbl where c5 > 70").count(), if (isLastOperationDelete) 0 else 3) Review Comment: Can we also have one validation with two columns in the data filter? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
