nsivabalan commented on code in PR #17601:
URL: https://github.com/apache/hudi/pull/17601#discussion_r2730139359
##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java:
##########
@@ -471,4 +494,15 @@ public static Schema asNullable(Schema schema) {
(change, field) -> change.updateColumnNullability(field, true));
    return convert(SchemaChangeUtils.applyTableChanges2Schema(internalSchema, schemaChange), schema.getFullName());
  }
+
+  public static Schema getRepairedSchema(Schema writerSchema, Schema readerSchema) {
+    try {
+      Class<?> avroSchemaRepairClass = Class.forName("org.apache.parquet.schema.AvroSchemaRepair");
+      Method repairMethod = avroSchemaRepairClass.getMethod("repairLogicalTypes", Schema.class, Schema.class);
+      return (Schema) repairMethod.invoke(null, writerSchema, readerSchema);
Review Comment:
isn't the 2nd arg avroSchemaRepairClass?
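For context on the question above: `Method.invoke` on a static method takes `null` as the receiver object, and all remaining arguments are the method's parameters, so `invoke(null, writerSchema, readerSchema)` passes both schemas to `repairLogicalTypes`. A minimal stdlib-only illustration (using `Integer.parseInt` as a stand-in static method, not the Hudi code itself):

```java
import java.lang.reflect.Method;

public class Main {
    public static void main(String[] args) throws Exception {
        // Integer.parseInt is static: the receiver passed to invoke() is null,
        // and every remaining invoke() argument is a method parameter.
        Method parseInt = Integer.class.getMethod("parseInt", String.class);
        Object result = parseInt.invoke(null, "42");
        System.out.println(result); // prints 42
    }
}
```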
##########
hudi-common/src/main/java/org/apache/hudi/avro/ConvertingGenericData.java:
##########
@@ -42,13 +43,12 @@ public class ConvertingGenericData extends GenericData {
  private static final TimeConversions.TimeMicrosConversion TIME_MICROS_CONVERSION = new TimeConversions.TimeMicrosConversion();
  private static final TimeConversions.TimestampMicrosConversion TIMESTAMP_MICROS_CONVERSION = new TimeConversions.TimestampMicrosConversion();
-  // NOTE: Those are not supported in Avro 1.8.2
-  // TODO re-enable upon upgrading to 1.10
-  // private static final TimeConversions.TimestampMillisConversion TIMESTAMP_MILLIS_CONVERSION = new TimeConversions.TimestampMillisConversion();
-  // private static final TimeConversions.TimeMillisConversion TIME_MILLIS_CONVERSION = new TimeConversions.TimeMillisConversion();
-  // private static final TimeConversions.LocalTimestampMillisConversion LOCAL_TIMESTAMP_MILLIS_CONVERSION = new TimeConversions.LocalTimestampMillisConversion();
-  // private static final TimeConversions.LocalTimestampMicrosConversion LOCAL_TIMESTAMP_MICROS_CONVERSION = new TimeConversions.LocalTimestampMicrosConversion();
-
+  // NOTE: Those are not supported in Avro 1.8.2 (used by Spark 2)
+  // Use reflection to conditionally initialize them only if available
+  private static final Object TIMESTAMP_MILLIS_CONVERSION = createConversionIfAvailable("org.apache.avro.data.TimeConversions$TimestampMillisConversion");
Review Comment:
If the unavailability of these classes or methods is limited to Spark 2.x, why do we need reflection? It looks like we are removing support for Spark 2.x anyway, so why not simplify and call the corresponding classes directly?
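For reference, the reflection-based conditional initialization being questioned follows this pattern (a stdlib-only sketch; the class names below are stand-ins for illustration, not the actual Hudi helper or Avro conversion classes):

```java
import java.lang.reflect.Constructor;

public class Main {
    // Returns an instance when the class is on the classpath, else null.
    // This mirrors the createConversionIfAvailable(...) idiom in the PR,
    // which guards against older Avro versions missing a conversion class.
    static Object createConversionIfAvailable(String className) {
        try {
            Class<?> clazz = Class.forName(className);
            Constructor<?> ctor = clazz.getDeclaredConstructor();
            return ctor.newInstance();
        } catch (ReflectiveOperationException e) {
            return null;  // class (or its no-arg constructor) is unavailable
        }
    }

    public static void main(String[] args) {
        // Present class: a stdlib stand-in for an available conversion class.
        Object present = createConversionIfAvailable("java.util.ArrayList");
        // Absent class: simulates running against a classpath without it.
        Object absent = createConversionIfAvailable("org.example.DoesNotExist");
        System.out.println((present != null) + " " + (absent == null));
    }
}
```

The reviewer's point stands: if every supported runtime now ships the classes, direct `new TimeConversions.TimestampMillisConversion()` initialization is simpler and type-safe.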
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java:
##########
@@ -121,35 +129,64 @@ private ClosableIterator<InternalRow> getInternalRowIterator(Schema readerSchema
    if (requestedSchema == null) {
      requestedSchema = readerSchema;
    }
-    StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(readerSchema);
-    StructType requestedStructType = HoodieInternalRowUtils.getCachedSchema(requestedSchema);
-    storage.getConf().set(ParquetReadSupport.PARQUET_READ_SCHEMA, readerStructType.json());
-    storage.getConf().set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA(), requestedStructType.json());
+
+    MessageType fileSchema = getFileSchema();
+    Schema nonNullSchema = AvroSchemaUtils.getNonNullTypeFromUnion(requestedSchema);
+    Option<MessageType> messageSchema = Option.of(getAvroSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(nonNullSchema));
+    Pair<StructType, StructType> readerSchemas =
+        SparkAdapterSupport$.MODULE$.sparkAdapter().getReaderSchemas(storage, readerSchema, requestedSchema, fileSchema);
+    storage.getConf().set(ParquetReadSupport.PARQUET_READ_SCHEMA, readerSchemas.getLeft().json());
+    storage.getConf().set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA(), readerSchemas.getRight().json());
    storage.getConf().set(SQLConf.PARQUET_BINARY_AS_STRING().key(), SQLConf.get().getConf(SQLConf.PARQUET_BINARY_AS_STRING()).toString());
    storage.getConf().set(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), SQLConf.get().getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP()).toString());
-    ParquetReader<InternalRow> reader = ParquetReader.<InternalRow>builder((ReadSupport) new ParquetReadSupport(), new Path(path.toUri()))
+    ParquetReader<InternalRow> reader = ParquetReader.<InternalRow>builder(
+        (ReadSupport) SparkAdapterSupport$.MODULE$.sparkAdapter().getParquetReadSupport(messageSchema),
Review Comment:
Looks like we are using the default value of `enableTimestampFieldRepair`, which is true, while instantiating `HoodieParquetReadSupport`.
We need to fix lines 135 to 139 and set the `enableTimestampFieldRepair` argument here accordingly.
##########
hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala:
##########
@@ -127,4 +143,41 @@ class Spark3_5Adapter extends BaseSpark3Adapter {
    case OFF_HEAP => "OFF_HEAP"
    case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $level")
  }
+
+  override def getParquetReadSupport(messageSchema: org.apache.hudi.common.util.Option[MessageType]): ParquetReadSupport = {
Review Comment:
We should make `enableTimestampFieldRepair` an argument.
Curious to know why we don't wire in most of the other arguments as well?
##########
hudi-client/hudi-client-common/pom.xml:
##########
@@ -172,6 +214,10 @@
<groupId>org.pentaho</groupId>
<artifactId>*</artifactId>
</exclusion>
+ <exclusion>
Review Comment:
Can you help me understand the necessity of this code change?
##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java:
##########
@@ -471,4 +494,15 @@ public static Schema asNullable(Schema schema) {
(change, field) -> change.updateColumnNullability(field, true));
    return convert(SchemaChangeUtils.applyTableChanges2Schema(internalSchema, schemaChange), schema.getFullName());
  }
+
+  public static Schema getRepairedSchema(Schema writerSchema, Schema readerSchema) {
Review Comment:
why is this change required?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java:
##########
@@ -121,35 +129,64 @@ private ClosableIterator<InternalRow> getInternalRowIterator(Schema readerSchema
    if (requestedSchema == null) {
      requestedSchema = readerSchema;
    }
-    StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(readerSchema);
-    StructType requestedStructType = HoodieInternalRowUtils.getCachedSchema(requestedSchema);
-    storage.getConf().set(ParquetReadSupport.PARQUET_READ_SCHEMA, readerStructType.json());
-    storage.getConf().set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA(), requestedStructType.json());
+
+    MessageType fileSchema = getFileSchema();
+    Schema nonNullSchema = AvroSchemaUtils.getNonNullTypeFromUnion(requestedSchema);
+    Option<MessageType> messageSchema = Option.of(getAvroSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(nonNullSchema));
Review Comment:
Are we not missing the repair call here?
https://github.com/apache/hudi/blob/fd008dae73470721edf378f22c158ef8115ef7ab/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java#L145
I understand we have some divergence between 1.x and 0.x in these code blocks.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java:
##########
@@ -121,35 +129,64 @@ private ClosableIterator<InternalRow> getInternalRowIterator(Schema readerSchema
    if (requestedSchema == null) {
      requestedSchema = readerSchema;
    }
-    StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(readerSchema);
-    StructType requestedStructType = HoodieInternalRowUtils.getCachedSchema(requestedSchema);
-    storage.getConf().set(ParquetReadSupport.PARQUET_READ_SCHEMA, readerStructType.json());
-    storage.getConf().set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA(), requestedStructType.json());
+
+    MessageType fileSchema = getFileSchema();
+    Schema nonNullSchema = AvroSchemaUtils.getNonNullTypeFromUnion(requestedSchema);
+    Option<MessageType> messageSchema = Option.of(getAvroSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(nonNullSchema));
+    Pair<StructType, StructType> readerSchemas =
+        SparkAdapterSupport$.MODULE$.sparkAdapter().getReaderSchemas(storage, readerSchema, requestedSchema, fileSchema);
+    storage.getConf().set(ParquetReadSupport.PARQUET_READ_SCHEMA, readerSchemas.getLeft().json());
+    storage.getConf().set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA(), readerSchemas.getRight().json());
    storage.getConf().set(SQLConf.PARQUET_BINARY_AS_STRING().key(), SQLConf.get().getConf(SQLConf.PARQUET_BINARY_AS_STRING()).toString());
    storage.getConf().set(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), SQLConf.get().getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP()).toString());
-    ParquetReader<InternalRow> reader = ParquetReader.<InternalRow>builder((ReadSupport) new ParquetReadSupport(), new Path(path.toUri()))
+    ParquetReader<InternalRow> reader = ParquetReader.<InternalRow>builder(
+        (ReadSupport) SparkAdapterSupport$.MODULE$.sparkAdapter().getParquetReadSupport(messageSchema),
Review Comment:
Maybe we have this minor wiring issue in 1.x as well. Can you raise a patch for it?
##########
hudi-client/hudi-client-common/pom.xml:
##########
@@ -52,6 +52,48 @@
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-timeline-service</artifactId>
<version>${project.version}</version>
+ <!-- Exclude Jetty from timeline-service to use our managed version -->
+ <exclusions>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>*</artifactId>
Review Comment:
why do we need this change?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java:
##########
@@ -55,13 +58,18 @@
import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
+import static org.apache.parquet.avro.HoodieAvroParquetSchemaConverter.getAvroSchemaConverter;

public class HoodieSparkParquetReader implements HoodieSparkFileReader {
+  public static final String ENABLE_LOGICAL_TIMESTAMP_REPAIR = "spark.hudi.logicalTimestampField.repair.enable";
  private final StoragePath path;
  private final HoodieStorage storage;
  private final FileFormatUtils parquetUtils;
-  private List<ParquetReaderIterator> readerIterators = new ArrayList<>();
+  private final List<ClosableIterator> readerIterators = new ArrayList<>();
Review Comment:
Is the L69 change really required?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]