This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f044d3d6396b fix(spark): handle Avro 1.12 logical type values in Spark
4.1 read path (#18773)
f044d3d6396b is described below
commit f044d3d6396bdf07ab5ca5408ed1963055e4c33e
Author: Y Ethan Guo <[email protected]>
AuthorDate: Tue May 19 17:20:41 2026 -0700
fix(spark): handle Avro 1.12 logical type values in Spark 4.1 read path
(#18773)
---
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 145 +++++++++++++++++++--
.../apache/hudi/avro/HoodieAvroWrapperUtils.java | 26 +++-
.../org/apache/hudi/avro/TestHoodieAvroUtils.java | 144 ++++++++++++++++++++
.../apache/spark/sql/hudi/ddl/TestSpark3DDL.scala | 4 -
.../sql/hudi/dml/others/TestMergeIntoTable.scala | 4 +-
.../apache/spark/sql/avro/AvroDeserializer.scala | 22 ++++
.../hudi/TestSpark4_1AvroLogicalTypeBytes.scala | 127 ++++++++++++++++++
7 files changed, 453 insertions(+), 19 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 654eb1660376..7c55f441d200 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -36,6 +36,7 @@ import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Conversions;
import org.apache.avro.Conversions.DecimalConversion;
import org.apache.avro.JsonProperties;
+import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.LogicalTypes.Decimal;
import org.apache.avro.Schema;
@@ -70,8 +71,11 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Timestamp;
+import java.time.Instant;
import java.time.LocalDate;
+import java.time.LocalDateTime;
import java.time.ZoneId;
+import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
@@ -671,20 +675,42 @@ public class HoodieAvroUtils {
* <p>
* Decimal Data Type is converted to actual decimal value instead of
bytes/fixed which is how it is
* represented/stored in parquet.
+ * <p>
+ * <b>Avro version compatibility:</b> Avro 1.12.1 (pulled in by the Spark
4.1 profile) flipped the
+ * default of {@code GenericData}'s {@code fastReaderEnabled} flag from
{@code false} to
+ * {@code true} (the {@code org.apache.avro.fastread} system property now
defaults to
+ * {@code "true"}). With the fast reader enabled, {@code GenericDatumReader}
applies the
+ * registered {@code Conversion}s during decode and materializes {@link
LocalDate} for date,
+ * {@link Instant} for timestamp-millis / timestamp-micros, and {@link
LocalDateTime} for
+ * local-timestamp-millis / local-timestamp-micros. Avro 1.12.0 (Spark 4.0)
and Avro 1.11.x
+ * (Spark 3.5 and earlier) left the fast reader off by default, so those
profiles exposed the raw
+ * {@link Integer} / {@link Long}. This method normalizes both forms to the
same stable
+ * representation, so callers (precombine/ordering comparison,
partition-path and record-key
+ * derivation, etc.) behave identically across Avro versions and across
writer/reader Spark
+ * version combinations. The on-disk byte format is unaffected by this
normalization: Avro's wire
+ * encoding for these logical types is fixed by spec to int / long and is
identical across Avro
+ * versions; only the in-memory Java type returned by the reader changed.
*
* @param fieldSchema avro field schema
* @param fieldValue avro field value
* @return field value either converted (for certain data types) or as it is.
*/
public static Object convertValueForAvroLogicalTypes(Schema fieldSchema,
Object fieldValue, boolean consistentLogicalTimestampEnabled) {
- if (fieldSchema.getLogicalType() == LogicalTypes.date()) {
- return LocalDate.ofEpochDay(Long.parseLong(fieldValue.toString()));
- } else if (fieldSchema.getLogicalType() == LogicalTypes.timestampMillis()
&& consistentLogicalTimestampEnabled) {
- return new Timestamp(Long.parseLong(fieldValue.toString()));
- } else if (fieldSchema.getLogicalType() == LogicalTypes.timestampMicros()
&& consistentLogicalTimestampEnabled) {
- return new Timestamp(Long.parseLong(fieldValue.toString()) / 1000);
- } else if (fieldSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
- Decimal dc = (Decimal) fieldSchema.getLogicalType();
+ LogicalType logicalType = fieldSchema.getLogicalType();
+ if (logicalType == LogicalTypes.date()) {
+ return LocalDate.ofEpochDay(extractEpochDay(fieldValue));
+ } else if (logicalType == LogicalTypes.timestampMillis()) {
+ long millis = extractEpochMillis(fieldValue);
+ return consistentLogicalTimestampEnabled ? new Timestamp(millis) :
millis;
+ } else if (logicalType == LogicalTypes.timestampMicros()) {
+ long micros = extractEpochMicros(fieldValue);
+ return consistentLogicalTimestampEnabled ? new Timestamp(micros / 1000)
: micros;
+ } else if (logicalType == LogicalTypes.localTimestampMillis()) {
+ return extractLocalEpochMillis(fieldValue);
+ } else if (logicalType == LogicalTypes.localTimestampMicros()) {
+ return extractLocalEpochMicros(fieldValue);
+ } else if (logicalType instanceof LogicalTypes.Decimal) {
+ Decimal dc = (Decimal) logicalType;
DecimalConversion decimalConversion = new DecimalConversion();
if (fieldSchema.getType() == Schema.Type.FIXED) {
return decimalConversion.fromFixed((GenericFixed) fieldValue,
fieldSchema,
@@ -700,6 +726,101 @@ public class HoodieAvroUtils {
return fieldValue;
}
+ // The extract* helpers below accept either the Avro primitive form (Integer
/ Long, returned by
+ // Avro 1.12.0 / 1.11.x) or the java.time form (returned by Avro 1.12.1 with
its default
+ // fastReaderEnabled=true), and return the canonical primitive for the same
underlying bytes.
+ // See the javadoc on convertValueForAvroLogicalTypes for context.
+
+ private static long extractEpochDay(Object fieldValue) {
+ if (fieldValue instanceof LocalDate) {
+ return ((LocalDate) fieldValue).toEpochDay();
+ }
+ if (fieldValue instanceof Number) {
+ return ((Number) fieldValue).longValue();
+ }
+ return Long.parseLong(fieldValue.toString());
+ }
+
+ private static long extractEpochMillis(Object fieldValue) {
+ if (fieldValue instanceof Instant) {
+ return ((Instant) fieldValue).toEpochMilli();
+ }
+ if (fieldValue instanceof Number) {
+ return ((Number) fieldValue).longValue();
+ }
+ return Long.parseLong(fieldValue.toString());
+ }
+
+ private static long extractEpochMicros(Object fieldValue) {
+ if (fieldValue instanceof Instant) {
+ Instant instant = (Instant) fieldValue;
+ return Math.addExact(Math.multiplyExact(instant.getEpochSecond(),
1_000_000L), instant.getNano() / 1000L);
+ }
+ if (fieldValue instanceof Number) {
+ return ((Number) fieldValue).longValue();
+ }
+ return Long.parseLong(fieldValue.toString());
+ }
+
+ private static long extractLocalEpochMillis(Object fieldValue) {
+ if (fieldValue instanceof LocalDateTime) {
+ return ((LocalDateTime)
fieldValue).toInstant(ZoneOffset.UTC).toEpochMilli();
+ }
+ if (fieldValue instanceof Number) {
+ return ((Number) fieldValue).longValue();
+ }
+ return Long.parseLong(fieldValue.toString());
+ }
+
+ private static long extractLocalEpochMicros(Object fieldValue) {
+ if (fieldValue instanceof LocalDateTime) {
+ Instant instant = ((LocalDateTime) fieldValue).toInstant(ZoneOffset.UTC);
+ return Math.addExact(Math.multiplyExact(instant.getEpochSecond(),
1_000_000L), instant.getNano() / 1000L);
+ }
+ if (fieldValue instanceof Number) {
+ return ((Number) fieldValue).longValue();
+ }
+ return Long.parseLong(fieldValue.toString());
+ }
+
+ /**
+ * If {@code schema} carries a date / timestamp logical type and {@code
value} is in the
+ * java.time form ({@link LocalDate} / {@link Instant} / {@link
LocalDateTime}), normalize it
+ * to the Avro primitive form ({@link Integer} for date, {@link Long} for
timestamp-millis,
+ * timestamp-micros, local-timestamp-millis, and local-timestamp-micros).
Used at the entry of
+ * schema-evolution rewrite paths whose legacy code does unguarded {@code
(Integer)} /
+ * {@code (Long)} casts on field values, which would otherwise fail under
Avro 1.12.1 (Spark
+ * 4.1) where the {@code fastReaderEnabled} default is {@code true} and
{@code GenericDatumReader}
+ * materializes java.time types instead of primitives. For non-logical-type
schemas, primitive
+ * inputs, or null, this is a no-op. The on-disk byte format is unaffected -
this is purely
+ * about in-memory Java type.
+ */
+ private static Object normalizeAvroLogicalTypeToPrimitive(Object value,
Schema schema) {
+ if (value == null || schema == null) {
+ return value;
+ }
+ LogicalType lt = schema.getLogicalType();
+ if (lt == null) {
+ return value;
+ }
+ if (lt == LogicalTypes.date()) {
+ return value instanceof LocalDate ? (int) ((LocalDate)
value).toEpochDay() : value;
+ }
+ if (lt == LogicalTypes.timestampMillis()) {
+ return value instanceof Instant ? extractEpochMillis(value) : value;
+ }
+ if (lt == LogicalTypes.timestampMicros()) {
+ return value instanceof Instant ? extractEpochMicros(value) : value;
+ }
+ if (lt == LogicalTypes.localTimestampMillis()) {
+ return value instanceof LocalDateTime ? extractLocalEpochMillis(value) :
value;
+ }
+ if (lt == LogicalTypes.localTimestampMicros()) {
+ return value instanceof LocalDateTime ? extractLocalEpochMicros(value) :
value;
+ }
+ return value;
+ }
+
/**
* Gets record column values into object array.
*
@@ -931,6 +1052,14 @@ public class HoodieAvroUtils {
}
public static Object rewritePrimaryType(Object oldValue, Schema oldSchema,
Schema newSchema) {
+ // Normalize any java.time form (LocalDate / Instant / LocalDateTime) to
the Avro primitive
+ // form (Integer / Long) before doing any numeric / string conversion. The
legacy branches
+ // below explicitly cast to Integer / Long; under Avro 1.12.1 (Spark 4.1
profile) the
+ // fastReaderEnabled default flipped to true and GenericDatumReader
materializes java.time
+ // types for date / timestamp logical fields, which would break those
casts. See
+ // convertValueForAvroLogicalTypes for the broader context — this is a
read-side normalization
+ // only and does not affect on-disk byte format.
+ oldValue = normalizeAvroLogicalTypeToPrimitive(oldValue, oldSchema);
if (oldSchema.getType() == newSchema.getType()) {
switch (oldSchema.getType()) {
case NULL:
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWrapperUtils.java
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWrapperUtils.java
index 91590a0210d5..d0eaec5f05c9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWrapperUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWrapperUtils.java
@@ -206,14 +206,18 @@ public class HoodieAvroWrapperUtils {
return null;
} else if (DateWrapper.class.getSimpleName().equals(wrapperClassName)) {
ValidationUtils.checkArgument(avroValueWrapper instanceof GenericRecord);
- return Date.valueOf(LocalDate.ofEpochDay((Integer) ((GenericRecord)
avroValueWrapper).get(0)));
+ // Avro 1.12.1 (Spark 4.1 profile) defaults fastReaderEnabled=true, so
GenericDatumReader returns
+ // java.time.LocalDate for date logical types; Avro 1.12.0 (Spark 4.0)
and earlier return Integer.
+ // Accept both — see HoodieAvroUtils#convertValueForAvroLogicalTypes for
the broader context.
+ return Date.valueOf(LocalDate.ofEpochDay(toEpochDay(((GenericRecord)
avroValueWrapper).get(0))));
} else if
(LocalDateWrapper.class.getSimpleName().equals(wrapperClassName)) {
ValidationUtils.checkArgument(avroValueWrapper instanceof GenericRecord);
- return LocalDate.ofEpochDay((Integer) ((GenericRecord)
avroValueWrapper).get(0));
+ return LocalDate.ofEpochDay(toEpochDay(((GenericRecord)
avroValueWrapper).get(0)));
} else if
(TimestampMicrosWrapper.class.getSimpleName().equals(wrapperClassName)) {
ValidationUtils.checkArgument(avroValueWrapper instanceof GenericRecord);
- Instant instant = microsToInstant((Long) ((GenericRecord)
avroValueWrapper).get(0));
- return Timestamp.from(instant);
+ // Avro 1.12.1 (Spark 4.1 profile) defaults fastReaderEnabled=true, so
timestamp-micros decodes
+ // to java.time.Instant; Avro 1.12.0 (Spark 4.0) and earlier return Long.
+ return Timestamp.from(toInstantFromMicros(((GenericRecord)
avroValueWrapper).get(0)));
} else if (DecimalWrapper.class.getSimpleName().equals(wrapperClassName)) {
Schema valueSchema = DecimalWrapper.SCHEMA$.getField("value").schema();
ValidationUtils.checkArgument(avroValueWrapper instanceof GenericRecord);
@@ -340,4 +344,18 @@ public class HoodieAvroWrapperUtils {
GenericRecord genRec = (GenericRecord) val;
return (Comparable<?>) genRec.get("value");
}
+
+ private static int toEpochDay(Object dateFieldValue) {
+ if (dateFieldValue instanceof LocalDate) {
+ return Math.toIntExact(((LocalDate) dateFieldValue).toEpochDay());
+ }
+ return ((Number) dateFieldValue).intValue();
+ }
+
+ private static Instant toInstantFromMicros(Object timestampMicrosFieldValue)
{
+ if (timestampMicrosFieldValue instanceof Instant) {
+ return (Instant) timestampMicrosFieldValue;
+ }
+ return microsToInstant(((Number) timestampMicrosFieldValue).longValue());
+ }
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index e24501ec7754..0028b322b99e 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -99,7 +99,10 @@ import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.Timestamp;
+import java.time.Instant;
import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.ArrayDeque;
import java.util.ArrayList;
@@ -499,6 +502,147 @@ public class TestHoodieAvroUtils {
assertEquals(0, buffer.position());
}
+ // Cross-Avro-version test fixtures: the same date / timestamp value
expressed in both the Avro
+ // primitive form (Integer / Long) — what GenericDatumReader returns under
Avro 1.12.0 / 1.11.x —
+ // and the java.time form (LocalDate / Instant / LocalDateTime) — what it
returns under Avro
+ // 1.12.1 with its default fastReaderEnabled=true. The contract under test
is that both forms
+ // produce identical downstream behavior in every Hudi read-side path that
touches a logical-typed
+ // field. Sharing these fixtures across tests pins the same exact value
through every path.
+ private static final long FIXTURE_EPOCH_MICROS = 1716163200_000000L +
123456L; // 2024-05-20T00:00:00.123456Z
+ private static final long FIXTURE_EPOCH_MILLIS = 1716163200_000L + 123L;
// 2024-05-20T00:00:00.123Z
+ private static final int FIXTURE_EPOCH_DAY = (int) LocalDate.of(2024, 5,
20).toEpochDay();
+ private static final Instant FIXTURE_MILLIS_INSTANT =
Instant.ofEpochMilli(FIXTURE_EPOCH_MILLIS);
+ private static final Instant FIXTURE_MICROS_INSTANT = Instant.ofEpochSecond(
+ FIXTURE_EPOCH_MICROS / 1_000_000L, (FIXTURE_EPOCH_MICROS % 1_000_000L) *
1000L);
+ private static final LocalDate FIXTURE_LOCAL_DATE =
LocalDate.ofEpochDay(FIXTURE_EPOCH_DAY);
+ private static final LocalDateTime FIXTURE_LOCAL_DT_MILLIS =
+ LocalDateTime.ofInstant(FIXTURE_MILLIS_INSTANT, ZoneOffset.UTC);
+ private static final LocalDateTime FIXTURE_LOCAL_DT_MICROS =
+ LocalDateTime.ofInstant(FIXTURE_MICROS_INSTANT, ZoneOffset.UTC);
+
+ private static final Schema DATE_SCHEMA =
LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
+ private static final Schema TS_MILLIS_SCHEMA =
LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
+ private static final Schema TS_MICROS_SCHEMA =
LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
+ private static final Schema LOCAL_TS_MILLIS_SCHEMA =
LogicalTypes.localTimestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
+ private static final Schema LOCAL_TS_MICROS_SCHEMA =
LogicalTypes.localTimestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
+
+ /**
+ * Cross-Avro-version invariant: {@link
HoodieAvroUtils#convertValueForAvroLogicalTypes} must produce
+ * the same canonical Java value whether the GenericRecord field holds the
Avro primitive form
+ * ({@code Long}/{@code Integer}, returned by Avro 1.12.0 and 1.11.x) or the
java.time form
+ * ({@code Instant}/{@code LocalDate}/{@code LocalDateTime}, returned by
Avro 1.12.1 with its
+ * default {@code fastReaderEnabled=true}). This is the contract that lets a
Spark 4.1 reader
+ * compare ordering values against records written by any earlier Spark
profile without divergence.
+ */
+ @Test
+ public void testConvertValueForAvroLogicalTypesCrossAvroVersion() {
+ // date
+ assertEquals(HoodieAvroUtils.convertValueForAvroLogicalTypes(DATE_SCHEMA,
FIXTURE_EPOCH_DAY, false),
+ HoodieAvroUtils.convertValueForAvroLogicalTypes(DATE_SCHEMA,
FIXTURE_LOCAL_DATE, false));
+
+ // timestamp-millis, consistent=false → epoch-millis Long
+ assertEquals(FIXTURE_EPOCH_MILLIS,
HoodieAvroUtils.convertValueForAvroLogicalTypes(TS_MILLIS_SCHEMA,
FIXTURE_EPOCH_MILLIS, false));
+ assertEquals(FIXTURE_EPOCH_MILLIS,
HoodieAvroUtils.convertValueForAvroLogicalTypes(TS_MILLIS_SCHEMA,
FIXTURE_MILLIS_INSTANT, false));
+
+ // timestamp-millis, consistent=true → java.sql.Timestamp
+
assertEquals(HoodieAvroUtils.convertValueForAvroLogicalTypes(TS_MILLIS_SCHEMA,
FIXTURE_EPOCH_MILLIS, true),
+ HoodieAvroUtils.convertValueForAvroLogicalTypes(TS_MILLIS_SCHEMA,
FIXTURE_MILLIS_INSTANT, true));
+
+ // timestamp-micros, consistent=false → epoch-micros Long
+ assertEquals(FIXTURE_EPOCH_MICROS,
HoodieAvroUtils.convertValueForAvroLogicalTypes(TS_MICROS_SCHEMA,
FIXTURE_EPOCH_MICROS, false));
+ assertEquals(FIXTURE_EPOCH_MICROS,
HoodieAvroUtils.convertValueForAvroLogicalTypes(TS_MICROS_SCHEMA,
FIXTURE_MICROS_INSTANT, false));
+
+ // timestamp-micros, consistent=true → java.sql.Timestamp (millis
precision, matches Avro 1.11 behavior)
+
assertEquals(HoodieAvroUtils.convertValueForAvroLogicalTypes(TS_MICROS_SCHEMA,
FIXTURE_EPOCH_MICROS, true),
+ HoodieAvroUtils.convertValueForAvroLogicalTypes(TS_MICROS_SCHEMA,
FIXTURE_MICROS_INSTANT, true));
+
+ // local-timestamp-millis / local-timestamp-micros → Long
+ assertEquals(FIXTURE_EPOCH_MILLIS,
HoodieAvroUtils.convertValueForAvroLogicalTypes(LOCAL_TS_MILLIS_SCHEMA,
FIXTURE_EPOCH_MILLIS, false));
+ assertEquals(FIXTURE_EPOCH_MILLIS,
HoodieAvroUtils.convertValueForAvroLogicalTypes(LOCAL_TS_MILLIS_SCHEMA,
FIXTURE_LOCAL_DT_MILLIS, false));
+ assertEquals(FIXTURE_EPOCH_MICROS,
HoodieAvroUtils.convertValueForAvroLogicalTypes(LOCAL_TS_MICROS_SCHEMA,
FIXTURE_EPOCH_MICROS, false));
+ assertEquals(FIXTURE_EPOCH_MICROS,
HoodieAvroUtils.convertValueForAvroLogicalTypes(LOCAL_TS_MICROS_SCHEMA,
FIXTURE_LOCAL_DT_MICROS, false));
+ }
+
+ /**
+ * Cross-Avro-version invariant for ordering-value extraction: a record
whose timestamp/date field
+ * holds the java.time form (Avro 1.12.1 fast reader) must yield the same
comparable ordering value
+ * as one holding the primitive form (Avro 1.12.0 / 1.11.x). Without this
property,
+ * {@code DefaultHoodieRecordPayload.compareOrderingVal} throws
ClassCastException when one side of
+ * the comparison was read via Avro 1.12.1 and the other built from a Long
(which is what Hudi's
+ * Spark→Avro serializer always produces).
+ */
+ @Test
+ public void testGetNestedFieldValOrderingInvariantAcrossAvroVersions() {
+ String schemaStr = "{\"type\":\"record\",\"name\":\"r\",\"fields\":["
+ + "{\"name\":\"id\",\"type\":\"string\"},"
+ +
"{\"name\":\"ts\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}},"
+ +
"{\"name\":\"d\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}}]}";
+ Schema schema = new Schema.Parser().parse(schemaStr);
+
+ GenericRecord avro111Form = new GenericData.Record(schema);
+ avro111Form.put("id", "k");
+ avro111Form.put("ts", FIXTURE_EPOCH_MICROS);
+ avro111Form.put("d", FIXTURE_EPOCH_DAY);
+
+ GenericRecord avro112Form = new GenericData.Record(schema);
+ avro112Form.put("id", "k");
+ avro112Form.put("ts", FIXTURE_MICROS_INSTANT);
+ avro112Form.put("d", FIXTURE_LOCAL_DATE);
+
+ Object ts111 = HoodieAvroUtils.getNestedFieldVal(avro111Form, "ts", true,
false);
+ Object ts112 = HoodieAvroUtils.getNestedFieldVal(avro112Form, "ts", true,
false);
+ assertEquals(ts111, ts112);
+ // ordering compareTo must be symmetric and produce 0 — this is exactly
what
+ // DefaultHoodieRecordPayload.compareOrderingVal relies on.
+ assertEquals(0, ((Comparable<Object>) ts111).compareTo(ts112));
+ assertEquals(0, ((Comparable<Object>) ts112).compareTo(ts111));
+
+ Object d111 = HoodieAvroUtils.getNestedFieldVal(avro111Form, "d", true,
false);
+ Object d112 = HoodieAvroUtils.getNestedFieldVal(avro112Form, "d", true,
false);
+ assertEquals(d111, d112);
+ assertEquals(0, ((Comparable<Object>) d111).compareTo(d112));
+ }
+
+ /**
+ * Cross-Avro-version invariant for {@link
HoodieAvroUtils#rewritePrimaryType}: schema-evolution
+ * paths that legacy-cast {@code (Integer) oldValue} / {@code (Long)
oldValue} (e.g. ALTER COLUMN
+ * TYPE from date → string, or timestamp-millis → timestamp-micros) must
accept the java.time
+ * form (Avro 1.12.1 fast reader) as well as the primitive form (Avro 1.12.0
/ 1.11.x), since the
+ * on-disk byte format is identical and a Spark 4.1 reader can be evolving
records written by any
+ * version.
+ */
+ @Test
+ public void testRewritePrimaryTypeCrossAvroVersion() {
+ Schema stringSchema = Schema.create(Schema.Type.STRING);
+ Schema longSchema = Schema.create(Schema.Type.LONG);
+ Schema floatSchema = Schema.create(Schema.Type.FLOAT);
+ Schema doubleSchema = Schema.create(Schema.Type.DOUBLE);
+
+ // For each (oldSchema, target newSchema), both the primitive and
java.time inputs must yield the
+ // same rewritten value — same on-disk semantics, no divergence between
Spark profiles.
+ assertRewriteEquivalent(DATE_SCHEMA, stringSchema, FIXTURE_EPOCH_DAY,
FIXTURE_LOCAL_DATE);
+ assertRewriteEquivalent(DATE_SCHEMA, longSchema, FIXTURE_EPOCH_DAY,
FIXTURE_LOCAL_DATE);
+ assertRewriteEquivalent(DATE_SCHEMA, floatSchema, FIXTURE_EPOCH_DAY,
FIXTURE_LOCAL_DATE);
+ assertRewriteEquivalent(DATE_SCHEMA, doubleSchema, FIXTURE_EPOCH_DAY,
FIXTURE_LOCAL_DATE);
+
+ assertRewriteEquivalent(TS_MILLIS_SCHEMA, stringSchema,
FIXTURE_EPOCH_MILLIS, FIXTURE_MILLIS_INSTANT);
+ assertRewriteEquivalent(TS_MILLIS_SCHEMA, floatSchema,
FIXTURE_EPOCH_MILLIS, FIXTURE_MILLIS_INSTANT);
+ assertRewriteEquivalent(TS_MILLIS_SCHEMA, doubleSchema,
FIXTURE_EPOCH_MILLIS, FIXTURE_MILLIS_INSTANT);
+
+ // In-place logical-type changes: the LONG → LONG branches in
rewritePrimaryType explicitly cast
+ // to (Long) and would fail on Instant / LocalDateTime under Avro 1.12.1.
+ assertRewriteEquivalent(TS_MILLIS_SCHEMA, TS_MICROS_SCHEMA,
FIXTURE_EPOCH_MILLIS, FIXTURE_MILLIS_INSTANT);
+ assertRewriteEquivalent(TS_MICROS_SCHEMA, TS_MILLIS_SCHEMA,
FIXTURE_EPOCH_MICROS, FIXTURE_MICROS_INSTANT);
+ assertRewriteEquivalent(LOCAL_TS_MILLIS_SCHEMA, LOCAL_TS_MICROS_SCHEMA,
FIXTURE_EPOCH_MILLIS, FIXTURE_LOCAL_DT_MILLIS);
+ assertRewriteEquivalent(LOCAL_TS_MICROS_SCHEMA, LOCAL_TS_MILLIS_SCHEMA,
FIXTURE_EPOCH_MICROS, FIXTURE_LOCAL_DT_MICROS);
+ }
+
+ private static void assertRewriteEquivalent(Schema oldSchema, Schema
newSchema,
+ Object avro111Value, Object
avro112Value) {
+ assertEquals(HoodieAvroUtils.rewritePrimaryType(avro111Value, oldSchema,
newSchema),
+ HoodieAvroUtils.rewritePrimaryType(avro112Value, oldSchema,
newSchema));
+ }
+
@Test
public void testReWriteAvroRecordWithNewSchema() {
Schema nestedSchema = new
Schema.Parser().parse(SCHEMA_WITH_NESTED_FIELD_STR);
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
index 39c71016ceee..a03a098b2d9d 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
@@ -75,8 +75,6 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
}
test("Test alter column types") {
- // TODO: Fix reading ordering field with logical type on Spark 4.1
(https://github.com/apache/hudi/issues/18606)
- assume(!HoodieSparkUtils.gteqSpark4_1, "Disabled on Spark 4.1 - see
HUDI-18606")
withRecordType()(withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName
@@ -148,8 +146,6 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
}
test("Test alter column types 2") {
- // TODO: Fix reading ordering field with logical type on Spark 4.1
(https://github.com/apache/hudi/issues/18606)
- assume(!HoodieSparkUtils.gteqSpark4_1, "Disabled on Spark 4.1 - see
HUDI-18606")
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala
index b82b95af8662..16a7da260bcf 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala
@@ -19,7 +19,7 @@
package org.apache.spark.sql.hudi.dml.others
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions,
HoodieSparkUtils, ScalaAssertionSupport}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions,
ScalaAssertionSupport}
import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES
import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
import org.apache.hudi.common.table.timeline.HoodieTimeline
@@ -1089,8 +1089,6 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
}
test("Test Different Type of PreCombineField") {
- // TODO: Fix reading ordering field with logical type on Spark 4.1
(https://github.com/apache/hudi/issues/18606)
- assume(!HoodieSparkUtils.gteqSpark4_1, "Disabled on Spark 4.1 - see
HUDI-18606")
withTempDir { tmp =>
withSQLConf("hoodie.payload.combined.schema.validate" -> "true") {
val typeAndValue = Seq(
diff --git
a/hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
b/hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 9ae690f1512e..c3c75c94dd72 100644
---
a/hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++
b/hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -129,6 +129,26 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
case (BOOLEAN, BooleanType) => (updater, ordinal, value) =>
updater.setBoolean(ordinal, value.asInstanceOf[Boolean])
+
//////////////////////////////////////////////////////////////////////////////////////////////
+ // BEGIN Hudi customization for Avro 1.12.1 (Spark 4.1)
+ //
+ // The Spark 4.1 profile pulls in Avro 1.12.1, which flipped the default
of
+ // `GenericData`'s `fastReaderEnabled` flag from false to true (the
+ // `org.apache.avro.fastread` system property now defaults to "true").
With the fast reader
+ // enabled, `GenericDatumReader` applies the registered `Conversion`s
and materializes
+ // `java.time.LocalDate` for date, `java.time.Instant` for
timestamp-millis /
+ // timestamp-micros, and `java.time.LocalDateTime` for
local-timestamp-millis /
+ // local-timestamp-micros. Avro 1.12.0 (Spark 4.0) and Avro 1.11.x
(Spark 3.5 and earlier)
+ // left the fast reader off, so those profiles exposed the raw `Integer`
/ `Long`. The
+ // blanket `value.asInstanceOf[Long]` / `asInstanceOf[Int]` used by the
upstream Spark 4.0
+ // deserializer fails on the java.time forms. The fallbacks below accept
either form and
+ // normalize to Catalyst's epoch-micros Long / epoch-day Int.
+ //
+ // This change is read-side only — the on-wire encoding for these
logical types is fixed by
+ // the Avro spec (int / long) and is identical across Avro versions, so
storage bytes are
+ // unaffected and writer/reader compatibility across Spark profiles is
preserved.
+
//////////////////////////////////////////////////////////////////////////////////////////////
+
case (INT, IntegerType) => (updater, ordinal, value) =>
value match {
case localDate: java.time.LocalDate =>
@@ -196,6 +216,8 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
case other => throw new IncompatibleSchemaException(errorPrefix +
s"Avro logical type $other cannot be converted to SQL type
${TimestampNTZType.sql}.")
}
+ // END Hudi customization for Avro 1.12.1 (Spark 4.1)
+
//////////////////////////////////////////////////////////////////////////////////////////////
// Before we upgrade Avro to 1.8 for logical type support, spark-avro
converts Long to Date.
// For backward compatibility, we still keep this conversion.
diff --git
a/hudi-spark-datasource/hudi-spark4.1.x/src/test/scala/org/apache/hudi/TestSpark4_1AvroLogicalTypeBytes.scala
b/hudi-spark-datasource/hudi-spark4.1.x/src/test/scala/org/apache/hudi/TestSpark4_1AvroLogicalTypeBytes.scala
new file mode 100644
index 000000000000..afb3492f0c2a
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark4.1.x/src/test/scala/org/apache/hudi/TestSpark4_1AvroLogicalTypeBytes.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
+import org.apache.avro.io.EncoderFactory
+import org.apache.spark.sql.avro.HoodieSpark4_1AvroSerializer
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.Test
+
+import java.io.ByteArrayOutputStream
+import java.time.LocalDate
+
+/**
+ * Validates the storage-byte invariant under the Spark 4.1 profile, which pulls in Avro 1.12.1.
+ *
+ * The Avro on-wire encoding for `date`, `timestamp-millis`, and `timestamp-micros` logical types
+ * is fixed by spec to int / long, and is identical across all Avro versions Hudi supports
+ * (1.11.x, 1.12.0, 1.12.1). As long as Hudi's Spark→Avro path (`HoodieSpark4_1AvroSerializer` →
+ * `AvroSerializer`) emits raw `java.lang.Long` / `java.lang.Integer` into the `GenericRecord` —
+ * and never `java.time.Instant` / `java.time.LocalDate` — the bytes Hudi writes on Spark 4.1 are
+ * bit-identical to what it writes on Spark 3.5 / 4.0, preserving forward/backward compatibility
+ * for readers across profiles.
+ *
+ * This test pins that invariant down:
+ * 1. The serializer outputs raw primitives in the GenericRecord (not java.time types).
+ * 2. `GenericDatumWriter` produces the expected canonical bytes for known values.
+ */
+class TestSpark4_1AvroLogicalTypeBytes {
+
+  // Fixture instant 2024-05-20T00:00:00.123456Z, held at each precision the test exercises:
+  // microseconds, milliseconds (sub-millisecond digits dropped) and whole days since the epoch.
+  private val epochMicros: Long = 1716163200_000000L + 123456L
+  private val epochMillis: Long = 1716163200_000L + 123L
+  private val epochDay: Int = LocalDate.of(2024, 5, 20).toEpochDay.toInt
+
+  // Avro record schema with one field per logical type under test.
+  private val avroSchema: Schema = new Schema.Parser().parse(
+    """{"type":"record","name":"r","fields":[
+      |{"name":"d","type":{"type":"int","logicalType":"date"}},
+      |{"name":"ts_millis","type":{"type":"long","logicalType":"timestamp-millis"}},
+      |{"name":"ts_micros","type":{"type":"long","logicalType":"timestamp-micros"}}
+      |]}""".stripMargin)
+
+  // Catalyst-side schema mirroring `avroSchema` field for field; every column is non-nullable.
+  private val sparkSchema: StructType = StructType(
+    Seq(
+      "d" -> DataTypes.DateType,
+      "ts_millis" -> DataTypes.TimestampType,
+      "ts_micros" -> DataTypes.TimestampType
+    ).map { case (fieldName, fieldType) => StructField(fieldName, fieldType, nullable = false) })
+
+  // Builds the fixture row in Catalyst form and runs it through the Spark 4.1 serializer.
+  // Catalyst carries DateType as an epoch-day Int and TimestampType as an epoch-micros Long, so
+  // the ts_millis column is supplied in micros too; AvroSerializer converts it to millis on write.
+  private def serializeFixtureRow(): GenericRecord = {
+    val toAvro = new HoodieSpark4_1AvroSerializer(sparkSchema, avroSchema, nullable = false)
+    val catalystRow = InternalRow(epochDay, epochMillis * 1000L, epochMicros)
+    toAvro.serialize(catalystRow).asInstanceOf[GenericRecord]
+  }
+
+  @Test
+  def testSerializerEmitsPrimitivesNotJavaTime(): Unit = {
+    val serialized = serializeFixtureRow()
+    val dateValue = serialized.get("d")
+    val millisValue = serialized.get("ts_millis")
+    val microsValue = serialized.get("ts_micros")
+
+    // Cross-Spark-version byte stability rests on the GenericRecord carrying the Avro primitive
+    // form (Integer / Long) rather than the java.time form. Were the serializer to start emitting
+    // Instants / LocalDates, Avro 1.12.1's GenericDatumWriter would route the value through a
+    // Conversion before encoding. The bytes would still be spec-compliant, but that path is
+    // harder to reason about, so the primitive contract is pinned here.
+    assertTrue(dateValue.isInstanceOf[java.lang.Integer],
+      s"expected raw Integer for date logical type, got ${dateValue.getClass}")
+    assertTrue(millisValue.isInstanceOf[java.lang.Long],
+      s"expected raw Long for timestamp-millis logical type, got ${millisValue.getClass}")
+    assertTrue(microsValue.isInstanceOf[java.lang.Long],
+      s"expected raw Long for timestamp-micros logical type, got ${microsValue.getClass}")
+
+    // The primitives must also carry the fixture's values at the right precision.
+    assertEquals(epochDay, dateValue)
+    assertEquals(epochMillis, millisValue)
+    assertEquals(epochMicros, microsValue)
+  }
+
+  @Test
+  def testEncodedBytesMatchAvroSpec(): Unit = {
+    // Actual bytes: the serialized fixture record pushed through Avro's binary encoder.
+    val actualSink = new ByteArrayOutputStream()
+    val binaryEncoder = EncoderFactory.get().binaryEncoder(actualSink, null)
+    new GenericDatumWriter[GenericRecord](avroSchema).write(serializeFixtureRow(), binaryEncoder)
+    binaryEncoder.flush()
+
+    // Reference bytes: the three field values written by hand with the spec's zig-zag varlong
+    // encoding. This does not touch the Avro library, so a match shows that whatever
+    // Avro 1.12.1 writes here equals what Avro 1.12.0 / 1.11.x would write.
+    val referenceSink = new ByteArrayOutputStream()
+    Seq(epochDay.toLong, epochMillis, epochMicros).foreach(writeZigZagLong(referenceSink, _))
+
+    assertEquals(referenceSink.toByteArray.toSeq, actualSink.toByteArray.toSeq,
+      "encoded bytes must match the spec-defined Avro encoding regardless of the Avro version on the classpath")
+  }
+
+  // Hand-rolled copy of the Avro spec's variable-length zig-zag long encoding. It serves as an
+  // independent reference so the expected bytes are computed without any Avro classes.
+  private def writeZigZagLong(out: ByteArrayOutputStream, value: Long): Unit = {
+    // Zig-zag step: fold the sign into the lowest bit so small magnitudes stay short.
+    var remaining = (value << 1) ^ (value >> 63)
+    // Emit seven bits per byte, low group first, flagging every byte except the last.
+    while ((remaining >>> 7) != 0) {
+      out.write((remaining & 0x7FL).toInt | 0x80)
+      remaining = remaining >>> 7
+    }
+    out.write(remaining.toInt)
+  }
+}