yihua commented on code in PR #13711:
URL: https://github.com/apache/hudi/pull/13711#discussion_r2353900069


##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java:
##########
@@ -3223,7 +3224,7 @@ public void testColStatsMultipleColumns() throws 
Exception {
         Map<String, HoodieColumnRangeMetadata<Comparable>> parquetStatsMap =
             HoodieIOFactory.getIOFactory(metaClient.getStorage())
                 .getFileFormatUtils(HoodieFileFormat.PARQUET)
-                .readColumnStatsFromMetadata(metaClient.getStorage(), 
fullFilePath, columns)
+                .readColumnStatsFromMetadata(metaClient.getStorage(), 
fullFilePath, columns, V1)

Review Comment:
   Do you think we can parameterize this test on write table versions? Also, 
shouldn't this test use the V2 column stats index by default?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReaderBase.scala:
##########
@@ -72,10 +72,6 @@ abstract class 
SparkParquetReaderBase(enableVectorizedReader: Boolean,
     conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, true)
     // Using string value of this conf to preserve compatibility across spark 
versions.
     conf.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key, false)
-    if (HoodieSparkUtils.gteqSpark3_4) {
-      // PARQUET_INFER_TIMESTAMP_NTZ_ENABLED is required from Spark 3.4.0 or 
above
-      conf.setBoolean("spark.sql.parquet.inferTimestampNTZ.enabled", false)
-    }

Review Comment:
   Is this still needed?



##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java:
##########
@@ -598,6 +669,106 @@ public GenericRecord 
generateRecordForTripEncodedDecimalSchema(String rowKey, St
     return rec;
   }
 
+  public GenericRecord generateRecordForTripLogicalTypesSchema(HoodieKey key, 
String riderName, String driverName,
+                                                               long timestamp, 
boolean isDeleteRecord, boolean v6, boolean hasLTS) {
+    GenericRecord rec;
+    if (!hasLTS) {
+      rec = new GenericData.Record(AVRO_TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS);
+    } else if (v6) {
+      rec = new GenericData.Record(AVRO_TRIP_LOGICAL_TYPES_SCHEMA_V6);
+    } else {
+      rec = new GenericData.Record(AVRO_TRIP_LOGICAL_TYPES_SCHEMA);
+    }
+    generateTripPrefixValues(rec, key.getRecordKey(), key.getPartitionPath(), 
riderName, driverName, timestamp);
+
+    int hash = key.getRecordKey().hashCode();
+    boolean above = (hash & 1) == 0; // half above, half below threshold
+
+    // -------------------
+    // Threshold definitions
+    // -------------------
+    Instant tsMillisThreshold = Instant.parse("2020-01-01T00:00:00Z");
+    Instant tsMicrosThreshold = Instant.parse("2020-06-01T12:00:00Z");
+
+    //LocalTime timeMillisThreshold = LocalTime.of(12, 0, 0);  // noon
+    //LocalTime timeMicrosThreshold = LocalTime.of(6, 0, 0);   // 6 AM

Review Comment:
   Is this still needed?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala:
##########
@@ -192,9 +195,12 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: 
String,
     val fixedPartitionIndexes = fixedPartitionIndexesArr.toSet
 
     // schema that we want fg reader to output to us
+    val exclusionFields = new java.util.HashSet[String]()
+    exclusionFields.add("op")
+    partitionSchema.fields.foreach(f => exclusionFields.add(f.name))
     val requestedSchema = StructType(requiredSchema.fields ++ 
partitionSchema.fields.filter(f => mandatoryFields.contains(f.name)))
-    val requestedAvroSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(requestedSchema, 
sanitizedTableName)
-    val dataAvroSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema, 
sanitizedTableName)
+    val requestedAvroSchema = AvroSchemaUtils.pruneDataSchema(avroTableSchema, 
AvroConversionUtils.convertStructTypeToAvroSchema(requestedSchema, 
sanitizedTableName), exclusionFields)
+    val dataAvroSchema = AvroSchemaUtils.pruneDataSchema(avroTableSchema, 
AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema, 
sanitizedTableName), exclusionFields)

