yihua commented on code in PR #12105:
URL: https://github.com/apache/hudi/pull/12105#discussion_r1815839536
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala:
##########
@@ -156,12 +167,61 @@ class ColumnStatIndexTestBase extends
HoodieSparkClientTestBase {
}).asJava,
indexSchema
)
+
+ if (metaClient.getTableConfig.getTableType ==
HoodieTableType.COPY_ON_WRITE) {
+ baseFilesDf // COW table
+ } else {
+ val allLogFiles = filegroupList.stream().flatMap(fileGroup =>
fileGroup.getAllFileSlices)
+ .flatMap(fileSlice => fileSlice.getLogFiles)
+ .collect(Collectors.toList[HoodieLogFile])
+ if (allLogFiles.isEmpty) {
+ baseFilesDf // MOR table, but no log files.
+ } else {
+ val colsToGenerateStats = indexedCols // check for included cols
+ val writerSchemaOpt =
LogFileColStatsTestUtil.getSchemaForTable(metaClient)
+ val latestCompletedCommit =
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+ baseFilesDf.union(getColStatsForLogFiles(allLogFiles,
latestCompletedCommit,
+ scala.collection.JavaConverters.seqAsJavaList(colsToGenerateStats),
+ metaClient,
+ writerSchemaOpt: org.apache.hudi.common.util.Option[Schema],
+ HoodieMetadataConfig.MAX_READER_BUFFER_SIZE_PROP.defaultValue(),
+ indexSchema))
+ }
+ }
+ }
+
+ protected def getColStatsForLogFiles(logFiles: List[HoodieLogFile],
latestCommit: String, columnsToIndex: util.List[String],
Review Comment:
Use `For` or `From`, not both, to be consistent
```suggestion
protected def getColStatsForLogFiles(logFiles: List[HoodieLogFile],
latestCommit: String, columnsToIndex: util.List[String],
```
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala:
##########
@@ -18,26 +18,37 @@
package org.apache.hudi.functional
+
+import org.apache.avro.Schema
import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema
import org.apache.hudi.HoodieConversionUtils.toProperties
+import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.{HoodieMetadataConfig,
HoodieStorageConfig}
-import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.model.{HoodieBaseFile, HoodieFileGroup,
HoodieLogFile, HoodieTableType}
import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.view.FileSystemViewManager
+import org.apache.hudi.config.HoodieCompactionConfig
import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestCase
import org.apache.hudi.storage.StoragePath
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
import org.apache.hudi.testutils.HoodieSparkClientTestBase
import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions}
-
import org.apache.spark.sql._
+import
org.apache.hudi.functional.ColumnStatIndexTestBase.DoWriteAndValidateColumnStatsParams
+import org.apache.hudi.testutils.{HoodieSparkClientTestBase,
LogFileColStatsTestUtil}
+import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions}
import org.apache.spark.sql.functions.typedLit
import org.apache.spark.sql.types._
+import org.apache.spark.sql.{DataFrame, _}
Review Comment:
`import org.apache.spark.sql._` should be enough?
##########
hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/testutils/LogFileColStatsTestUtil.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.testutils;
+
+import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.avro.Schema;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.collectColumnRangeMetadata;
+
+/**
+ * Util methods used in tests to fetch col stats records for a log file.
+ */
+public class LogFileColStatsTestUtil {
+
+ public static Option<Row> getLogFileColumnRangeMetadata(String filePath,
HoodieTableMetaClient datasetMetaClient, String latestCommitTime,
+ List<String> columnsToIndex,
Option<Schema> writerSchemaOpt,
+ int maxBufferSize) throws
IOException {
+ if (writerSchemaOpt.isPresent()) {
+ List<Schema.Field> fieldsToIndex =
writerSchemaOpt.get().getFields().stream()
+ .filter(field -> columnsToIndex.contains(field.name()))
+ .collect(Collectors.toList());
+ List<HoodieRecord> records = new ArrayList<>();
+ HoodieUnMergedLogRecordScanner scanner =
HoodieUnMergedLogRecordScanner.newBuilder()
+ .withStorage(datasetMetaClient.getStorage())
+ .withBasePath(datasetMetaClient.getBasePath())
+ .withLogFilePaths(Collections.singletonList(filePath))
+ .withBufferSize(maxBufferSize)
+ .withLatestInstantTime(latestCommitTime)
+ .withReaderSchema(writerSchemaOpt.get())
+ .withLogRecordScannerCallback(records::add)
+ .build();
+ scanner.scan();
+ if (records.isEmpty()) {
+ return Option.empty();
+ }
+ Map<String, HoodieColumnRangeMetadata<Comparable>>
columnRangeMetadataMap =
+ collectColumnRangeMetadata(records, fieldsToIndex, filePath,
writerSchemaOpt.get());
+ List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList =
new ArrayList<>(columnRangeMetadataMap.values());
+ return Option.of(getColStatsEntry(filePath, columnRangeMetadataList));
Review Comment:
Could we have something simpler instead of reusing `collectColumnRangeMetadata`
because it is also used by production code?
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala:
##########
@@ -133,12 +133,12 @@ class TestColumnStatsIndexWithSQL extends
ColumnStatIndexTestBase {
verifyFileIndexAndSQLQueries(commonOpts,
isTableDataSameAsAfterSecondInstant = true)
// Add the last df back and verify the queries
- doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
Review Comment:
How is `doWriteAndValidateColumnStats` different from
`DoWriteAndValidateColumnStatsParams`?
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala:
##########
@@ -90,13 +97,428 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase
{
"index/colstats/mor-updated2-column-stats-index-table.json"
}
- doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
dataSourcePath = "index/colstats/update-input-table-json",
expectedColStatsSourcePath = expectedColStatsSourcePath,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
- saveMode = SaveMode.Append)
+ saveMode = SaveMode.Append))
}
+ @ParameterizedTest
+ @MethodSource(Array("testTableTypePartitionTypeParams"))
+ def testMetadataColumnStatsIndexInitializationWithUpserts(tableType:
HoodieTableType, partitionCol : String): Unit = {
+ val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
+ val metadataOpts = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true"
+ )
+
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "1",
+ "hoodie.upsert.shuffle.parallelism" -> "1",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
+ RECORDKEY_FIELD.key -> "c1",
+ PRECOMBINE_FIELD.key -> "c1",
+ PARTITIONPATH_FIELD.key() -> partitionCol,
+ HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+ HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "5"
+ ) ++ metadataOpts
+
+ // inserts
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/input-table-json",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Overwrite,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ // updates
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/update2-input-table-json/",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ // delete a subset of recs. this will add a delete log block for MOR table.
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/delete-input-table-json/",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ val metadataOpts1 = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+ )
+
+ // NOTE: MOR and COW have different fixtures since MOR is bearing
delta-log files (holding
+ // deferred updates), diverging from COW
+
+ val expectedColStatsSourcePath = if (testCase.tableType ==
HoodieTableType.COPY_ON_WRITE) {
+ "index/colstats/cow-bootstrap1-column-stats-index-table.json"
+ } else {
+ "index/colstats/mor-bootstrap1-column-stats-index-table.json"
+ }
+
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ val latestCompletedCommit =
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+
+ // updates a subset which are not deleted and enable col stats and
validate bootstrap
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts1, commonOpts,
+ dataSourcePath = "index/colstats/update3-input-table-json",
+ expectedColStatsSourcePath = expectedColStatsSourcePath,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ true,
+ latestCompletedCommit,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ // trigger one more upsert and compaction (w/ MOR table) and validate.
+ val expectedColStatsSourcePath1 = if (testCase.tableType ==
HoodieTableType.COPY_ON_WRITE) {
+ "index/colstats/cow-bootstrap2-column-stats-index-table.json"
+ } else {
+ "index/colstats/mor-bootstrap2-column-stats-index-table.json"
+ }
+
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts1, commonOpts,
+ dataSourcePath = "index/colstats/update4-input-table-json",
+ expectedColStatsSourcePath = expectedColStatsSourcePath1,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ true,
+ latestCompletedCommit,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+ }
+
+ @ParameterizedTest
+ @MethodSource(Array("testTableTypePartitionTypeParams"))
+ def testMetadataColumnStatsIndexInitializationWithRollbacks(tableType:
HoodieTableType, partitionCol : String): Unit = {
+ val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
+ val metadataOpts = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true"
+ )
+
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "1",
+ "hoodie.upsert.shuffle.parallelism" -> "1",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
+ RECORDKEY_FIELD.key -> "c1",
+ PRECOMBINE_FIELD.key -> "c1",
+ PARTITIONPATH_FIELD.key() -> partitionCol,
+ "hoodie.write.markers.type" -> "DIRECT",
+ HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
+ ) ++ metadataOpts
+
+ // inserts
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/input-table-json",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Overwrite,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ // updates
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/update2-input-table-json/",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ simulateFailureForLatestCommit(tableType, partitionCol)
+
+ val metadataOpts1 = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+ )
+
+ // NOTE: MOR and COW have different fixtures since MOR is bearing
delta-log files (holding
+ // deferred updates), diverging from COW
+
+ val expectedColStatsSourcePath = if (testCase.tableType ==
HoodieTableType.COPY_ON_WRITE) {
+ "index/colstats/cow-bootstrap-rollback1-column-stats-index-table.json"
+ } else {
+ "index/colstats/mor-bootstrap-rollback1-column-stats-index-table.json"
+ }
+
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ val latestCompletedCommit =
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+
+ // updates a subset which are not deleted and enable col stats and
validate bootstrap
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts1, commonOpts,
+ dataSourcePath = "index/colstats/update3-input-table-json",
+ expectedColStatsSourcePath = expectedColStatsSourcePath,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ true,
+ latestCompletedCommit,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+
assertTrue(metaClient.getActiveTimeline.getRollbackTimeline.countInstants() > 0)
+ }
+
+ def simulateFailureForLatestCommit(tableType: HoodieTableType, partitionCol:
String) : Unit = {
+ // simulate failure for latest commit.
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ var baseFileName : String = null
+ var logFileName : String = null
+ val lastCompletedCommit =
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant().get()
+ if (tableType == HoodieTableType.MERGE_ON_READ) {
+ val dataFiles = if (StringUtils.isNullOrEmpty(partitionCol)) {
+ metaClient.getStorage.listFiles(new
StoragePath(metaClient.getBasePath.toString + "/"))
Review Comment:
Not sure if the `/` is necessary?
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala:
##########
@@ -90,13 +97,428 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase
{
"index/colstats/mor-updated2-column-stats-index-table.json"
}
- doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
dataSourcePath = "index/colstats/update-input-table-json",
expectedColStatsSourcePath = expectedColStatsSourcePath,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
- saveMode = SaveMode.Append)
+ saveMode = SaveMode.Append))
}
+ @ParameterizedTest
+ @MethodSource(Array("testTableTypePartitionTypeParams"))
+ def testMetadataColumnStatsIndexInitializationWithUpserts(tableType:
HoodieTableType, partitionCol : String): Unit = {
+ val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
+ val metadataOpts = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true"
+ )
+
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "1",
+ "hoodie.upsert.shuffle.parallelism" -> "1",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
+ RECORDKEY_FIELD.key -> "c1",
+ PRECOMBINE_FIELD.key -> "c1",
+ PARTITIONPATH_FIELD.key() -> partitionCol,
+ HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+ HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "5"
+ ) ++ metadataOpts
+
+ // inserts
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/input-table-json",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Overwrite,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ // updates
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/update2-input-table-json/",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ // delete a subset of recs. this will add a delete log block for MOR table.
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/delete-input-table-json/",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ val metadataOpts1 = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+ )
+
+ // NOTE: MOR and COW have different fixtures since MOR is bearing
delta-log files (holding
Review Comment:
For MOR, could we also assert there are log files in the table?
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala:
##########
@@ -90,13 +97,428 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase
{
"index/colstats/mor-updated2-column-stats-index-table.json"
}
- doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
dataSourcePath = "index/colstats/update-input-table-json",
expectedColStatsSourcePath = expectedColStatsSourcePath,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
- saveMode = SaveMode.Append)
+ saveMode = SaveMode.Append))
}
+ @ParameterizedTest
+ @MethodSource(Array("testTableTypePartitionTypeParams"))
+ def testMetadataColumnStatsIndexInitializationWithUpserts(tableType:
HoodieTableType, partitionCol : String): Unit = {
+ val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
+ val metadataOpts = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true"
+ )
+
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "1",
+ "hoodie.upsert.shuffle.parallelism" -> "1",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
+ RECORDKEY_FIELD.key -> "c1",
+ PRECOMBINE_FIELD.key -> "c1",
+ PARTITIONPATH_FIELD.key() -> partitionCol,
+ HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+ HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "5"
+ ) ++ metadataOpts
+
+ // inserts
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/input-table-json",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Overwrite,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ // updates
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/update2-input-table-json/",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ // delete a subset of recs. this will add a delete log block for MOR table.
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/delete-input-table-json/",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ val metadataOpts1 = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+ )
+
+ // NOTE: MOR and COW have different fixtures since MOR is bearing
delta-log files (holding
+ // deferred updates), diverging from COW
+
+ val expectedColStatsSourcePath = if (testCase.tableType ==
HoodieTableType.COPY_ON_WRITE) {
+ "index/colstats/cow-bootstrap1-column-stats-index-table.json"
+ } else {
+ "index/colstats/mor-bootstrap1-column-stats-index-table.json"
+ }
+
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ val latestCompletedCommit =
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+
+ // updates a subset which are not deleted and enable col stats and
validate bootstrap
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts1, commonOpts,
+ dataSourcePath = "index/colstats/update3-input-table-json",
+ expectedColStatsSourcePath = expectedColStatsSourcePath,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ true,
+ latestCompletedCommit,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ // trigger one more upsert and compaction (w/ MOR table) and validate.
+ val expectedColStatsSourcePath1 = if (testCase.tableType ==
HoodieTableType.COPY_ON_WRITE) {
+ "index/colstats/cow-bootstrap2-column-stats-index-table.json"
+ } else {
+ "index/colstats/mor-bootstrap2-column-stats-index-table.json"
+ }
+
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts1, commonOpts,
+ dataSourcePath = "index/colstats/update4-input-table-json",
+ expectedColStatsSourcePath = expectedColStatsSourcePath1,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ true,
+ latestCompletedCommit,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+ }
+
+ @ParameterizedTest
+ @MethodSource(Array("testTableTypePartitionTypeParams"))
+ def testMetadataColumnStatsIndexInitializationWithRollbacks(tableType:
HoodieTableType, partitionCol : String): Unit = {
+ val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
+ val metadataOpts = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true"
+ )
+
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "1",
+ "hoodie.upsert.shuffle.parallelism" -> "1",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
+ RECORDKEY_FIELD.key -> "c1",
+ PRECOMBINE_FIELD.key -> "c1",
+ PARTITIONPATH_FIELD.key() -> partitionCol,
+ "hoodie.write.markers.type" -> "DIRECT",
+ HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
+ ) ++ metadataOpts
+
+ // inserts
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/input-table-json",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Overwrite,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ // updates
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/update2-input-table-json/",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ simulateFailureForLatestCommit(tableType, partitionCol)
+
+ val metadataOpts1 = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+ )
+
+ // NOTE: MOR and COW have different fixtures since MOR is bearing
delta-log files (holding
+ // deferred updates), diverging from COW
+
+ val expectedColStatsSourcePath = if (testCase.tableType ==
HoodieTableType.COPY_ON_WRITE) {
+ "index/colstats/cow-bootstrap-rollback1-column-stats-index-table.json"
+ } else {
+ "index/colstats/mor-bootstrap-rollback1-column-stats-index-table.json"
+ }
+
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ val latestCompletedCommit =
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+
+ // updates a subset which are not deleted and enable col stats and
validate bootstrap
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts1, commonOpts,
+ dataSourcePath = "index/colstats/update3-input-table-json",
+ expectedColStatsSourcePath = expectedColStatsSourcePath,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ true,
+ latestCompletedCommit,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+
assertTrue(metaClient.getActiveTimeline.getRollbackTimeline.countInstants() > 0)
+ }
+
+ def simulateFailureForLatestCommit(tableType: HoodieTableType, partitionCol:
String) : Unit = {
+ // simulate failure for latest commit.
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ var baseFileName : String = null
+ var logFileName : String = null
+ val lastCompletedCommit =
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant().get()
+ if (tableType == HoodieTableType.MERGE_ON_READ) {
+ val dataFiles = if (StringUtils.isNullOrEmpty(partitionCol)) {
+ metaClient.getStorage.listFiles(new
StoragePath(metaClient.getBasePath.toString + "/"))
Review Comment:
```suggestion
metaClient.getStorage.listFiles(new
StoragePath(metaClient.getBasePath, "/"))
```
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2434,13 +2444,21 @@ public static class DirectoryInfo implements
Serializable {
private boolean isHoodiePartition = false;
public DirectoryInfo(String relativePath, List<StoragePathInfo> pathInfos,
String maxInstantTime, Set<String> pendingDataInstants) {
+ this(relativePath, pathInfos, maxInstantTime, pendingDataInstants, true);
+ }
+
+ /*
+ When files are directly fetched from Metadata table we do not need to
validate HoodiePartitions.
+ */
+ public DirectoryInfo(String relativePath, List<StoragePathInfo> pathInfos,
String maxInstantTime, Set<String> pendingDataInstants,
+ boolean validateHoodiePartitions) {
this.relativePath = relativePath;
// Pre-allocate with the maximum length possible
filenameToSizeMap = new HashMap<>(pathInfos.size());
// Presence of partition meta file implies this is a HUDI partition
- isHoodiePartition = pathInfos.stream().anyMatch(status ->
status.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX));
+ isHoodiePartition = !validateHoodiePartitions ||
pathInfos.stream().anyMatch(status ->
status.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX));
Review Comment:
Trying to understand why we need this?
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala:
##########
@@ -90,13 +97,428 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase
{
"index/colstats/mor-updated2-column-stats-index-table.json"
}
- doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
dataSourcePath = "index/colstats/update-input-table-json",
expectedColStatsSourcePath = expectedColStatsSourcePath,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
- saveMode = SaveMode.Append)
+ saveMode = SaveMode.Append))
}
+ @ParameterizedTest
+ @MethodSource(Array("testTableTypePartitionTypeParams"))
+ def testMetadataColumnStatsIndexInitializationWithUpserts(tableType:
HoodieTableType, partitionCol : String): Unit = {
+ val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
+ val metadataOpts = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true"
+ )
+
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "1",
+ "hoodie.upsert.shuffle.parallelism" -> "1",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
+ RECORDKEY_FIELD.key -> "c1",
+ PRECOMBINE_FIELD.key -> "c1",
+ PARTITIONPATH_FIELD.key() -> partitionCol,
+ HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+ HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "5"
+ ) ++ metadataOpts
+
+ // inserts
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/input-table-json",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Overwrite,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ // updates
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/update2-input-table-json/",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ // delete a subset of recs. this will add a delete log block for MOR table.
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/delete-input-table-json/",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ val metadataOpts1 = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+ )
+
+ // NOTE: MOR and COW have different fixtures since MOR is bearing
delta-log files (holding
+ // deferred updates), diverging from COW
+
+ val expectedColStatsSourcePath = if (testCase.tableType ==
HoodieTableType.COPY_ON_WRITE) {
+ "index/colstats/cow-bootstrap1-column-stats-index-table.json"
+ } else {
+ "index/colstats/mor-bootstrap1-column-stats-index-table.json"
+ }
+
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ val latestCompletedCommit =
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+
+ // updates a subset which are not deleted and enable col stats and
validate bootstrap
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts1, commonOpts,
+ dataSourcePath = "index/colstats/update3-input-table-json",
+ expectedColStatsSourcePath = expectedColStatsSourcePath,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ true,
+ latestCompletedCommit,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ // trigger one more upsert and compaction (w/ MOR table) and validate.
+ val expectedColStatsSourcePath1 = if (testCase.tableType ==
HoodieTableType.COPY_ON_WRITE) {
+ "index/colstats/cow-bootstrap2-column-stats-index-table.json"
+ } else {
+ "index/colstats/mor-bootstrap2-column-stats-index-table.json"
+ }
+
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts1, commonOpts,
+ dataSourcePath = "index/colstats/update4-input-table-json",
+ expectedColStatsSourcePath = expectedColStatsSourcePath1,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ true,
+ latestCompletedCommit,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+ }
+
+ @ParameterizedTest
+ @MethodSource(Array("testTableTypePartitionTypeParams"))
+ def testMetadataColumnStatsIndexInitializationWithRollbacks(tableType:
HoodieTableType, partitionCol : String): Unit = {
+ val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
+ val metadataOpts = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true"
+ )
+
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "1",
+ "hoodie.upsert.shuffle.parallelism" -> "1",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
+ RECORDKEY_FIELD.key -> "c1",
+ PRECOMBINE_FIELD.key -> "c1",
+ PARTITIONPATH_FIELD.key() -> partitionCol,
+ "hoodie.write.markers.type" -> "DIRECT",
+ HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
+ ) ++ metadataOpts
+
+ // inserts
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/input-table-json",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Overwrite,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ // updates
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/update2-input-table-json/",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ simulateFailureForLatestCommit(tableType, partitionCol)
+
+ val metadataOpts1 = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+ )
+
+ // NOTE: MOR and COW have different fixtures since MOR is bearing
delta-log files (holding
+ // deferred updates), diverging from COW
+
+ val expectedColStatsSourcePath = if (testCase.tableType ==
HoodieTableType.COPY_ON_WRITE) {
+ "index/colstats/cow-bootstrap-rollback1-column-stats-index-table.json"
+ } else {
+ "index/colstats/mor-bootstrap-rollback1-column-stats-index-table.json"
+ }
+
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ val latestCompletedCommit =
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+
+ // updates a subset which are not deleted and enable col stats and
validate bootstrap
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts1, commonOpts,
+ dataSourcePath = "index/colstats/update3-input-table-json",
+ expectedColStatsSourcePath = expectedColStatsSourcePath,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ true,
+ latestCompletedCommit,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+
assertTrue(metaClient.getActiveTimeline.getRollbackTimeline.countInstants() > 0)
+ }
+
+ def simulateFailureForLatestCommit(tableType: HoodieTableType, partitionCol:
String) : Unit = {
+ // simulate failure for latest commit.
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ var baseFileName : String = null
+ var logFileName : String = null
+ val lastCompletedCommit =
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant().get()
+ if (tableType == HoodieTableType.MERGE_ON_READ) {
+ val dataFiles = if (StringUtils.isNullOrEmpty(partitionCol)) {
+ metaClient.getStorage.listFiles(new
StoragePath(metaClient.getBasePath.toString + "/"))
+ } else {
+ metaClient.getStorage.listFiles(new
StoragePath(metaClient.getBasePath.toString + "/9/"))
Review Comment:
Apply the same fix to all the storage path constructions in this file.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala:
##########
@@ -269,14 +330,42 @@ object ColumnStatIndexTestBase {
def testMetadataColumnStatsIndexParams: java.util.stream.Stream[Arguments] =
{
java.util.stream.Stream.of(HoodieTableType.values().toStream.flatMap(tableType
=>
Seq(Arguments.arguments(ColumnStatsTestCase(tableType,
shouldReadInMemory = true)),
- Arguments.arguments(ColumnStatsTestCase(tableType, shouldReadInMemory
= false)))
+ Arguments.arguments(ColumnStatsTestCase(tableType, shouldReadInMemory
= false))
+ )
): _*)
}
def testMetadataColumnStatsIndexParamsForMOR:
java.util.stream.Stream[Arguments] = {
java.util.stream.Stream.of(
Seq(Arguments.arguments(ColumnStatsTestCase(HoodieTableType.MERGE_ON_READ,
shouldReadInMemory = true)),
- Arguments.arguments(ColumnStatsTestCase(HoodieTableType.MERGE_ON_READ,
shouldReadInMemory = false)))
- : _*)
+ Arguments.arguments(ColumnStatsTestCase(HoodieTableType.MERGE_ON_READ,
shouldReadInMemory = false))
+ )
+ : _*)
}
+
+ def testTableTypePartitionTypeParams: java.util.stream.Stream[Arguments] = {
+ java.util.stream.Stream.of(
+ Seq(
+ Arguments.arguments(HoodieTableType.COPY_ON_WRITE, "c8"),
+ // empty partition col represents non-partitioned table.
+ Arguments.arguments(HoodieTableType.COPY_ON_WRITE, ""),
+ Arguments.arguments(HoodieTableType.MERGE_ON_READ, "c8"),
+ Arguments.arguments(HoodieTableType.MERGE_ON_READ, "")
+ )
+ : _*)
+ }
+
+ case class DoWriteAndValidateColumnStatsParams(testCase: ColumnStatsTestCase,
+ metadataOpts: Map[String,
String],
+ hudiOpts: Map[String, String],
+ dataSourcePath: String,
+ expectedColStatsSourcePath:
String,
+ operation: String,
+ saveMode: SaveMode,
+ shouldValidate: Boolean =
true,
+ latestCompletedCommit: String
= null,
+ numPartitions: Integer = 4,
+ parquetMaxFileSize: Integer =
10 * 1024,
+ smallFileLimit: Integer = 100
* 1024 * 1024)
}
+
Review Comment:
Remove redundant empty line.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala:
##########
@@ -90,13 +97,428 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase
{
"index/colstats/mor-updated2-column-stats-index-table.json"
}
- doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
dataSourcePath = "index/colstats/update-input-table-json",
expectedColStatsSourcePath = expectedColStatsSourcePath,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
- saveMode = SaveMode.Append)
+ saveMode = SaveMode.Append))
}
+ @ParameterizedTest
+ @MethodSource(Array("testTableTypePartitionTypeParams"))
+ def testMetadataColumnStatsIndexInitializationWithUpserts(tableType:
HoodieTableType, partitionCol : String): Unit = {
+ val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
+ val metadataOpts = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true"
+ )
+
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "1",
+ "hoodie.upsert.shuffle.parallelism" -> "1",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
+ RECORDKEY_FIELD.key -> "c1",
+ PRECOMBINE_FIELD.key -> "c1",
+ PARTITIONPATH_FIELD.key() -> partitionCol,
+ HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+ HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "5"
+ ) ++ metadataOpts
+
+ // inserts
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/input-table-json",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Overwrite,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ // updates
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/update2-input-table-json/",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ // delete a subset of recs. this will add a delete log block for MOR table.
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/delete-input-table-json/",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ val metadataOpts1 = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+ )
+
+ // NOTE: MOR and COW have different fixtures since MOR is bearing
delta-log files (holding
+ // deferred updates), diverging from COW
+
+ val expectedColStatsSourcePath = if (testCase.tableType ==
HoodieTableType.COPY_ON_WRITE) {
+ "index/colstats/cow-bootstrap1-column-stats-index-table.json"
+ } else {
+ "index/colstats/mor-bootstrap1-column-stats-index-table.json"
+ }
+
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ val latestCompletedCommit =
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+
+ // updates a subset which are not deleted and enable col stats and
validate bootstrap
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts1, commonOpts,
+ dataSourcePath = "index/colstats/update3-input-table-json",
+ expectedColStatsSourcePath = expectedColStatsSourcePath,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ true,
+ latestCompletedCommit,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ // trigger one more upsert and compaction (w/ MOR table) and validate.
+ val expectedColStatsSourcePath1 = if (testCase.tableType ==
HoodieTableType.COPY_ON_WRITE) {
+ "index/colstats/cow-bootstrap2-column-stats-index-table.json"
+ } else {
+ "index/colstats/mor-bootstrap2-column-stats-index-table.json"
+ }
+
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts1, commonOpts,
+ dataSourcePath = "index/colstats/update4-input-table-json",
+ expectedColStatsSourcePath = expectedColStatsSourcePath1,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ true,
+ latestCompletedCommit,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+ }
+
+ @ParameterizedTest
+ @MethodSource(Array("testTableTypePartitionTypeParams"))
+ def testMetadataColumnStatsIndexInitializationWithRollbacks(tableType:
HoodieTableType, partitionCol : String): Unit = {
+ val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
+ val metadataOpts = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true"
+ )
+
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "1",
+ "hoodie.upsert.shuffle.parallelism" -> "1",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
+ RECORDKEY_FIELD.key -> "c1",
+ PRECOMBINE_FIELD.key -> "c1",
+ PARTITIONPATH_FIELD.key() -> partitionCol,
+ "hoodie.write.markers.type" -> "DIRECT",
+ HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
+ ) ++ metadataOpts
+
+ // inserts
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/input-table-json",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Overwrite,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ // updates
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/update2-input-table-json/",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ simulateFailureForLatestCommit(tableType, partitionCol)
+
+ val metadataOpts1 = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+ )
+
+ // NOTE: MOR and COW have different fixtures since MOR is bearing
delta-log files (holding
+ // deferred updates), diverging from COW
+
+ val expectedColStatsSourcePath = if (testCase.tableType ==
HoodieTableType.COPY_ON_WRITE) {
+ "index/colstats/cow-bootstrap-rollback1-column-stats-index-table.json"
+ } else {
+ "index/colstats/mor-bootstrap-rollback1-column-stats-index-table.json"
+ }
+
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ val latestCompletedCommit =
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+
+ // updates a subset which are not deleted and enable col stats and
validate bootstrap
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts1, commonOpts,
+ dataSourcePath = "index/colstats/update3-input-table-json",
+ expectedColStatsSourcePath = expectedColStatsSourcePath,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ true,
+ latestCompletedCommit,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+
assertTrue(metaClient.getActiveTimeline.getRollbackTimeline.countInstants() > 0)
+ }
+
+ def simulateFailureForLatestCommit(tableType: HoodieTableType, partitionCol:
String) : Unit = {
+ // simulate failure for latest commit.
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ var baseFileName : String = null
+ var logFileName : String = null
+ val lastCompletedCommit =
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant().get()
+ if (tableType == HoodieTableType.MERGE_ON_READ) {
+ val dataFiles = if (StringUtils.isNullOrEmpty(partitionCol)) {
+ metaClient.getStorage.listFiles(new
StoragePath(metaClient.getBasePath.toString + "/"))
+ } else {
+ metaClient.getStorage.listFiles(new
StoragePath(metaClient.getBasePath.toString + "/9/"))
+ }
+ val logFileFileStatus = dataFiles.stream().filter(fileStatus =>
fileStatus.getPath.getName.contains(".log")).findFirst().get()
+ logFileName = logFileFileStatus.getPath.getName
+ } else {
+ val dataFiles = if (StringUtils.isNullOrEmpty(partitionCol)) {
+ metaClient.getStorage.listFiles(new
StoragePath(metaClient.getBasePath.toString + "/"))
+ } else {
+ metaClient.getStorage.listFiles(new
StoragePath(metaClient.getBasePath.toString + "/9/"))
+ }
+ val baseFileFileStatus = dataFiles.stream().filter(fileStatus =>
fileStatus.getPath.getName.contains(lastCompletedCommit.getTimestamp)).findFirst().get()
+ baseFileName = baseFileFileStatus.getPath.getName
+ }
+
+ val latestCompletedFileName = lastCompletedCommit.getFileName
+ metaClient.getStorage.deleteFile(new
StoragePath(metaClient.getBasePath.toString + "/.hoodie/" +
latestCompletedFileName))
+
+ // re-create marker for the deleted file.
+ if (tableType == HoodieTableType.MERGE_ON_READ) {
+ if (StringUtils.isNullOrEmpty(partitionCol)) {
+ metaClient.getStorage.create(new
StoragePath(metaClient.getBasePath.toString + "/.hoodie/.temp/" +
lastCompletedCommit.getTimestamp + "/" + logFileName + ".marker.APPEND"))
+ } else {
+ metaClient.getStorage.create(new
StoragePath(metaClient.getBasePath.toString + "/.hoodie/.temp/" +
lastCompletedCommit.getTimestamp + "/9/" + logFileName + ".marker.APPEND"))
+ }
+ } else {
+ if (StringUtils.isNullOrEmpty(partitionCol)) {
+ metaClient.getStorage.create(new
StoragePath(metaClient.getBasePath.toString + "/.hoodie/.temp/" +
lastCompletedCommit.getTimestamp + "/" + baseFileName + ".marker.MERGE"))
+ } else {
+ metaClient.getStorage.create(new
StoragePath(metaClient.getBasePath.toString + "/.hoodie/.temp/" +
lastCompletedCommit.getTimestamp + "/9/" + baseFileName + ".marker.MERGE"))
+ }
+ }
+
Review Comment:
Remove redundant empty line.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]