the-other-tim-brown commented on code in PR #17535:
URL: https://github.com/apache/hudi/pull/17535#discussion_r2609007710


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentSchemaEvolutionTableSchemaGetter.java:
##########
@@ -100,13 +94,17 @@ private Schema handlePartitionColumnsIfNeeded(Schema 
schema) {
 
   public Option<Schema> getTableAvroSchemaIfPresent(boolean 
includeMetadataFields, Option<HoodieInstant> instant) {
     return getTableAvroSchemaFromTimelineWithCache(instant) // Get table 
schema from schema evolution timeline.
+        .map(HoodieSchema::fromAvroSchema)
         .or(this::getTableCreateSchemaWithoutMetaField) // Fall back: read 
create schema from table config.
-        .map(tableSchema -> includeMetadataFields ? 
HoodieAvroUtils.addMetadataFields(tableSchema, false) : 
HoodieAvroUtils.removeMetadataFields(tableSchema))
-        .map(this::handlePartitionColumnsIfNeeded);
+        .map(tableSchema -> includeMetadataFields ? 
HoodieSchemaUtils.addMetadataFields(tableSchema, false) : 
HoodieSchemaUtils.removeMetadataFields(tableSchema))
+        .map(this::handlePartitionColumnsIfNeeded)
+        .map(HoodieSchema::toAvroSchema);
   }
 
-  private Option<Schema> getTableCreateSchemaWithoutMetaField() {
-    return metaClient.getTableConfig().getTableCreateSchema();
+  private Option<HoodieSchema> getTableCreateSchemaWithoutMetaField() {
+    return metaClient.getTableConfig().getTableCreateSchema()
+        .map(HoodieSchema::fromAvroSchema)
+        .or(Option.empty());

Review Comment:
   We shouldn't need the `or` in this case



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -240,20 +242,32 @@ private ValueWriter makeWriter(Schema avroSchema, 
DataType dataType) {
     } else if (dataType == DataTypes.LongType || dataType instanceof 
DayTimeIntervalType) {
       return (row, ordinal) -> recordConsumer.addLong(row.getLong(ordinal));
     } else if (dataType == DataTypes.TimestampType) {
-      if (logicalType == null || 
logicalType.getName().equals(LogicalTypes.timestampMicros().getName())) {
-        return (row, ordinal) -> recordConsumer.addLong((long) 
timestampRebaseFunction.apply(row.getLong(ordinal)));
-      } else if 
(logicalType.getName().equals(LogicalTypes.timestampMillis().getName())) {
-        return (row, ordinal) -> 
recordConsumer.addLong(DateTimeUtils.microsToMillis((long) 
timestampRebaseFunction.apply(row.getLong(ordinal))));
+      if (resolvedSchema instanceof HoodieSchema.Timestamp) {

Review Comment:
   let's use the `HoodieSchemaType` instead of `instanceof` checks to determine 
the type of the field



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentSchemaEvolutionTableSchemaGetter.java:
##########
@@ -88,10 +82,10 @@ public 
ConcurrentSchemaEvolutionTableSchemaGetter(HoodieTableMetaClient metaClie
    * @param schema the input schema to process
    * @return the processed schema with partition columns handled appropriately
    */
-  private Schema handlePartitionColumnsIfNeeded(Schema schema) {
+  private HoodieSchema handlePartitionColumnsIfNeeded(HoodieSchema schema) {
     if (metaClient.getTableConfig().shouldDropPartitionColumns()) {
       return metaClient.getTableConfig().getPartitionFields()
-          .map(partitionFields -> appendPartitionColumns(schema, 
Option.ofNullable(partitionFields)))
+          .map(partitionFields -> 
TableSchemaResolver.appendPartitionColumns(schema, 
Option.ofNullable(partitionFields)))
           .or(() -> Option.of(schema))
           .get();

Review Comment:
   Nitpick: while we're updating this, let's change this to use `orElseGet` 
instead of `or` and `get`



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -240,20 +242,32 @@ private ValueWriter makeWriter(Schema avroSchema, 
DataType dataType) {
     } else if (dataType == DataTypes.LongType || dataType instanceof 
DayTimeIntervalType) {
       return (row, ordinal) -> recordConsumer.addLong(row.getLong(ordinal));
     } else if (dataType == DataTypes.TimestampType) {
-      if (logicalType == null || 
logicalType.getName().equals(LogicalTypes.timestampMicros().getName())) {
-        return (row, ordinal) -> recordConsumer.addLong((long) 
timestampRebaseFunction.apply(row.getLong(ordinal)));
-      } else if 
(logicalType.getName().equals(LogicalTypes.timestampMillis().getName())) {
-        return (row, ordinal) -> 
recordConsumer.addLong(DateTimeUtils.microsToMillis((long) 
timestampRebaseFunction.apply(row.getLong(ordinal))));
+      if (resolvedSchema instanceof HoodieSchema.Timestamp) {
+        HoodieSchema.Timestamp timestampSchema = (HoodieSchema.Timestamp) 
resolvedSchema;
+        if (timestampSchema.getPrecision() == TimePrecision.MICROS) {
+          return (row, ordinal) -> recordConsumer.addLong((long) 
timestampRebaseFunction.apply(row.getLong(ordinal)));
+        } else {
+          return (row, ordinal) -> 
recordConsumer.addLong(DateTimeUtils.microsToMillis((long) 
timestampRebaseFunction.apply(row.getLong(ordinal))));
+        }
       } else {
-        throw new UnsupportedOperationException("Unsupported Avro logical type 
for TimestampType: " + logicalType);
+        // Default to micros precision when no timestamp schema is available
+        return (row, ordinal) -> recordConsumer.addLong((long) 
timestampRebaseFunction.apply(row.getLong(ordinal)));
       }
     } else if 
(SparkAdapterSupport$.MODULE$.sparkAdapter().isTimestampNTZType(dataType)) {
-      if (logicalType == null || 
logicalType.getName().equals(LogicalTypes.localTimestampMicros().getName())) {
-        return (row, ordinal) -> recordConsumer.addLong(row.getLong(ordinal));
-      } else if 
(logicalType.getName().equals(LogicalTypes.localTimestampMillis().getName())) {
-        return (row, ordinal) -> 
recordConsumer.addLong(DateTimeUtils.microsToMillis(row.getLong(ordinal)));
+      if (resolvedSchema instanceof HoodieSchema.Timestamp) {

Review Comment:
   Similarly here let's use the type



##########
hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableAvroUtils.java:
##########
@@ -326,43 +331,37 @@ private void validateRewriteWithAvro(
     ObjectInspector writableOINew = getWritableOIForType(newTypeInfo);
 
     Object javaInput = ObjectInspectorConverters.getConverter(writableOIOld, 
oldObjectInspector).convert(oldWritable);
-    if (isDecimalSchema(oldSchema)) {
-      javaInput = 
HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(getDecimalValue(javaInput, 
oldSchema), oldSchema, oldSchema.getLogicalType());
+    if (oldSchema instanceof HoodieSchema.Decimal) {

Review Comment:
   Let's use the schema type in this class as well



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCompatibility.java:
##########
@@ -191,4 +213,38 @@ public static boolean 
areSchemasProjectionEquivalent(HoodieSchema schema1, Hoodi
     }
     return 
AvroSchemaUtils.areSchemasProjectionEquivalent(schema1.toAvroSchema(), 
schema2.toAvroSchema());
   }
+
+  /**
+   * Identifies the writer field that corresponds to the specified reader 
field.
+   * This function is adapted from AvroSchemaCompatibility#lookupWriterField
+   *
+   * <p>
+   * Matching includes reader name aliases.
+   * </p>
+   *
+   * @param writerSchema Schema of the record where to look for the writer 
field.
+   * @param readerField  Reader field to identify the corresponding writer 
field
+   *                     of.
+   * @return the writer field, if any does correspond, or None.
+   */
+  public static HoodieSchemaField lookupWriterField(final HoodieSchema 
writerSchema, final HoodieSchemaField readerField) {
+    ValidationUtils.checkArgument(writerSchema.getType() == 
HoodieSchemaType.RECORD, writerSchema + " is not a record");
+    Option<HoodieSchemaField> result = Option.empty();
+    final Option<HoodieSchemaField> directOpt = 
writerSchema.getField(readerField.name());
+    if (directOpt.isPresent()) {

Review Comment:
   Do we need a separate variable for the `result`? Can we just use `directOpt` 
directly?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -118,7 +118,7 @@ public class HoodieRowParquetWriteSupport extends 
WriteSupport<InternalRow> {
   private final Function1<Object, Object> timestampRebaseFunction = 
DataSourceUtils.createTimestampRebaseFuncInWrite(datetimeRebaseMode, "Parquet");
   private final boolean writeLegacyListFormat;
   private final ValueWriter[] rootFieldWriters;
-  private final Schema avroSchema;
+  private final HoodieSchema hoodieSchema;

Review Comment:
   nitpick: let's update `hoodieSchema` variable names to simply `schema`?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java:
##########
@@ -952,9 +952,10 @@ public void validateSchema() throws HoodieUpsertException, 
HoodieInsertException
       if (!existingTableSchema.isPresent()) {
         return;
       }
-      Schema writerSchema = 
HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
-      Schema tableSchema = 
HoodieAvroUtils.createHoodieWriteSchema(existingTableSchema.get());
-      AvroSchemaUtils.checkSchemaCompatible(tableSchema, writerSchema, 
shouldValidate, allowProjection, getDropPartitionColNames());
+
+      HoodieSchema writerSchema = 
HoodieSchemaUtils.createHoodieWriteSchema(config.getSchema(), false);
+      HoodieSchema tableSchema = 
HoodieSchema.createHoodieWriteSchema(existingTableSchema.get().toString(), 
false);

Review Comment:
   Instead of converting `existingTableSchema` to a string, let's create a 
method that takes in a `HoodieSchema` similar to the `AvroSchemaUtils`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to