yihua commented on code in PR #13711:
URL: https://github.com/apache/hudi/pull/13711#discussion_r2353900069
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java:
##########
@@ -3223,7 +3224,7 @@ public void testColStatsMultipleColumns() throws
Exception {
Map<String, HoodieColumnRangeMetadata<Comparable>> parquetStatsMap =
HoodieIOFactory.getIOFactory(metaClient.getStorage())
.getFileFormatUtils(HoodieFileFormat.PARQUET)
- .readColumnStatsFromMetadata(metaClient.getStorage(),
fullFilePath, columns)
+ .readColumnStatsFromMetadata(metaClient.getStorage(),
fullFilePath, columns, V1)
Review Comment:
Do you think we can parameterize this test on write table versions? Also,
shouldn't this test use the V2 column stats index by default?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReaderBase.scala:
##########
@@ -72,10 +72,6 @@ abstract class
SparkParquetReaderBase(enableVectorizedReader: Boolean,
conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, true)
// Using string value of this conf to preserve compatibility across spark
versions.
conf.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key, false)
- if (HoodieSparkUtils.gteqSpark3_4) {
- // PARQUET_INFER_TIMESTAMP_NTZ_ENABLED is required from Spark 3.4.0 or
above
- conf.setBoolean("spark.sql.parquet.inferTimestampNTZ.enabled", false)
- }
Review Comment:
Is this still needed?
##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java:
##########
@@ -598,6 +669,106 @@ public GenericRecord
generateRecordForTripEncodedDecimalSchema(String rowKey, St
return rec;
}
+ public GenericRecord generateRecordForTripLogicalTypesSchema(HoodieKey key,
String riderName, String driverName,
+ long timestamp,
boolean isDeleteRecord, boolean v6, boolean hasLTS) {
+ GenericRecord rec;
+ if (!hasLTS) {
+ rec = new GenericData.Record(AVRO_TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS);
+ } else if (v6) {
+ rec = new GenericData.Record(AVRO_TRIP_LOGICAL_TYPES_SCHEMA_V6);
+ } else {
+ rec = new GenericData.Record(AVRO_TRIP_LOGICAL_TYPES_SCHEMA);
+ }
+ generateTripPrefixValues(rec, key.getRecordKey(), key.getPartitionPath(),
riderName, driverName, timestamp);
+
+ int hash = key.getRecordKey().hashCode();
+ boolean above = (hash & 1) == 0; // half above, half below threshold
+
+ // -------------------
+ // Threshold definitions
+ // -------------------
+ Instant tsMillisThreshold = Instant.parse("2020-01-01T00:00:00Z");
+ Instant tsMicrosThreshold = Instant.parse("2020-06-01T12:00:00Z");
+
+ //LocalTime timeMillisThreshold = LocalTime.of(12, 0, 0); // noon
+ //LocalTime timeMicrosThreshold = LocalTime.of(6, 0, 0); // 6 AM
Review Comment:
Is this still needed?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala:
##########
@@ -192,9 +195,12 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath:
String,
val fixedPartitionIndexes = fixedPartitionIndexesArr.toSet
// schema that we want fg reader to output to us
+ val exclusionFields = new java.util.HashSet[String]()
+ exclusionFields.add("op")
+ partitionSchema.fields.foreach(f => exclusionFields.add(f.name))
val requestedSchema = StructType(requiredSchema.fields ++
partitionSchema.fields.filter(f => mandatoryFields.contains(f.name)))
- val requestedAvroSchema =
AvroConversionUtils.convertStructTypeToAvroSchema(requestedSchema,
sanitizedTableName)
- val dataAvroSchema =
AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema,
sanitizedTableName)
+ val requestedAvroSchema = AvroSchemaUtils.pruneDataSchema(avroTableSchema,
AvroConversionUtils.convertStructTypeToAvroSchema(requestedSchema,
sanitizedTableName), exclusionFields)
+ val dataAvroSchema = AvroSchemaUtils.pruneDataSchema(avroTableSchema,
AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema,
sanitizedTableName), exclusionFields)
Review Comment:
What does the "op" field correspond to?
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestColStatsRecordWithMetadataRecord.java:
##########
@@ -473,7 +485,7 @@ private void
generateNColStatsEntriesAndValidateMerge(Functions.Function1<Random
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadata = new
ArrayList<>();
minMaxValues.forEach(entry -> {
columnRangeMetadata.add(HoodieColumnRangeMetadata.<Comparable>create(fileName,
colName,
- entry.getKey(), entry.getValue(), 5, 1000, 123456, 123456));
+ entry.getKey(), entry.getValue(), 5, 1000, 123456, 123456,
ValueMetadata.V1EmptyMetadata.get()));
Review Comment:
Similarly in this class, could the tests also be parameterized on V2 value
metadata?
##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java:
##########
@@ -598,6 +669,106 @@ public GenericRecord
generateRecordForTripEncodedDecimalSchema(String rowKey, St
return rec;
}
+ public GenericRecord generateRecordForTripLogicalTypesSchema(HoodieKey key,
String riderName, String driverName,
+ long timestamp,
boolean isDeleteRecord, boolean v6, boolean hasLTS) {
+ GenericRecord rec;
+ if (!hasLTS) {
+ rec = new GenericData.Record(AVRO_TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS);
+ } else if (v6) {
+ rec = new GenericData.Record(AVRO_TRIP_LOGICAL_TYPES_SCHEMA_V6);
+ } else {
+ rec = new GenericData.Record(AVRO_TRIP_LOGICAL_TYPES_SCHEMA);
+ }
+ generateTripPrefixValues(rec, key.getRecordKey(), key.getPartitionPath(),
riderName, driverName, timestamp);
+
+ int hash = key.getRecordKey().hashCode();
+ boolean above = (hash & 1) == 0; // half above, half below threshold
+
+ // -------------------
+ // Threshold definitions
+ // -------------------
+ Instant tsMillisThreshold = Instant.parse("2020-01-01T00:00:00Z");
+ Instant tsMicrosThreshold = Instant.parse("2020-06-01T12:00:00Z");
+
+ //LocalTime timeMillisThreshold = LocalTime.of(12, 0, 0); // noon
+ //LocalTime timeMicrosThreshold = LocalTime.of(6, 0, 0); // 6 AM
+
+ Instant localTsMillisThreshold = ZonedDateTime.of(
+ 2015, 5, 20, 12, 34, 56, 0, ZoneOffset.UTC).toInstant();
+ Instant localTsMicrosThreshold = ZonedDateTime.of(
+ 2017, 7, 7, 7, 7, 7, 0, ZoneOffset.UTC).toInstant();
+
+ LocalDate dateThreshold = LocalDate.of(2000, 1, 1);
+
+ // -------------------
+ // Assign edge values
+ // -------------------
+
+ // ts_millis
+ long tsMillisBase = tsMillisThreshold.toEpochMilli();
+ rec.put("ts_millis", above ? tsMillisBase + 1 : tsMillisBase - 1);
+
+ // ts_micros
+ long tsMicrosBase =
TimeUnit.SECONDS.toMicros(tsMicrosThreshold.getEpochSecond()) +
tsMicrosThreshold.getNano() / 1_000L;
+ rec.put("ts_micros", above ? tsMicrosBase + 1 : tsMicrosBase - 1);
+
+ // time_millis
+ //int timeMillisBase = (int)
TimeUnit.SECONDS.toMillis(timeMillisThreshold.toSecondOfDay());
+ //rec.put("time_millis", above ? timeMillisBase + 1 : timeMillisBase - 1);
+
+ // time_micros
+ //long timeMicrosBase =
TimeUnit.SECONDS.toMicros(timeMicrosThreshold.toSecondOfDay());
+ //rec.put("time_micros", above ? timeMicrosBase + 1 : timeMicrosBase - 1);
Review Comment:
Re-enabling this should work?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala:
##########
@@ -192,9 +195,12 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath:
String,
val fixedPartitionIndexes = fixedPartitionIndexesArr.toSet
// schema that we want fg reader to output to us
+ val exclusionFields = new java.util.HashSet[String]()
+ exclusionFields.add("op")
+ partitionSchema.fields.foreach(f => exclusionFields.add(f.name))
val requestedSchema = StructType(requiredSchema.fields ++
partitionSchema.fields.filter(f => mandatoryFields.contains(f.name)))
- val requestedAvroSchema =
AvroConversionUtils.convertStructTypeToAvroSchema(requestedSchema,
sanitizedTableName)
- val dataAvroSchema =
AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema,
sanitizedTableName)
+ val requestedAvroSchema = AvroSchemaUtils.pruneDataSchema(avroTableSchema,
AvroConversionUtils.convertStructTypeToAvroSchema(requestedSchema,
sanitizedTableName), exclusionFields)
+ val dataAvroSchema = AvroSchemaUtils.pruneDataSchema(avroTableSchema,
AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema,
sanitizedTableName), exclusionFields)
Review Comment:
Is this change related to new column stats layout?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala:
##########
@@ -110,5 +110,8 @@ abstract class BaseSpark3Adapter extends SparkAdapter with
Logging {
def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit
// Older Spark 3.x versions do not have TimestampNTZType
- override def isTimestampNTZType(dataType: DataType): Boolean = false
+ override def isTimestampNTZType(dataType: DataType): Boolean = {
+ // TimestampNTZType$ but don't want to rule out possibility of
TimestampNTZType
+ dataType.getClass.getSimpleName.startsWith("TimestampNTZType")
+ }
Review Comment:
This method is overridden in the Spark version-specific adapter classes, so
the change in this base class should not be needed, right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]