alexeykudinkin commented on code in PR #6319:
URL: https://github.com/apache/hudi/pull/6319#discussion_r940727753
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala:
##########
@@ -365,6 +304,36 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
})
}
+ private def doWriteAndValidateColumnStats(testCase: ColumnStatsTestCase,
+ metadataOpts: Map[String, String],
+ hudiOpts: Map[String, String],
+ dataSourcePath: String,
+ expectedColStatsSourcePath: String,
+ operation: String,
+ saveMode: SaveMode): Unit = {
+ val sourceJSONTablePath =
getClass.getClassLoader.getResource(dataSourcePath).toString
+
+ // NOTE: Schema here is provided for validation that the input data is in
the appropriate format
+ val inputDF =
spark.read.schema(sourceTableSchema).json(sourceJSONTablePath)
+
+ inputDF
+ .sort("c1")
+ .repartition(4, new Column("c1"))
+ .write
+ .format("hudi")
+ .options(hudiOpts)
+ .option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key, 10 * 1024)
+ .option(DataSourceWriteOptions.OPERATION.key, operation)
+ .mode(saveMode)
+ .save(basePath)
+
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+
+ validateColumnStatsIndex(testCase, metadataOpts,
expectedColStatsSourcePath,
+ testCase.tableType == HoodieTableType.COPY_ON_WRITE
Review Comment:
Let's extract this as a boolean, and add a comment explaining why we're not
validating in some cases
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala:
##########
@@ -84,10 +85,11 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
)
- val opts = Map(
+ val hudiOpts = Map(
Review Comment:
All of the opts are Hudi opts, maybe `commonOpts`?
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala:
##########
@@ -97,88 +99,25 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
) ++ metadataOpts
- val sourceJSONTablePath =
getClass.getClassLoader.getResource("index/colstats/input-table-json").toString
-
- // NOTE: Schema here is provided for validation that the input date is in
the appropriate format
- val inputDF =
spark.read.schema(sourceTableSchema).json(sourceJSONTablePath)
-
- inputDF
- .sort("c1")
- .repartition(4, new Column("c1"))
- .write
- .format("hudi")
- .options(opts)
- .option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key, 10 * 1024)
- .option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
- .mode(SaveMode.Overwrite)
- .save(basePath)
-
- metaClient = HoodieTableMetaClient.reload(metaClient)
-
- val metadataConfig = HoodieMetadataConfig.newBuilder()
- .fromProperties(toProperties(metadataOpts))
- .build()
-
- val requestedColumns: Seq[String] = sourceTableSchema.fieldNames
-
- val columnStatsIndex = new ColumnStatsIndexSupport(spark,
sourceTableSchema, metadataConfig, metaClient)
-
- val expectedColStatsSchema =
composeIndexSchema(sourceTableSchema.fieldNames, sourceTableSchema)
-
- columnStatsIndex.loadTransposed(requestedColumns,
testCase.shouldReadInMemory) { transposedColStatsDF =>
- // Match against expected column stats table
- val expectedColStatsIndexTableDf =
- spark.read
- .schema(expectedColStatsSchema)
-
.json(getClass.getClassLoader.getResource("index/colstats/column-stats-index-table.json").toString)
-
- assertEquals(expectedColStatsIndexTableDf.schema,
transposedColStatsDF.schema)
- // NOTE: We have to drop the `fileName` column as it contains
semi-random components
- // that we can't control in this test. Nevertheless, since we
manually verify composition of the
- // ColStats Index by reading Parquet footers from individual
Parquet files, this is not an issue
- assertEquals(asJson(sort(expectedColStatsIndexTableDf)),
asJson(sort(transposedColStatsDF.drop("fileName"))))
-
- // Collect Column Stats manually (reading individual Parquet files)
- val manualColStatsTableDF =
- buildColumnStatsTableManually(basePath, sourceTableSchema.fieldNames,
sourceTableSchema.fieldNames, expectedColStatsSchema)
-
- assertEquals(asJson(sort(manualColStatsTableDF)),
asJson(sort(transposedColStatsDF)))
- }
-
- // do an upsert and validate
- val updateJSONTablePath =
getClass.getClassLoader.getResource("index/colstats/another-input-table-json").toString
- val updateDF = spark.read
- .schema(sourceTableSchema)
- .json(updateJSONTablePath)
-
- updateDF.repartition(4)
- .write
- .format("hudi")
- .options(opts)
- .option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key, 10 * 1024)
- .option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
- .mode(SaveMode.Append)
- .save(basePath)
-
- metaClient = HoodieTableMetaClient.reload(metaClient)
-
- val updatedColumnStatsIndex = new ColumnStatsIndexSupport(spark,
sourceTableSchema, metadataConfig, metaClient)
-
- updatedColumnStatsIndex.loadTransposed(requestedColumns,
testCase.shouldReadInMemory) { transposedUpdatedColStatsDF =>
- val expectedColStatsIndexUpdatedDF =
- spark.read
- .schema(expectedColStatsSchema)
-
.json(getClass.getClassLoader.getResource("index/colstats/updated-column-stats-index-table.json").toString)
-
- assertEquals(expectedColStatsIndexUpdatedDF.schema,
transposedUpdatedColStatsDF.schema)
- assertEquals(asJson(sort(expectedColStatsIndexUpdatedDF)),
asJson(sort(transposedUpdatedColStatsDF.drop("fileName"))))
+ doWriteAndValidateColumnStats(testCase, metadataOpts, hudiOpts,
+ dataSourcePath = "index/colstats/input-table-json",
+ expectedColStatsSourcePath =
"index/colstats/column-stats-index-table.json",
+ operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
SaveMode.Overwrite)
Review Comment:
nit: Leaving the last param without a name while the preceding one is
explicitly named makes this hard to read
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]