the-other-tim-brown commented on code in PR #17535:
URL: https://github.com/apache/hudi/pull/17535#discussion_r2609007710
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentSchemaEvolutionTableSchemaGetter.java:
##########
@@ -100,13 +94,17 @@ private Schema handlePartitionColumnsIfNeeded(Schema
schema) {
public Option<Schema> getTableAvroSchemaIfPresent(boolean
includeMetadataFields, Option<HoodieInstant> instant) {
return getTableAvroSchemaFromTimelineWithCache(instant) // Get table
schema from schema evolution timeline.
+ .map(HoodieSchema::fromAvroSchema)
.or(this::getTableCreateSchemaWithoutMetaField) // Fall back: read
create schema from table config.
- .map(tableSchema -> includeMetadataFields ?
HoodieAvroUtils.addMetadataFields(tableSchema, false) :
HoodieAvroUtils.removeMetadataFields(tableSchema))
- .map(this::handlePartitionColumnsIfNeeded);
+ .map(tableSchema -> includeMetadataFields ?
HoodieSchemaUtils.addMetadataFields(tableSchema, false) :
HoodieSchemaUtils.removeMetadataFields(tableSchema))
+ .map(this::handlePartitionColumnsIfNeeded)
+ .map(HoodieSchema::toAvroSchema);
}
- private Option<Schema> getTableCreateSchemaWithoutMetaField() {
- return metaClient.getTableConfig().getTableCreateSchema();
+ private Option<HoodieSchema> getTableCreateSchemaWithoutMetaField() {
+ return metaClient.getTableConfig().getTableCreateSchema()
+ .map(HoodieSchema::fromAvroSchema)
+ .or(Option.empty());
Review Comment:
We shouldn't need the `or` in this case
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -240,20 +242,32 @@ private ValueWriter makeWriter(Schema avroSchema,
DataType dataType) {
} else if (dataType == DataTypes.LongType || dataType instanceof
DayTimeIntervalType) {
return (row, ordinal) -> recordConsumer.addLong(row.getLong(ordinal));
} else if (dataType == DataTypes.TimestampType) {
- if (logicalType == null ||
logicalType.getName().equals(LogicalTypes.timestampMicros().getName())) {
- return (row, ordinal) -> recordConsumer.addLong((long)
timestampRebaseFunction.apply(row.getLong(ordinal)));
- } else if
(logicalType.getName().equals(LogicalTypes.timestampMillis().getName())) {
- return (row, ordinal) ->
recordConsumer.addLong(DateTimeUtils.microsToMillis((long)
timestampRebaseFunction.apply(row.getLong(ordinal))));
+ if (resolvedSchema instanceof HoodieSchema.Timestamp) {
Review Comment:
let's use the `HoodieSchemaType` instead of `instanceof` checks to determine
the type of the field
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentSchemaEvolutionTableSchemaGetter.java:
##########
@@ -88,10 +82,10 @@ public
ConcurrentSchemaEvolutionTableSchemaGetter(HoodieTableMetaClient metaClie
* @param schema the input schema to process
* @return the processed schema with partition columns handled appropriately
*/
- private Schema handlePartitionColumnsIfNeeded(Schema schema) {
+ private HoodieSchema handlePartitionColumnsIfNeeded(HoodieSchema schema) {
if (metaClient.getTableConfig().shouldDropPartitionColumns()) {
return metaClient.getTableConfig().getPartitionFields()
- .map(partitionFields -> appendPartitionColumns(schema,
Option.ofNullable(partitionFields)))
+ .map(partitionFields ->
TableSchemaResolver.appendPartitionColumns(schema,
Option.ofNullable(partitionFields)))
.or(() -> Option.of(schema))
.get();
Review Comment:
Nitpick: while we're updating this, let's change this to use `orElseGet`
instead of `or` and `get`
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -240,20 +242,32 @@ private ValueWriter makeWriter(Schema avroSchema,
DataType dataType) {
} else if (dataType == DataTypes.LongType || dataType instanceof
DayTimeIntervalType) {
return (row, ordinal) -> recordConsumer.addLong(row.getLong(ordinal));
} else if (dataType == DataTypes.TimestampType) {
- if (logicalType == null ||
logicalType.getName().equals(LogicalTypes.timestampMicros().getName())) {
- return (row, ordinal) -> recordConsumer.addLong((long)
timestampRebaseFunction.apply(row.getLong(ordinal)));
- } else if
(logicalType.getName().equals(LogicalTypes.timestampMillis().getName())) {
- return (row, ordinal) ->
recordConsumer.addLong(DateTimeUtils.microsToMillis((long)
timestampRebaseFunction.apply(row.getLong(ordinal))));
+ if (resolvedSchema instanceof HoodieSchema.Timestamp) {
+ HoodieSchema.Timestamp timestampSchema = (HoodieSchema.Timestamp)
resolvedSchema;
+ if (timestampSchema.getPrecision() == TimePrecision.MICROS) {
+ return (row, ordinal) -> recordConsumer.addLong((long)
timestampRebaseFunction.apply(row.getLong(ordinal)));
+ } else {
+ return (row, ordinal) ->
recordConsumer.addLong(DateTimeUtils.microsToMillis((long)
timestampRebaseFunction.apply(row.getLong(ordinal))));
+ }
} else {
- throw new UnsupportedOperationException("Unsupported Avro logical type
for TimestampType: " + logicalType);
+ // Default to micros precision when no timestamp schema is available
+ return (row, ordinal) -> recordConsumer.addLong((long)
timestampRebaseFunction.apply(row.getLong(ordinal)));
}
} else if
(SparkAdapterSupport$.MODULE$.sparkAdapter().isTimestampNTZType(dataType)) {
- if (logicalType == null ||
logicalType.getName().equals(LogicalTypes.localTimestampMicros().getName())) {
- return (row, ordinal) -> recordConsumer.addLong(row.getLong(ordinal));
- } else if
(logicalType.getName().equals(LogicalTypes.localTimestampMillis().getName())) {
- return (row, ordinal) ->
recordConsumer.addLong(DateTimeUtils.microsToMillis(row.getLong(ordinal)));
+ if (resolvedSchema instanceof HoodieSchema.Timestamp) {
Review Comment:
Similarly, let's use the schema type here instead of the `instanceof` check
##########
hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableAvroUtils.java:
##########
@@ -326,43 +331,37 @@ private void validateRewriteWithAvro(
ObjectInspector writableOINew = getWritableOIForType(newTypeInfo);
Object javaInput = ObjectInspectorConverters.getConverter(writableOIOld,
oldObjectInspector).convert(oldWritable);
- if (isDecimalSchema(oldSchema)) {
- javaInput =
HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(getDecimalValue(javaInput,
oldSchema), oldSchema, oldSchema.getLogicalType());
+ if (oldSchema instanceof HoodieSchema.Decimal) {
Review Comment:
Let's use the schema type in this class as well
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCompatibility.java:
##########
@@ -191,4 +213,38 @@ public static boolean
areSchemasProjectionEquivalent(HoodieSchema schema1, Hoodi
}
return
AvroSchemaUtils.areSchemasProjectionEquivalent(schema1.toAvroSchema(),
schema2.toAvroSchema());
}
+
+ /**
+ * Identifies the writer field that corresponds to the specified reader
field.
+ * This function is adapted from AvroSchemaCompatibility#lookupWriterField
+ *
+ * <p>
+ * Matching includes reader name aliases.
+ * </p>
+ *
+ * @param writerSchema Schema of the record where to look for the writer
field.
+ * @param readerField Reader field to identify the corresponding writer
field
+ * of.
+ * @return the writer field, if any does correspond, or None.
+ */
+ public static HoodieSchemaField lookupWriterField(final HoodieSchema
writerSchema, final HoodieSchemaField readerField) {
+ ValidationUtils.checkArgument(writerSchema.getType() ==
HoodieSchemaType.RECORD, writerSchema + " is not a record");
+ Option<HoodieSchemaField> result = Option.empty();
+ final Option<HoodieSchemaField> directOpt =
writerSchema.getField(readerField.name());
+ if (directOpt.isPresent()) {
Review Comment:
Do we need a separate variable for the `result`? Can we just use `directOpt`
directly?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -118,7 +118,7 @@ public class HoodieRowParquetWriteSupport extends
WriteSupport<InternalRow> {
private final Function1<Object, Object> timestampRebaseFunction =
DataSourceUtils.createTimestampRebaseFuncInWrite(datetimeRebaseMode, "Parquet");
private final boolean writeLegacyListFormat;
private final ValueWriter[] rootFieldWriters;
- private final Schema avroSchema;
+ private final HoodieSchema hoodieSchema;
Review Comment:
Nitpick: let's rename the `hoodieSchema` variables to simply `schema`
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java:
##########
@@ -952,9 +952,10 @@ public void validateSchema() throws HoodieUpsertException,
HoodieInsertException
if (!existingTableSchema.isPresent()) {
return;
}
- Schema writerSchema =
HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
- Schema tableSchema =
HoodieAvroUtils.createHoodieWriteSchema(existingTableSchema.get());
- AvroSchemaUtils.checkSchemaCompatible(tableSchema, writerSchema,
shouldValidate, allowProjection, getDropPartitionColNames());
+
+ HoodieSchema writerSchema =
HoodieSchemaUtils.createHoodieWriteSchema(config.getSchema(), false);
+ HoodieSchema tableSchema =
HoodieSchema.createHoodieWriteSchema(existingTableSchema.get().toString(),
false);
Review Comment:
Instead of converting `existingTableSchema` to a string, let's create a
method that takes in a `HoodieSchema`, similar to the one in
`AvroSchemaUtils`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]