yihua commented on code in PR #6319:
URL: https://github.com/apache/hudi/pull/6319#discussion_r939460249
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala:
##########
@@ -457,23 +462,24 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
private def sort(df: DataFrame): DataFrame = {
val sortedCols = df.columns.sorted
- // Sort dataset by the first 2 columns (to minimize non-determinism in
case multiple files have the same
+ // Sort dataset by 4 columns (to minimize non-determinism in case multiple
files have the same
// value of the first column)
df.select(sortedCols.head, sortedCols.tail: _*)
- .sort("c1_maxValue", "c1_minValue")
+ .sort("c1_maxValue", "c1_minValue", "c2_maxValue", "c2_minValue")
Review Comment:
The log file has the same record key range (min and max of "c1" column) as
the parquet base file so we need to add two more columns for deterministic
ordering.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala:
##########
@@ -365,6 +304,36 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
})
}
+ private def doWriteAndValidateColumnStats(testCase: ColumnStatsTestCase,
Review Comment:
This method is extracted from the existing logic of reading the Json source
as the input.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala:
##########
@@ -411,6 +380,42 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
)
}
+ private def validateColumnStatsIndex(testCase: ColumnStatsTestCase,
Review Comment:
    This method is extracted from the existing logic of validating the column
stats index: (1) comparing the actual column stats index content with the
provided Json file, (2) for parquet files, constructing the column stats index
from the parquet file and comparing that with the column stats index from the
metadata table.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala:
##########
@@ -97,88 +99,25 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
) ++ metadataOpts
- val sourceJSONTablePath =
getClass.getClassLoader.getResource("index/colstats/input-table-json").toString
-
- // NOTE: Schema here is provided for validation that the input data is in
the appropriate format
- val inputDF =
spark.read.schema(sourceTableSchema).json(sourceJSONTablePath)
-
- inputDF
- .sort("c1")
- .repartition(4, new Column("c1"))
- .write
- .format("hudi")
- .options(opts)
- .option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key, 10 * 1024)
- .option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
- .mode(SaveMode.Overwrite)
- .save(basePath)
-
- metaClient = HoodieTableMetaClient.reload(metaClient)
-
- val metadataConfig = HoodieMetadataConfig.newBuilder()
- .fromProperties(toProperties(metadataOpts))
- .build()
-
- val requestedColumns: Seq[String] = sourceTableSchema.fieldNames
-
- val columnStatsIndex = new ColumnStatsIndexSupport(spark,
sourceTableSchema, metadataConfig, metaClient)
-
- val expectedColStatsSchema =
composeIndexSchema(sourceTableSchema.fieldNames, sourceTableSchema)
-
- columnStatsIndex.loadTransposed(requestedColumns,
testCase.shouldReadInMemory) { transposedColStatsDF =>
- // Match against expected column stats table
- val expectedColStatsIndexTableDf =
- spark.read
- .schema(expectedColStatsSchema)
-
.json(getClass.getClassLoader.getResource("index/colstats/column-stats-index-table.json").toString)
-
- assertEquals(expectedColStatsIndexTableDf.schema,
transposedColStatsDF.schema)
- // NOTE: We have to drop the `fileName` column as it contains
semi-random components
- // that we can't control in this test. Nevertheless, since we
manually verify composition of the
- // ColStats Index by reading Parquet footers from individual
Parquet files, this is not an issue
- assertEquals(asJson(sort(expectedColStatsIndexTableDf)),
asJson(sort(transposedColStatsDF.drop("fileName"))))
-
- // Collect Column Stats manually (reading individual Parquet files)
- val manualColStatsTableDF =
- buildColumnStatsTableManually(basePath, sourceTableSchema.fieldNames,
sourceTableSchema.fieldNames, expectedColStatsSchema)
-
- assertEquals(asJson(sort(manualColStatsTableDF)),
asJson(sort(transposedColStatsDF)))
- }
-
- // do an upsert and validate
- val updateJSONTablePath =
getClass.getClassLoader.getResource("index/colstats/another-input-table-json").toString
- val updateDF = spark.read
- .schema(sourceTableSchema)
- .json(updateJSONTablePath)
-
- updateDF.repartition(4)
- .write
- .format("hudi")
- .options(opts)
- .option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key, 10 * 1024)
- .option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
- .mode(SaveMode.Append)
- .save(basePath)
-
- metaClient = HoodieTableMetaClient.reload(metaClient)
-
- val updatedColumnStatsIndex = new ColumnStatsIndexSupport(spark,
sourceTableSchema, metadataConfig, metaClient)
-
- updatedColumnStatsIndex.loadTransposed(requestedColumns,
testCase.shouldReadInMemory) { transposedUpdatedColStatsDF =>
- val expectedColStatsIndexUpdatedDF =
- spark.read
- .schema(expectedColStatsSchema)
-
.json(getClass.getClassLoader.getResource("index/colstats/updated-column-stats-index-table.json").toString)
-
- assertEquals(expectedColStatsIndexUpdatedDF.schema,
transposedUpdatedColStatsDF.schema)
- assertEquals(asJson(sort(expectedColStatsIndexUpdatedDF)),
asJson(sort(transposedUpdatedColStatsDF.drop("fileName"))))
+ doWriteAndValidateColumnStats(testCase, metadataOpts, hudiOpts,
+ dataSourcePath = "index/colstats/input-table-json",
+ expectedColStatsSourcePath =
"index/colstats/column-stats-index-table.json",
+ operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
SaveMode.Overwrite)
- // Collect Column Stats manually (reading individual Parquet files)
- val manualUpdatedColStatsTableDF =
- buildColumnStatsTableManually(basePath, sourceTableSchema.fieldNames,
sourceTableSchema.fieldNames, expectedColStatsSchema)
+ doWriteAndValidateColumnStats(testCase, metadataOpts, hudiOpts,
+ dataSourcePath = "index/colstats/another-input-table-json",
+ expectedColStatsSourcePath =
"index/colstats/updated-column-stats-index-table.json",
+ operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
SaveMode.Append)
- assertEquals(asJson(sort(manualUpdatedColStatsTableDF)),
asJson(sort(transposedUpdatedColStatsDF)))
+ val expectedColStatsSourcePath = if (testCase.tableType ==
HoodieTableType.COPY_ON_WRITE) {
+ "index/colstats/cow-updated2-column-stats-index-table.json"
+ } else {
+ "index/colstats/mor-updated2-column-stats-index-table.json"
}
+ doWriteAndValidateColumnStats(testCase, metadataOpts, hudiOpts,
+ dataSourcePath = "index/colstats/update-input-table-json",
+ expectedColStatsSourcePath,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
SaveMode.Append)
Review Comment:
Adding a new upsert operation here to increase coverage.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]