nsivabalan commented on code in PR #17601:
URL: https://github.com/apache/hudi/pull/17601#discussion_r2730139359


##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java:
##########
@@ -471,4 +494,15 @@ public static Schema asNullable(Schema schema) {
         (change, field) -> change.updateColumnNullability(field, true));
     return convert(SchemaChangeUtils.applyTableChanges2Schema(internalSchema, schemaChange), schema.getFullName());
   }
+
+  public static Schema getRepairedSchema(Schema writerSchema, Schema readerSchema) {
+    try {
+      Class<?> avroSchemaRepairClass = Class.forName("org.apache.parquet.schema.AvroSchemaRepair");
+      Method repairMethod = avroSchemaRepairClass.getMethod("repairLogicalTypes", Schema.class, Schema.class);
+      return (Schema) repairMethod.invoke(null, writerSchema, readerSchema);

Review Comment:
   isn't the 2nd arg avroSchemaRepairClass?
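   For context on the question above, a note on reflection call semantics may help: the trailing arguments to `getMethod` are the target method's parameter types (`Schema.class, Schema.class` for `repairLogicalTypes(Schema, Schema)`), and the declaring class is the receiver of the `getMethod` call itself; for a static method, the first argument to `invoke` is `null`. A minimal, self-contained illustration of the same pattern using `java.lang.Math` rather than the Hudi/Parquet classes:

```java
import java.lang.reflect.Method;

public class ReflectionArgOrder {
  // Same shape as getRepairedSchema: look up a class by name, resolve a
  // static method by its parameter types, then invoke it with a null
  // receiver because the method is static.
  public static int invokeStaticMax(int a, int b) {
    try {
      Class<?> mathClass = Class.forName("java.lang.Math");
      // The trailing args here are PARAMETER TYPES, not the declaring class.
      Method max = mathClass.getMethod("max", int.class, int.class);
      // The first arg to invoke is the receiver; null for a static method.
      return (Integer) max.invoke(null, a, b);
    } catch (ReflectiveOperationException e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(invokeStaticMax(3, 7)); // prints 7
  }
}
```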



##########
hudi-common/src/main/java/org/apache/hudi/avro/ConvertingGenericData.java:
##########
@@ -42,13 +43,12 @@ public class ConvertingGenericData extends GenericData {
   private static final TimeConversions.TimeMicrosConversion TIME_MICROS_CONVERSION = new TimeConversions.TimeMicrosConversion();
   private static final TimeConversions.TimestampMicrosConversion TIMESTAMP_MICROS_CONVERSION = new TimeConversions.TimestampMicrosConversion();
 
-  // NOTE: Those are not supported in Avro 1.8.2
-  // TODO re-enable upon upgrading to 1.10
-  // private static final TimeConversions.TimestampMillisConversion TIMESTAMP_MILLIS_CONVERSION = new TimeConversions.TimestampMillisConversion();
-  // private static final TimeConversions.TimeMillisConversion TIME_MILLIS_CONVERSION = new TimeConversions.TimeMillisConversion();
-  // private static final TimeConversions.LocalTimestampMillisConversion LOCAL_TIMESTAMP_MILLIS_CONVERSION = new TimeConversions.LocalTimestampMillisConversion();
-  // private static final TimeConversions.LocalTimestampMicrosConversion LOCAL_TIMESTAMP_MICROS_CONVERSION = new TimeConversions.LocalTimestampMicrosConversion();
-
+  // NOTE: Those are not supported in Avro 1.8.2 (used by Spark 2)
+  // Use reflection to conditionally initialize them only if available
+  private static final Object TIMESTAMP_MILLIS_CONVERSION = createConversionIfAvailable("org.apache.avro.data.TimeConversions$TimestampMillisConversion");

Review Comment:
   If the unavailability of these classes or methods is limited to Spark 2.x, why do we need reflection? It looks like we are removing support for Spark 2.x anyway, so why not simplify and call the corresponding classes directly?
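   For readers following along, here is a sketch of what a `createConversionIfAvailable` helper typically looks like (the helper name comes from the diff above; the body below is an assumption, not the actual Hudi implementation): instantiate the conversion reflectively and degrade to `null` when the class is absent from the classpath.

```java
import java.lang.reflect.Constructor;

public class ConversionLoader {
  // Hypothetical body for the createConversionIfAvailable helper referenced
  // in the diff: return an instance if the class is on the classpath,
  // otherwise null (e.g. on older Avro versions that lack the conversion).
  public static Object createConversionIfAvailable(String className) {
    try {
      Constructor<?> ctor = Class.forName(className).getDeclaredConstructor();
      return ctor.newInstance();
    } catch (ReflectiveOperationException e) {
      return null; // class missing or not instantiable: feature unavailable
    }
  }

  public static void main(String[] args) {
    // A class present on every JVM loads fine:
    System.out.println(createConversionIfAvailable("java.lang.Object") != null); // prints true
    // A missing class degrades to null instead of throwing:
    System.out.println(createConversionIfAvailable("com.example.DoesNotExist") == null); // prints true
  }
}
```

   The trade-off the reviewer raises is that once Spark 2.x support is dropped, the classes are always available and the direct `new TimeConversions.TimestampMillisConversion()` call is simpler and type-safe.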



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java:
##########
@@ -121,35 +129,64 @@ private ClosableIterator<InternalRow> getInternalRowIterator(Schema readerSchema
     if (requestedSchema == null) {
       requestedSchema = readerSchema;
     }
-    StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(readerSchema);
-    StructType requestedStructType = HoodieInternalRowUtils.getCachedSchema(requestedSchema);
-    storage.getConf().set(ParquetReadSupport.PARQUET_READ_SCHEMA, readerStructType.json());
-    storage.getConf().set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA(), requestedStructType.json());
+
+    MessageType fileSchema = getFileSchema();
+    Schema nonNullSchema = AvroSchemaUtils.getNonNullTypeFromUnion(requestedSchema);
+    Option<MessageType> messageSchema = Option.of(getAvroSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(nonNullSchema));
+    Pair<StructType, StructType> readerSchemas =
+        SparkAdapterSupport$.MODULE$.sparkAdapter().getReaderSchemas(storage, readerSchema, requestedSchema, fileSchema);
+    storage.getConf().set(ParquetReadSupport.PARQUET_READ_SCHEMA, readerSchemas.getLeft().json());
+    storage.getConf().set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA(), readerSchemas.getRight().json());
     storage.getConf().set(SQLConf.PARQUET_BINARY_AS_STRING().key(), SQLConf.get().getConf(SQLConf.PARQUET_BINARY_AS_STRING()).toString());
     storage.getConf().set(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), SQLConf.get().getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP()).toString());
-    ParquetReader<InternalRow> reader = ParquetReader.<InternalRow>builder((ReadSupport) new ParquetReadSupport(), new Path(path.toUri()))
+    ParquetReader<InternalRow> reader = ParquetReader.<InternalRow>builder(
+        (ReadSupport) SparkAdapterSupport$.MODULE$.sparkAdapter().getParquetReadSupport(messageSchema),

Review Comment:
   Looks like we are using the default value (true) for `enableTimestampFieldRepair` while instantiating `HoodieParquetReadSupport`.
   We need to fix lines 135 to 139 and set the `enableTimestampFieldRepair` argument here accordingly.
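   A sketch of the wiring being asked for, assuming the `ENABLE_LOGICAL_TIMESTAMP_REPAIR` key defined elsewhere in this PR and a `HoodieParquetReadSupport` constructor that accepts the flag (the lookup shape below is illustrative, not the actual Hudi code): resolve the flag from configuration instead of silently relying on the default.

```java
import java.util.Map;

public class RepairFlagWiring {
  // Key taken from the PR diff; the default of "true" mirrors the behavior
  // the reviewer describes. Everything else here is an illustrative assumption.
  public static final String ENABLE_LOGICAL_TIMESTAMP_REPAIR =
      "spark.hudi.logicalTimestampField.repair.enable";

  // Resolve the flag explicitly so the read-support constructor can receive
  // the configured value rather than an unconditional default.
  public static boolean resolveRepairFlag(Map<String, String> conf) {
    return Boolean.parseBoolean(conf.getOrDefault(ENABLE_LOGICAL_TIMESTAMP_REPAIR, "true"));
  }
}
```

   The resolved boolean would then be handed through as the `enableTimestampFieldRepair` argument rather than left at its default.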



##########
hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala:
##########
@@ -127,4 +143,41 @@ class Spark3_5Adapter extends BaseSpark3Adapter {
     case OFF_HEAP => "OFF_HEAP"
    case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $level")
   }
+
+  override def getParquetReadSupport(messageSchema: org.apache.hudi.common.util.Option[MessageType]): ParquetReadSupport = {

Review Comment:
   We should make `enableTimestampFieldRepair` an argument.
   Curious to know why we don't wire in most of the other arguments as well?
   



##########
hudi-client/hudi-client-common/pom.xml:
##########
@@ -172,6 +214,10 @@
           <groupId>org.pentaho</groupId>
           <artifactId>*</artifactId>
         </exclusion>
+        <exclusion>

Review Comment:
   Can you help me understand the necessity of this code change?



##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java:
##########
@@ -471,4 +494,15 @@ public static Schema asNullable(Schema schema) {
         (change, field) -> change.updateColumnNullability(field, true));
     return convert(SchemaChangeUtils.applyTableChanges2Schema(internalSchema, schemaChange), schema.getFullName());
   }
+
+  public static Schema getRepairedSchema(Schema writerSchema, Schema readerSchema) {

Review Comment:
   why is this change required? 



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java:
##########
@@ -121,35 +129,64 @@ private ClosableIterator<InternalRow> getInternalRowIterator(Schema readerSchema
     if (requestedSchema == null) {
       requestedSchema = readerSchema;
     }
-    StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(readerSchema);
-    StructType requestedStructType = HoodieInternalRowUtils.getCachedSchema(requestedSchema);
-    storage.getConf().set(ParquetReadSupport.PARQUET_READ_SCHEMA, readerStructType.json());
-    storage.getConf().set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA(), requestedStructType.json());
+
+    MessageType fileSchema = getFileSchema();
+    Schema nonNullSchema = AvroSchemaUtils.getNonNullTypeFromUnion(requestedSchema);
+    Option<MessageType> messageSchema = Option.of(getAvroSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(nonNullSchema));

Review Comment:
   Are we not missing the repair call here?
   https://github.com/apache/hudi/blob/fd008dae73470721edf378f22c158ef8115ef7ab/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java#L145
   
   I understand we have some divergence b/w 1.x and 0.x in these code blocks.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java:
##########
@@ -121,35 +129,64 @@ private ClosableIterator<InternalRow> getInternalRowIterator(Schema readerSchema
     if (requestedSchema == null) {
       requestedSchema = readerSchema;
     }
-    StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(readerSchema);
-    StructType requestedStructType = HoodieInternalRowUtils.getCachedSchema(requestedSchema);
-    storage.getConf().set(ParquetReadSupport.PARQUET_READ_SCHEMA, readerStructType.json());
-    storage.getConf().set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA(), requestedStructType.json());
+
+    MessageType fileSchema = getFileSchema();
+    Schema nonNullSchema = AvroSchemaUtils.getNonNullTypeFromUnion(requestedSchema);
+    Option<MessageType> messageSchema = Option.of(getAvroSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(nonNullSchema));
+    Pair<StructType, StructType> readerSchemas =
+        SparkAdapterSupport$.MODULE$.sparkAdapter().getReaderSchemas(storage, readerSchema, requestedSchema, fileSchema);
+    storage.getConf().set(ParquetReadSupport.PARQUET_READ_SCHEMA, readerSchemas.getLeft().json());
+    storage.getConf().set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA(), readerSchemas.getRight().json());
     storage.getConf().set(SQLConf.PARQUET_BINARY_AS_STRING().key(), SQLConf.get().getConf(SQLConf.PARQUET_BINARY_AS_STRING()).toString());
     storage.getConf().set(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), SQLConf.get().getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP()).toString());
-    ParquetReader<InternalRow> reader = ParquetReader.<InternalRow>builder((ReadSupport) new ParquetReadSupport(), new Path(path.toUri()))
+    ParquetReader<InternalRow> reader = ParquetReader.<InternalRow>builder(
+        (ReadSupport) SparkAdapterSupport$.MODULE$.sparkAdapter().getParquetReadSupport(messageSchema),

Review Comment:
   Maybe we have this minor wiring issue in 1.x as well. Can you raise a patch for it?



##########
hudi-client/hudi-client-common/pom.xml:
##########
@@ -52,6 +52,48 @@
       <groupId>org.apache.hudi</groupId>
       <artifactId>hudi-timeline-service</artifactId>
       <version>${project.version}</version>
+      <!-- Exclude Jetty from timeline-service to use our managed version -->
+      <exclusions>
+        <exclusion>
+          <groupId>org.eclipse.jetty</groupId>
+          <artifactId>*</artifactId>

Review Comment:
   why do we need this change? 



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java:
##########
@@ -55,13 +58,18 @@
 
 import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
 import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
+import static org.apache.parquet.avro.HoodieAvroParquetSchemaConverter.getAvroSchemaConverter;
 
 public class HoodieSparkParquetReader implements HoodieSparkFileReader {
 
+  public static final String ENABLE_LOGICAL_TIMESTAMP_REPAIR = "spark.hudi.logicalTimestampField.repair.enable";
   private final StoragePath path;
   private final HoodieStorage storage;
   private final FileFormatUtils parquetUtils;
-  private List<ParquetReaderIterator> readerIterators = new ArrayList<>();
+  private final List<ClosableIterator> readerIterators = new ArrayList<>();

Review Comment:
   Is the L69 change really required?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to