codope commented on code in PR #12671:
URL: https://github.com/apache/hudi/pull/12671#discussion_r1926440826
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestMetadataTableSupport.java:
##########
@@ -81,15 +81,17 @@ void testRecreateMDTForInsertOverwriteTableOperation() {
.setBasePath(mdtBasePath).build();
HoodieActiveTimeline timeline = mdtMetaClient.getActiveTimeline();
List<HoodieInstant> instants = timeline.getInstants();
- assertEquals(4, instants.size());
+ assertEquals(5, instants.size());
Review Comment:
I'm wondering whether we should make this more dynamic, based on
MetadataPartitionType.getEnabledPartitions. We hit this every time we enable
another index by default. Something to follow up on.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -212,6 +212,9 @@ private HoodieMetadataFileSystemView getMetadataView() {
if (metadataView == null ||
!metadataView.equals(metadata.getMetadataFileSystemView())) {
ValidationUtils.checkState(metadata != null, "Metadata table not
initialized");
ValidationUtils.checkState(dataMetaClient != null, "Data table meta
client not initialized");
+ if (metadataView != null) {
+ metadataView.close();
+ }
Review Comment:
Why are we closing it here? Should it be moved to the close() method?
##########
hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/column-stats-index-table.json:
##########
@@ -1,4 +1,4 @@
{"c1_maxValue":769,"c1_minValue":309,"c1_nullCount":0,"c2_maxValue":"
769sdc","c2_minValue":"
309sdc","c2_nullCount":0,"c3_maxValue":919.769,"c3_minValue":76.430,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.543-08:00","c4_minValue":"2021-11-19T20:40:55.521-08:00","c4_nullCount":0,"c5_maxValue":78,"c5_minValue":32,"c5_nullCount":0,"c6_maxValue":"2020-11-14","c6_minValue":"2020-01-08","c6_nullCount":0,"c7_maxValue":"uQ==","c7_minValue":"AQ==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":9}
{"c1_maxValue":932,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":"
932sdc","c2_minValue":"
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.549-08:00","c4_minValue":"2021-11-19T20:40:55.339-08:00","c4_nullCount":0,"c5_maxValue":94,"c5_minValue":1,"c5_nullCount":0,"c6_maxValue":"2020-09-09","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"xw==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":8}
{"c1_maxValue":943,"c1_minValue":89,"c1_nullCount":0,"c2_maxValue":"
943sdc","c2_minValue":"
200sdc","c2_nullCount":0,"c3_maxValue":854.690,"c3_minValue":100.556,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.549-08:00","c4_minValue":"2021-11-19T20:40:55.508-08:00","c4_nullCount":0,"c5_maxValue":95,"c5_minValue":10,"c5_nullCount":0,"c6_maxValue":"2020-10-10","c6_minValue":"2020-01-10","c6_nullCount":0,"c7_maxValue":"yA==","c7_minValue":"LA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":10}
-{"c1_maxValue":959,"c1_minValue":74,"c1_nullCount":0,"c2_maxValue":"
959sdc","c2_minValue":"
181sdc","c2_nullCount":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-19T20:40:55.507-08:00","c4_nullCount":0,"c5_maxValue":97,"c5_minValue":9,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-23","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"Kw==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":13}
\ No newline at end of file
+{"c1_maxValue":959,"c1_minValue":74,"c1_nullCount":0,"c2_maxValue":"
959sdc","c2_minValue":"
181sdc","c2_nullCount":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-19T20:40:55.507-08:00","c4_nullCount":0,"c5_maxValue":97,"c5_minValue":9,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-23","c6_nullCount":0,"c7_maxValue":"vw==","c7_minValue":"1A==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":13}
Review Comment:
Can we create follow-up tasks for binary types, as we discussed?
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala:
##########
@@ -376,12 +486,13 @@ object ColumnStatIndexTestBase {
expectedColStatsSourcePath: String,
operation: String,
saveMode: SaveMode,
- shouldValidate: Boolean = true,
+ shouldValidateColStats: Boolean = true,
shouldValidateManually: Boolean = true,
latestCompletedCommit: String = null,
numPartitions: Integer = 4,
parquetMaxFileSize: Integer = 10 * 1024,
smallFileLimit: Integer = 100 * 1024 * 1024,
+ shouldValidatePartitionSats : Boolean =
false,
Review Comment:
```suggestion
shouldValidatePartitionStats : Boolean =
false,
```
##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -1524,27 +1517,44 @@ public static Comparable<?>
unwrapAvroValueWrapper(Object avroValueWrapper, Stri
if (avroValueWrapper == null) {
return null;
} else if (DateWrapper.class.getSimpleName().equals(wrapperClassName)) {
- return Date.valueOf(LocalDate.ofEpochDay((Integer)((Record)
avroValueWrapper).get(0)));
+ if (avroValueWrapper instanceof GenericRecord) {
Review Comment:
Let's add a comment explaining when the wrapper can be a GenericRecord versus a Record.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2432,7 +2432,7 @@ public static HoodieData<HoodieRecord>
convertFilesToPartitionStatsRecords(Hoodi
}
Lazy<Option<Schema>> lazyWriterSchemaOpt = writerSchemaOpt.isPresent() ?
Lazy.eagerly(writerSchemaOpt) : Lazy.lazily(() ->
tryResolveSchemaForTable(dataTableMetaClient));
final List<String> columnsToIndex =
getColumnsToIndex(dataTableMetaClient.getTableConfig(), metadataConfig,
Either.right(lazyWriterSchemaOpt),
-
dataTableMetaClient.getActiveTimeline().filterCompletedInstants().empty(),
recordTypeOpt);
+
dataTableMetaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants().empty(),
recordTypeOpt);
Review Comment:
Any completed action should be sufficient to tell whether the table is
initialized or not, right? Why `getWriteTimeline` specifically? Do we just
want to prune the number of instants for the empty() check, or was there a bug?
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala:
##########
@@ -280,6 +327,62 @@ class ColumnStatIndexTestBase extends
HoodieSparkClientTestBase {
}
}
+ protected def validatePartitionStatsIndex(testCase: ColumnStatsTestCase,
+ metadataOpts: Map[String, String],
+ expectedColStatsSourcePath: String,
+ validatePartitionStatsManually:
Boolean,
+ latestCompletedCommit: String):
Unit = {
+ val metadataConfig = HoodieMetadataConfig.newBuilder()
+ .fromProperties(toProperties(metadataOpts))
+ .build()
+
+ val pStatsIndex = new PartitionStatsIndexSupport(spark, sourceTableSchema,
metadataConfig, metaClient)
+
+ val pIndexedColumns: Seq[String] = HoodieTableMetadataUtil
+ .getColumnsToIndex(metaClient.getTableConfig, metadataConfig,
convertScalaListToJavaList(sourceTableSchema.fieldNames))
+ .asScala.filter(colName => !colName.startsWith("_hoodie")).toSeq.sorted
+
+ val (pExpectedColStatsSchema, _) = composeIndexSchema(pIndexedColumns,
pIndexedColumns, sourceTableSchema)
+ val pValidationSortColumns = if (pIndexedColumns.contains("c5")) {
+ Seq("c1_maxValue", "c1_minValue", "c2_maxValue", "c2_minValue",
"c3_maxValue",
+ "c3_minValue", "c5_maxValue", "c5_minValue")
+ } else {
+ Seq("c1_maxValue", "c1_minValue", "c2_maxValue", "c2_minValue",
"c3_maxValue", "c3_minValue")
+ }
+
+ pStatsIndex.loadTransposed(sourceTableSchema.fieldNames,
testCase.shouldReadInMemory) { pTransposedColStatsDF =>
+ // Match against expected column stats table
+ val pExpectedColStatsIndexTableDf = {
+ spark.read
+ .schema(pExpectedColStatsSchema)
+
.json(getClass.getClassLoader.getResource(expectedColStatsSourcePath).toString)
+ }
+
+ val colsToDrop = if (testCase.tableType ==
HoodieTableType.COPY_ON_WRITE) {
+ Seq("fileName")
+ } else {
+ Seq("fileName","valueCount")
+ }
+
+ //assertEquals(expectedColStatsIndexTableDf.schema,
transposedColStatsDF.schema)
Review Comment:
Why not validate the schema? I'm not following the note here.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala:
##########
@@ -70,7 +69,7 @@ class ColumnStatIndexTestBase extends
HoodieSparkClientTestBase {
.add("c4", TimestampType)
.add("c5", ShortType)
.add("c6", DateType)
- .add("c7", BinaryType)
+ .add("c7", StringType)
Review Comment:
We already have `c2` as StringType; why not just comment out `c7` with a note
for now?
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala:
##########
@@ -280,6 +327,62 @@ class ColumnStatIndexTestBase extends
HoodieSparkClientTestBase {
}
}
+ protected def validatePartitionStatsIndex(testCase: ColumnStatsTestCase,
Review Comment:
So, this method just validates the index contents. Is there any validation of
partition pruning based on stats? I believe such a case may already have existed
before this patch, but can you please confirm?
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala:
##########
@@ -280,6 +327,62 @@ class ColumnStatIndexTestBase extends
HoodieSparkClientTestBase {
}
}
+ protected def validatePartitionStatsIndex(testCase: ColumnStatsTestCase,
+ metadataOpts: Map[String, String],
+ expectedColStatsSourcePath: String,
+ validatePartitionStatsManually:
Boolean,
Review Comment:
Where is this boolean used?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]