Review Comment:
   What does the "op" field correspond to?



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestColStatsRecordWithMetadataRecord.java:
##########
@@ -473,7 +485,7 @@ private void 
generateNColStatsEntriesAndValidateMerge(Functions.Function1<Random
     List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadata = new 
ArrayList<>();
     minMaxValues.forEach(entry -> {
       
columnRangeMetadata.add(HoodieColumnRangeMetadata.<Comparable>create(fileName, 
colName,
-          entry.getKey(), entry.getValue(), 5, 1000, 123456, 123456));
+          entry.getKey(), entry.getValue(), 5, 1000, 123456, 123456, 
ValueMetadata.V1EmptyMetadata.get()));

Review Comment:
   Similarly, in this class, could the tests also be permuted over V2 value 
metadata?



##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java:
##########
@@ -598,6 +669,106 @@ public GenericRecord 
generateRecordForTripEncodedDecimalSchema(String rowKey, St
     return rec;
   }
 
+  public GenericRecord generateRecordForTripLogicalTypesSchema(HoodieKey key, 
String riderName, String driverName,
+                                                               long timestamp, 
boolean isDeleteRecord, boolean v6, boolean hasLTS) {
+    GenericRecord rec;
+    if (!hasLTS) {
+      rec = new GenericData.Record(AVRO_TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS);
+    } else if (v6) {
+      rec = new GenericData.Record(AVRO_TRIP_LOGICAL_TYPES_SCHEMA_V6);
+    } else {
+      rec = new GenericData.Record(AVRO_TRIP_LOGICAL_TYPES_SCHEMA);
+    }
+    generateTripPrefixValues(rec, key.getRecordKey(), key.getPartitionPath(), 
riderName, driverName, timestamp);
+
+    int hash = key.getRecordKey().hashCode();
+    boolean above = (hash & 1) == 0; // half above, half below threshold
+
+    // -------------------
+    // Threshold definitions
+    // -------------------
+    Instant tsMillisThreshold = Instant.parse("2020-01-01T00:00:00Z");
+    Instant tsMicrosThreshold = Instant.parse("2020-06-01T12:00:00Z");
+
+    //LocalTime timeMillisThreshold = LocalTime.of(12, 0, 0);  // noon
+    //LocalTime timeMicrosThreshold = LocalTime.of(6, 0, 0);   // 6 AM
+
+    Instant localTsMillisThreshold = ZonedDateTime.of(
+        2015, 5, 20, 12, 34, 56, 0, ZoneOffset.UTC).toInstant();
+    Instant localTsMicrosThreshold = ZonedDateTime.of(
+        2017, 7, 7, 7, 7, 7, 0, ZoneOffset.UTC).toInstant();
+
+    LocalDate dateThreshold = LocalDate.of(2000, 1, 1);
+
+    // -------------------
+    // Assign edge values
+    // -------------------
+
+    // ts_millis
+    long tsMillisBase = tsMillisThreshold.toEpochMilli();
+    rec.put("ts_millis", above ? tsMillisBase + 1 : tsMillisBase - 1);
+
+    // ts_micros
+    long tsMicrosBase = 
TimeUnit.SECONDS.toMicros(tsMicrosThreshold.getEpochSecond()) + 
tsMicrosThreshold.getNano() / 1_000L;
+    rec.put("ts_micros", above ? tsMicrosBase + 1 : tsMicrosBase - 1);
+
+    // time_millis
+    //int timeMillisBase = (int) 
TimeUnit.SECONDS.toMillis(timeMillisThreshold.toSecondOfDay());
+    //rec.put("time_millis", above ? timeMillisBase + 1 : timeMillisBase - 1);
+
+    // time_micros
+    //long timeMicrosBase = 
TimeUnit.SECONDS.toMicros(timeMicrosThreshold.toSecondOfDay());
+    //rec.put("time_micros", above ? timeMicrosBase + 1 : timeMicrosBase - 1);

Review Comment:
   Should re-enabling this work now?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala:
##########
@@ -192,9 +195,12 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: 
String,
     val fixedPartitionIndexes = fixedPartitionIndexesArr.toSet
 
     // schema that we want fg reader to output to us
+    val exclusionFields = new java.util.HashSet[String]()
+    exclusionFields.add("op")
+    partitionSchema.fields.foreach(f => exclusionFields.add(f.name))
     val requestedSchema = StructType(requiredSchema.fields ++ 
partitionSchema.fields.filter(f => mandatoryFields.contains(f.name)))
-    val requestedAvroSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(requestedSchema, 
sanitizedTableName)
-    val dataAvroSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema, 
sanitizedTableName)
+    val requestedAvroSchema = AvroSchemaUtils.pruneDataSchema(avroTableSchema, 
AvroConversionUtils.convertStructTypeToAvroSchema(requestedSchema, 
sanitizedTableName), exclusionFields)
+    val dataAvroSchema = AvroSchemaUtils.pruneDataSchema(avroTableSchema, 
AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema, 
sanitizedTableName), exclusionFields)

Review Comment:
   Is this change related to new column stats layout?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala:
##########
@@ -110,5 +110,8 @@ abstract class BaseSpark3Adapter extends SparkAdapter with 
Logging {
   def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit
 
   // Older Spark 3.x versions do not have TimestampNTZType
-  override def isTimestampNTZType(dataType: DataType): Boolean = false
+  override def isTimestampNTZType(dataType: DataType): Boolean = {
+    // TimestampNTZType$ but don't want to rule out possibility of 
TimestampNTZType
+    dataType.getClass.getSimpleName.startsWith("TimestampNTZType")
+  }

Review Comment:
   This method is overridden in the Spark version-specific adapter classes, so 
is the change in this class still needed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to