vinothchandar commented on code in PR #10352:
URL: https://github.com/apache/hudi/pull/10352#discussion_r1444903624
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.HoodieConversionUtils.toScalaOption
+import org.apache.hudi.avro.model.{HoodieMetadataColumnStats, HoodieMetadataRecord}
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.data.HoodieData
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.common.util.hash.ColumnIndexID
+import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata, HoodieTableMetadataUtil}
+import org.apache.hudi.util.JFunction
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+class PartitionStatsIndexSupport(spark: SparkSession,
+                                 tableSchema: StructType,
+                                 @transient metadataConfig: HoodieMetadataConfig,
+                                 @transient metaClient: HoodieTableMetaClient,
+                                 allowCaching: Boolean = false)
+  extends ColumnStatsIndexSupport(spark, tableSchema, metadataConfig, metaClient, allowCaching) {
+
+  @transient private lazy val engineCtx = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
+  @transient private lazy val metadataTable: HoodieTableMetadata =
+    HoodieTableMetadata.create(engineCtx, metadataConfig, metaClient.getBasePathV2.toString)
+
+  override def isIndexAvailable: Boolean = {
+    checkState(metadataConfig.enabled, "Metadata Table support has to be enabled")
+    metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS)
+  }
+
+  override def loadColumnStatsIndexRecords(targetColumns: Seq[String], shouldReadInMemory: Boolean): HoodieData[HoodieMetadataColumnStats] = {
+    checkState(targetColumns.nonEmpty)
+    val encodedTargetColumnNames = targetColumns.map(colName => new ColumnIndexID(colName).asBase64EncodedString())
+    val metadataRecords: HoodieData[HoodieRecord[HoodieMetadataPayload]] =
+      metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames.asJava, HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS, shouldReadInMemory)
+    val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] =
+      // NOTE: Explicit conversion is required for Scala 2.11

Review Comment:
   side note: I think we can drop support for 2.11?
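   To make the side note concrete, here is a minimal, self-contained illustration of what 2.11 forces on this code path (names below are stand-ins, not the Hudi API):

   ```scala
   // A Java-style functional interface, like the ones HoodieData.map() takes:
   trait SerializableFunction[I, O] extends Serializable { def apply(i: I): O }

   def mapOne[I, O](f: SerializableFunction[I, O], i: I): O = f.apply(i)

   // Scala 2.12+ SAM conversion lets a plain lambda satisfy the interface:
   mapOne((s: String) => s.length, "hudi")

   // Scala 2.11 needs an explicit adapter, which is what the JFunction
   // helper imported above provides for the real code:
   def toJava[I, O](f: I => O): SerializableFunction[I, O] =
     new SerializableFunction[I, O] { override def apply(i: I): O = f(i) }
   mapOne(toJava((s: String) => s.length), "hudi")
   ```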
##########
hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java:
##########
@@ -454,6 +454,51 @@ private static BigDecimal extractDecimal(Object val, DecimalMetadata decimalMeta
     }
   }
 
+  /**
+   * Aggregate column range statistics across files in a partition.
+   *
+   * @param fileRanges List of column range statistics for each file in a partition
+   */
+  public <T extends Comparable<T>> HoodieColumnRangeMetadata<T> getColumnRangeInPartition(@Nonnull List<HoodieColumnRangeMetadata<T>> fileRanges) {

Review Comment:
   isn't this leaking upper-level context (files and ranges) into ParquetUtils? This class ought to be only about reading various things out of Parquet files, i.e. the actual columnar file format. Please relocate and add unit tests; a sketch of the merge semantics follows.
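   For reference, a minimal sketch of the merge semantics such a helper needs, and what a relocated unit test would pin down (illustrative types, not the actual HoodieColumnRangeMetadata API):

   ```scala
   // Per-file column range stats, reduced to one partition-level range.
   case class ColumnRange[T](columnName: String, minValue: T, maxValue: T,
                             nullCount: Long, valueCount: Long)

   def getColumnRangeInPartition[T](fileRanges: Seq[ColumnRange[T]])
                                   (implicit ord: Ordering[T]): ColumnRange[T] = {
     require(fileRanges.nonEmpty, "expect stats from at least one file")
     fileRanges.reduce { (a, b) =>
       ColumnRange(a.columnName,
         ord.min(a.minValue, b.minValue), // partition min = min of file-level mins
         ord.max(a.maxValue, b.maxValue), // partition max = max of file-level maxes
         a.nullCount + b.nullCount,       // counts are additive across files
         a.valueCount + b.valueCount)
     }
   }
   ```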
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/PartitionStatsIndexTestBase.scala:
##########
@@ -0,0 +1,259 @@
+package org.apache.hudi.functional
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.PartitionStatsIndexSupport
+import org.apache.hudi.client.SparkRDDWriteClient
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
+import org.apache.hudi.common.model.{ActionType, HoodieCommitMetadata, WriteOperationType}
+import org.apache.hudi.common.table.timeline.{HoodieInstant, MetadataConversionUtils}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.metadata.HoodieBackedTableMetadata
+import org.apache.hudi.testutils.HoodieSparkClientTestBase
+import org.apache.hudi.util.JavaConversions
+import org.apache.spark.sql.functions.{col, not}
+import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.{AfterEach, BeforeEach}
+
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.stream.Collectors
+import scala.collection.JavaConverters._
+import scala.collection.{JavaConverters, mutable}
+
+/**
+ * Common test setup and validation methods for partition stats index testing.
+ */
+class PartitionStatsIndexTestBase extends HoodieSparkClientTestBase {
+
+  var spark: SparkSession = _
+  var instantTime: AtomicInteger = _
+  val metadataOpts: Map[String, String] = Map(
+    HoodieMetadataConfig.ENABLE.key -> "true",
+    HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key -> "true"
+  )
+  val commonOpts: Map[String, String] = Map(
+    "hoodie.insert.shuffle.parallelism" -> "4",
+    "hoodie.upsert.shuffle.parallelism" -> "4",
+    HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+    RECORDKEY_FIELD.key -> "_row_key",
+    PARTITIONPATH_FIELD.key -> "partition",

Review Comment:
   let's change this test to use multiple partition fields, so we get a multi-level /x/y/z-like partition structure; one possible shape follows.
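   Something like the following (illustrative field names; assumes the generated test data carries nested partition columns and uses ComplexKeyGenerator, which accepts comma-separated partition fields):

   ```scala
   import org.apache.hudi.DataSourceWriteOptions.{KEYGENERATOR_CLASS_NAME, PARTITIONPATH_FIELD}
   import org.apache.hudi.keygen.ComplexKeyGenerator

   // Three partition fields produce a multi-level /country/state/city layout.
   val multiLevelPartitionOpts: Map[String, String] = Map(
     PARTITIONPATH_FIELD.key -> "country,state,city",
     KEYGENERATOR_CLASS_NAME.key -> classOf[ComplexKeyGenerator].getName
   )
   ```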
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/PartitionStatsIndexTestBase.scala:
##########
@@ -0,0 +1,259 @@
+    PRECOMBINE_FIELD.key -> "timestamp",
+    HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
+  ) ++ metadataOpts
+  var mergedDfList: List[DataFrame] = List.empty
+
+  @BeforeEach
+  override def setUp(): Unit = {
+    initPath()
+    initSparkContexts()
+    initFileSystem()
+    initTestDataGenerator()
+
+    setTableName("hoodie_test")
+    initMetaClient()
+
+    instantTime = new AtomicInteger(1)
+
+    spark = sqlContext.sparkSession
+  }
+
+  @AfterEach
+  override def tearDown(): Unit = {
+    cleanupResources()
+  }
+
+  protected def getLatestMetaClient(enforce: Boolean): HoodieTableMetaClient = {
+    val lastInstant = String.format("%03d", new Integer(instantTime.incrementAndGet()))
+    if (enforce || metaClient.getActiveTimeline.lastInstant().get().getTimestamp.compareTo(lastInstant) < 0) {
+      println("Reloaded timeline")
+      metaClient.reloadActiveTimeline()
+      metaClient
+    }
+    metaClient
+  }
+
+  protected def rollbackLastInstant(hudiOpts: Map[String, String]): HoodieInstant = {

Review Comment:
   isn't there some test util class we can reuse for rolling back the last instant? General comment: can we cut down the amount of additional test code and reuse existing helpers, or do we need specific things here?


##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1901,4 +1909,162 @@ private static Path filePath(String basePath, String partition, String filename)
     return new Path(basePath, partition + Path.SEPARATOR + filename);
   }
 }
+
+  public static HoodieData<HoodieRecord> convertFilesToPartitionStatsRecords(HoodieEngineContext engineContext,
+                                                                             List<DirectoryInfo> partitionInfoList,
+                                                                             MetadataRecordsGenerationParams recordsGenerationParams) {
+    // Find the columns to index
+    HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient();
+    final List<String> columnsToIndex = getColumnsToIndex(
+        recordsGenerationParams,
+        Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient)));
+    if (columnsToIndex.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+    LOG.info(String.format("Indexing %d columns for partition stats index", columnsToIndex.size()));

Review Comment:
   info? should this be at debug level instead?


##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala:
##########
@@ -0,0 +1,167 @@
+package org.apache.hudi.functional
+
+import org.apache.hudi.DataSourceWriteOptions.PARTITIONPATH_FIELD
+import org.apache.hudi.common.model.{FileSlice, HoodieTableType}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.metadata.HoodieMetadataFileSystemView
+import org.apache.hudi.util.JFunction
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex}
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal}
+import org.apache.spark.sql.types.StringType
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.{Tag, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.EnumSource
+
+/**
+ * Test cases on partition stats index with Spark datasource and Spark sql.
+ */
+@Tag("functional")
+class TestPartitionStatsIndex extends PartitionStatsIndexTestBase {
+
+  val sqlTempTable = "hudi_tbl"
+
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testIndexInitialization(tableType: HoodieTableType): Unit = {
+    val hudiOpts = commonOpts + (DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name())
+    doWriteAndValidateDataAndPartitionStats(
+      hudiOpts,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite)
+  }
+
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testIndexWithUpsert(tableType: HoodieTableType): Unit = {
+    val hudiOpts = commonOpts + (DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name())
+    doWriteAndValidateDataAndPartitionStats(hudiOpts,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite)
+    doWriteAndValidateDataAndPartitionStats(
+      hudiOpts,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append)
+  }
+
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testIndexWithUpsertNonPartitioned(tableType: HoodieTableType): Unit = {
+    val hudiOpts = commonOpts - PARTITIONPATH_FIELD.key + (DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name())
+    doWriteAndValidateDataAndPartitionStats(
+      hudiOpts,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite)
+    doWriteAndValidateDataAndPartitionStats(
+      hudiOpts,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append)
+  }
+
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testIndexUpsertAndRollback(tableType: HoodieTableType): Unit = {
+    val hudiOpts = commonOpts + (DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name())
+    doWriteAndValidateDataAndPartitionStats(
+      hudiOpts,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite)
+    doWriteAndValidateDataAndPartitionStats(
+      hudiOpts,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append)
+    doWriteAndValidateDataAndPartitionStats(
+      hudiOpts,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append)
+    rollbackLastInstant(hudiOpts)
+    validateDataAndPartitionStats()
+  }
+
+  @Test
+  def testPartitionStatsIndexWithSQL(): Unit = {

Review Comment:
   let's cover CTAS, followed by INSERT, UPDATE, MERGE, and DELETE statements; roughly the coverage sketched below.
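   A hedged sketch of that coverage (table and column names are placeholders, not existing test fixtures):

   ```scala
   // CTAS with a partition column, then each DML statement the index must track.
   spark.sql(
     """create table hudi_pstats using hudi partitioned by (partition)
       |tblproperties (primaryKey = '_row_key', preCombineField = 'timestamp')
       |as select * from source_tbl""".stripMargin)
   spark.sql("insert into hudi_pstats select * from source_tbl_increment")
   spark.sql("update hudi_pstats set fare = fare * 1.1 where partition = 'p1'")
   spark.sql(
     """merge into hudi_pstats t using source_tbl_updates s
       |on t._row_key = s._row_key
       |when matched then update set *
       |when not matched then insert *""".stripMargin)
   spark.sql("delete from hudi_pstats where partition = 'p2'")
   ```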
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/PartitionStatsIndexTestBase.scala:
##########
@@ -0,0 +1,259 @@
+  protected def rollbackLastInstant(hudiOpts: Map[String, String]): HoodieInstant = {
+    val lastInstant = getLatestMetaClient(false).getActiveTimeline
+      .filter(JavaConversions.getPredicate(instant => instant.getAction != ActionType.rollback.name()))
+      .lastInstant().get()
+    if (getLatestCompactionInstant != getLatestMetaClient(false).getActiveTimeline.lastInstant()
+      && lastInstant.getAction != ActionType.replacecommit.name()
+      && lastInstant.getAction != ActionType.clean.name()) {
+      mergedDfList = mergedDfList.take(mergedDfList.size - 1)
+    }
+    val writeConfig = getWriteConfig(hudiOpts)
+    new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), writeConfig)
+      .rollback(lastInstant.getTimestamp)
+
+    if (lastInstant.getAction != ActionType.clean.name()) {
+      assertEquals(ActionType.rollback.name(), getLatestMetaClient(true).getActiveTimeline.lastInstant().get().getAction)
+    }
+    lastInstant
+  }
+
+  protected def executeFunctionNTimes[T](function0: Function0[T], n: Int): Unit = {
+    for (_ <- 1 to n) {
+      function0.apply()
+    }
+  }
+
+  protected def deleteLastCompletedCommitFromDataAndMetadataTimeline(hudiOpts: Map[String, String]): Unit = {
+    val writeConfig = getWriteConfig(hudiOpts)
+    val lastInstant = getHoodieTable(metaClient, writeConfig).getCompletedCommitsTimeline.lastInstant().get()
+    val metadataTableMetaClient = getHoodieTable(metaClient, writeConfig).getMetadataTable.asInstanceOf[HoodieBackedTableMetadata].getMetadataMetaClient
+    val metadataTableLastInstant = metadataTableMetaClient.getCommitsTimeline.lastInstant().get()
+    assertTrue(fs.delete(new Path(metaClient.getMetaPath, lastInstant.getFileName), false))
+    assertTrue(fs.delete(new Path(metadataTableMetaClient.getMetaPath, metadataTableLastInstant.getFileName), false))
+    mergedDfList = mergedDfList.take(mergedDfList.size - 1)
+  }
+
+  protected def deleteLastCompletedCommitFromTimeline(hudiOpts: Map[String, String]): Unit = {
+    val writeConfig = getWriteConfig(hudiOpts)
+    val lastInstant = getHoodieTable(metaClient, writeConfig).getCompletedCommitsTimeline.lastInstant().get()
+    assertTrue(fs.delete(new Path(metaClient.getMetaPath, lastInstant.getFileName), false))
+    mergedDfList = mergedDfList.take(mergedDfList.size - 1)
+  }
+
+  protected def getMetadataMetaClient(hudiOpts: Map[String, String]): HoodieTableMetaClient = {
+    getHoodieTable(metaClient, getWriteConfig(hudiOpts)).getMetadataTable.asInstanceOf[HoodieBackedTableMetadata].getMetadataMetaClient
+  }
+
+  protected def getLatestCompactionInstant: org.apache.hudi.common.util.Option[HoodieInstant] = {
+    getLatestMetaClient(false).getActiveTimeline
+      .filter(JavaConversions.getPredicate(s => Option(
+        try {
+          val commitMetadata = MetadataConversionUtils.getHoodieCommitMetadata(metaClient, s)
+            .orElse(new HoodieCommitMetadata())
+          commitMetadata
+        } catch {
+          case _: Exception => new HoodieCommitMetadata()
+        })
+        .map(c => c.getOperationType == WriteOperationType.COMPACT)
+        .get))
+      .lastInstant()
+  }
+
+  protected def getLatestClusteringInstant: org.apache.hudi.common.util.Option[HoodieInstant] = {
+    getLatestMetaClient(false).getActiveTimeline.getCompletedReplaceTimeline.lastInstant()
+  }
+
+  protected def doWriteAndValidateDataAndPartitionStats(hudiOpts: Map[String, String],
+                                                        operation: String,
+                                                        saveMode: SaveMode,
+                                                        validate: Boolean = true): DataFrame = {
+    var latestBatch: mutable.Buffer[String] = null
+    if (operation == UPSERT_OPERATION_OPT_VAL) {
+      val instantTime = getInstantTime
+      val records = recordsToStrings(dataGen.generateUniqueUpdates(instantTime, 5))
+      records.addAll(recordsToStrings(dataGen.generateInserts(instantTime, 5)))
+      latestBatch = records.asScala
+    } else if (operation == INSERT_OVERWRITE_OPERATION_OPT_VAL) {
+      latestBatch = recordsToStrings(dataGen.generateInsertsForPartition(
+        getInstantTime, 5, dataGen.getPartitionPaths.last)).asScala
+    } else {
+      latestBatch = recordsToStrings(dataGen.generateInserts(getInstantTime, 5)).asScala
+    }
+    val latestBatchDf = spark.read.json(spark.sparkContext.parallelize(latestBatch, 2))
+    latestBatchDf.cache()
+    latestBatchDf.write.format("org.apache.hudi")
+      .options(hudiOpts)
+      .option(OPERATION.key, operation)
+      .mode(saveMode)
+      .save(basePath)
+    val deletedDf = calculateMergedDf(latestBatchDf, operation)
+    deletedDf.cache()
+    if (validate) {
+      validateDataAndPartitionStats(deletedDf)
+    }
+    deletedDf.unpersist()
+    latestBatchDf
+  }
+
+  /**
+   * @return [[DataFrame]] that should not exist as of the latest instant; used for non-existence validation.
+   */
+  protected def calculateMergedDf(latestBatchDf: DataFrame, operation: String): DataFrame = {
+    val prevDfOpt = mergedDfList.lastOption
+    if (prevDfOpt.isEmpty) {
+      mergedDfList = mergedDfList :+ latestBatchDf
+      sparkSession.emptyDataFrame
+    } else {
+      if (operation == INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL) {
+        mergedDfList = mergedDfList :+ latestBatchDf
+        // after insert_overwrite_table, all previous snapshot's records should be deleted from RLI
+        prevDfOpt.get
+      } else if (operation == INSERT_OVERWRITE_OPERATION_OPT_VAL) {
+        val overwrittenPartitions = latestBatchDf.select("partition")
+          .collectAsList().stream().map[String](JavaConversions.getFunction[Row, String](r => r.getString(0))).collect(Collectors.toList[String])
+        val prevDf = prevDfOpt.get
+        val latestSnapshot = prevDf
+          .filter(not(col("partition").isInCollection(overwrittenPartitions)))
+          .union(latestBatchDf)
+        mergedDfList = mergedDfList :+ latestSnapshot
+
+        // after insert_overwrite (partition), all records in the overwritten partitions should be deleted from RLI
+        prevDf.filter(col("partition").isInCollection(overwrittenPartitions))
+      } else {
+        val prevDf = prevDfOpt.get
+        val prevDfOld = prevDf.join(latestBatchDf, prevDf("_row_key") === latestBatchDf("_row_key")
+          && prevDf("partition") === latestBatchDf("partition"), "leftanti")
+        val latestSnapshot = prevDfOld.union(latestBatchDf)
+        mergedDfList = mergedDfList :+ latestSnapshot
+        sparkSession.emptyDataFrame
+      }
+    }
+  }
+
+  private def getInstantTime: String = {
+    String.format("%03d", new Integer(instantTime.incrementAndGet()))
+  }
+
+  protected def getWriteConfig(hudiOpts: Map[String, String]): HoodieWriteConfig = {
+    val props = TypedProperties.fromMap(JavaConverters.mapAsJavaMapConverter(hudiOpts).asJava)
+    HoodieWriteConfig.newBuilder()
+      .withProps(props)
+      .withPath(basePath)
+      .build()
+  }
+
+  protected def validateDataAndPartitionStats(inputDf: DataFrame = sparkSession.emptyDataFrame): Unit = {
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    val readDf = spark.read.format("hudi").load(basePath)
+    val partitionStatsIndex = new PartitionStatsIndexSupport(
+      spark,
+      inputDf.schema,
+      HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexPartitionStats(true).build(),
+      metaClient)
+    val partitionStats = partitionStatsIndex.loadColumnStatsIndexRecords(List("partition"), shouldReadInMemory = true).collectAsList()
+    assertEquals(0, partitionStats.size())
+    val prevDf = mergedDfList.last.drop("tip_history")
+    val nonMatchingRecords = readDf.drop("_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",

Review Comment:
   isn't there an existing method to drop the meta fields from a DataFrame? (sketch below)
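   If no helper exists yet, the shared constant can at least drive the drop (assuming HoodieRecord.HOODIE_META_COLUMNS is visible from the test module):

   ```scala
   import scala.collection.JavaConverters._
   import org.apache.hudi.common.model.HoodieRecord

   // Drop every Hudi meta column in one call instead of naming each one.
   val withoutMetaFields = readDf.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala: _*)
   ```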
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/PartitionStatsIndexTestBase.scala:
##########
@@ -0,0 +1,259 @@
+  protected def doWriteAndValidateDataAndPartitionStats(hudiOpts: Map[String, String],
+                                                        operation: String,
+                                                        saveMode: SaveMode,
+                                                        validate: Boolean = true): DataFrame = {
+    var latestBatch: mutable.Buffer[String] = null
+    if (operation == UPSERT_OPERATION_OPT_VAL) {
+      val instantTime = getInstantTime
+      val records = recordsToStrings(dataGen.generateUniqueUpdates(instantTime, 5))
+      records.addAll(recordsToStrings(dataGen.generateInserts(instantTime, 5)))
+      latestBatch = records.asScala
+    } else if (operation == INSERT_OVERWRITE_OPERATION_OPT_VAL) {
+      latestBatch = recordsToStrings(dataGen.generateInsertsForPartition(
+        getInstantTime, 5, dataGen.getPartitionPaths.last)).asScala
+    } else {
+      latestBatch = recordsToStrings(dataGen.generateInserts(getInstantTime, 5)).asScala
+    }
+    val latestBatchDf = spark.read.json(spark.sparkContext.parallelize(latestBatch, 2))
+    latestBatchDf.cache()
+    latestBatchDf.write.format("org.apache.hudi")

Review Comment:
   does `.partitionBy(..)` work? Same question for a CTAS statement with partition columns. (The usage in question is sketched below.)
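   For reference, the usage the question is probing (not asserting this currently works on the Hudi write path):

   ```scala
   // Spark's native partitioning API instead of PARTITIONPATH_FIELD in the opts.
   latestBatchDf.write.format("hudi")
     .options(hudiOpts)
     .partitionBy("partition")
     .mode(saveMode)
     .save(basePath)
   ```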
##########
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -26,6 +26,7 @@
  */
 public enum MetadataPartitionType {
   FILES(HoodieTableMetadataUtil.PARTITION_NAME_FILES, "files-"),
+  PARTITION_STATS(HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS, "part-stats-"),

Review Comment:
   rename: partition-stats
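   Presumably this refers to the file-id prefix on the new constant, i.e. the quoted line becoming:

   ```suggestion
     PARTITION_STATS(HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS, "partition-stats-"),
   ```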
