This is an automated email from the ASF dual-hosted git repository.

ahmedabu98 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 582bb452985 [Java SDK] Infer Beam logical types for JSR-310 and UUID 
fields (#38194)
582bb452985 is described below

commit 582bb45298574573f7bb237671dbeac38d5ba729
Author: Sachin Ranjalkar <[email protected]>
AuthorDate: Wed May 20 22:23:54 2026 +0530

    [Java SDK] Infer Beam logical types for JSR-310 and UUID fields (#38194)
    
    Add schema inference for java.time.LocalDate, LocalTime,
    LocalDateTime, Instant, and UUID as Beam logical types (SqlTypes.DATE,
    TIME, DATETIME, NanosInstant, SqlTypes.UUID) in both POJO and JavaBean
    schemas.
    
    This enables Beam Rows produced from POJOs with JSR-310 fields to be
    schema-assignable to rows from external systems (e.g. IcebergIO) that
    use the same logical types, fixing the incompatibility reported in
    #37524.
    
    The Avro extension overrides the new convertLogicalType hook to
    preserve existing Joda bridging for java.time.Instant and LocalDate.
---
 .../beam/sdk/schemas/utils/ByteBuddyUtils.java     | 128 +++++++++++++++++++++
 .../sdk/schemas/utils/StaticSchemaInference.java   |  19 ++-
 .../beam/sdk/schemas/JavaBeanSchemaTest.java       |  83 +++++++++++++
 .../beam/sdk/schemas/JavaFieldSchemaTest.java      | 122 ++++++++++++++++++++
 .../beam/sdk/schemas/transforms/ConvertTest.java   |  99 ++++++++++++++++
 .../beam/sdk/schemas/utils/JavaBeanUtilsTest.java  |  10 ++
 .../beam/sdk/schemas/utils/POJOUtilsTest.java      |  32 ++++++
 .../beam/sdk/schemas/utils/TestJavaBeans.java      |  89 ++++++++++++++
 .../apache/beam/sdk/schemas/utils/TestPOJOs.java   | 118 +++++++++++++++++++
 .../extensions/avro/schemas/utils/AvroUtils.java   |  87 ++++++++------
 10 files changed, 751 insertions(+), 36 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java
index 83209092691..2b7bb1ca0df 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java
@@ -28,6 +28,9 @@ import java.lang.reflect.Modifier;
 import java.lang.reflect.Parameter;
 import java.lang.reflect.Type;
 import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -38,6 +41,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
+import java.util.UUID;
 import net.bytebuddy.ByteBuddy;
 import net.bytebuddy.NamingStrategy;
 import net.bytebuddy.NamingStrategy.SuffixingRandom.BaseNameResolver;
@@ -78,6 +82,9 @@ import org.apache.beam.sdk.schemas.FieldValueGetter;
 import org.apache.beam.sdk.schemas.FieldValueHaver;
 import org.apache.beam.sdk.schemas.FieldValueSetter;
 import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
+import org.apache.beam.sdk.schemas.Schema.LogicalType;
+import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -120,6 +127,65 @@ public class ByteBuddyUtils {
   private static final ForLoadedType ENUM_TYPE = new ForLoadedType(Enum.class);
   private static final ForLoadedType BYTE_BUDDY_UTILS_TYPE =
       new ForLoadedType(ByteBuddyUtils.class);
+  private static final ForLoadedType LOGICAL_TYPE_TYPE = new 
ForLoadedType(LogicalType.class);
+
+  // Static LogicalType instances used by codegen for JSR-310 and UUID 
POJO/Bean fields. The
+  // generated bytecode loads these via FieldAccess and invokes toBaseType / 
toInputType; the
+  // fields must be public so that generated classes in user packages can 
access them.
+  // See logicalTypeFieldName(...) for the type → field name mapping.
+  public static final LogicalType<LocalDate, Long> 
JAVA_LOCAL_DATE_LOGICAL_TYPE = SqlTypes.DATE;
+  public static final LogicalType<LocalTime, Long> 
JAVA_LOCAL_TIME_LOGICAL_TYPE = SqlTypes.TIME;
+  public static final LogicalType<LocalDateTime, 
org.apache.beam.sdk.values.Row>
+      JAVA_LOCAL_DATE_TIME_LOGICAL_TYPE = SqlTypes.DATETIME;
+  public static final LogicalType<java.time.Instant, 
org.apache.beam.sdk.values.Row>
+      JAVA_INSTANT_LOGICAL_TYPE = new NanosInstant();
+  public static final LogicalType<UUID, org.apache.beam.sdk.values.Row> 
JAVA_UUID_LOGICAL_TYPE =
+      SqlTypes.UUID;
+
+  /**
+   * Returns the {@link Schema.LogicalType} that {@link StaticSchemaInference} 
infers for the given
+   * Java raw type, or {@code null} if no JSR-310 / UUID inference applies.
+   */
+  static @Nullable LogicalType<?, ?> inferredLogicalTypeFor(Class<?> rawType) {
+    if (LocalDate.class.equals(rawType)) {
+      return JAVA_LOCAL_DATE_LOGICAL_TYPE;
+    } else if (LocalTime.class.equals(rawType)) {
+      return JAVA_LOCAL_TIME_LOGICAL_TYPE;
+    } else if (LocalDateTime.class.equals(rawType)) {
+      return JAVA_LOCAL_DATE_TIME_LOGICAL_TYPE;
+    } else if (java.time.Instant.class.equals(rawType)) {
+      return JAVA_INSTANT_LOGICAL_TYPE;
+    } else if (UUID.class.equals(rawType)) {
+      return JAVA_UUID_LOGICAL_TYPE;
+    }
+    return null;
+  }
+
+  /** Maps a Java raw type to the name of the static field holding its 
{@link LogicalType}. */
+  private static String logicalTypeFieldName(Class<?> rawType) {
+    if (LocalDate.class.equals(rawType)) {
+      return "JAVA_LOCAL_DATE_LOGICAL_TYPE";
+    } else if (LocalTime.class.equals(rawType)) {
+      return "JAVA_LOCAL_TIME_LOGICAL_TYPE";
+    } else if (LocalDateTime.class.equals(rawType)) {
+      return "JAVA_LOCAL_DATE_TIME_LOGICAL_TYPE";
+    } else if (java.time.Instant.class.equals(rawType)) {
+      return "JAVA_INSTANT_LOGICAL_TYPE";
+    } else if (UUID.class.equals(rawType)) {
+      return "JAVA_UUID_LOGICAL_TYPE";
+    }
+    throw new IllegalArgumentException("Not an inferred logical type: " + 
rawType);
+  }
+
+  /** Stack manipulation that pushes the static {@link LogicalType} for the 
given Java type. */
+  private static StackManipulation loadLogicalType(Class<?> rawType) {
+    return FieldAccess.forField(
+            BYTE_BUDDY_UTILS_TYPE
+                .getDeclaredFields()
+                .filter(ElementMatchers.named(logicalTypeFieldName(rawType)))
+                .getOnly())
+        .read();
+  }
 
   /**
    * A naming strategy for ByteBuddy classes.
@@ -286,6 +352,8 @@ public class ByteBuddyUtils {
         return convertDateTime(typeDescriptor);
       } else if 
(typeDescriptor.isSubtypeOf(TypeDescriptor.of(ReadablePartial.class))) {
         return convertDateTime(typeDescriptor);
+      } else if (inferredLogicalTypeFor(typeDescriptor.getRawType()) != null) {
+        return convertLogicalType(typeDescriptor);
       } else if 
(typeDescriptor.isSubtypeOf(TypeDescriptor.of(ByteBuffer.class))) {
         return convertByteBuffer(typeDescriptor);
       } else if 
(typeDescriptor.isSubtypeOf(TypeDescriptor.of(CharSequence.class))) {
@@ -324,6 +392,14 @@ public class ByteBuddyUtils {
 
     protected abstract T convertDateTime(TypeDescriptor<?> type);
 
+    /**
+     * Handles JSR-310 ({@link LocalDate}, {@link LocalTime}, {@link 
LocalDateTime}, {@link
+     * java.time.Instant}) and {@link UUID} fields, which {@link 
StaticSchemaInference} infers as
+     * Beam {@link LogicalType}s. Subclasses emit code that round-trips 
through the corresponding
+     * static {@link LogicalType} instance ({@link 
#JAVA_LOCAL_DATE_LOGICAL_TYPE} etc.).
+     */
+    protected abstract T convertLogicalType(TypeDescriptor<?> type);
+
     protected abstract T convertByteBuffer(TypeDescriptor<?> type);
 
     protected abstract T convertCharSequence(TypeDescriptor<?> type);
@@ -401,6 +477,15 @@ public class ByteBuddyUtils {
       return Instant.class;
     }
 
+    @Override
+    protected Type convertLogicalType(TypeDescriptor<?> type) {
+      // The codegen-generated getter returns the LogicalType's base value 
(Long for
+      // Date/Time, Row for DateTime/NanosInstant/UUID). Object.class is a 
safe upper bound
+      // for the FieldValueGetter signature; the framework's 
GetLogicalInputType wrapper
+      // converts back to the input type before exposing the value to user 
code.
+      return Object.class;
+    }
+
     @Override
     protected Type convertByteBuffer(TypeDescriptor<?> type) {
       return byte[].class;
@@ -915,6 +1000,26 @@ public class ByteBuddyUtils {
       return new ShortCircuitReturnNull(readValue, stackManipulation);
     }
 
+    @Override
+    protected StackManipulation convertLogicalType(TypeDescriptor<?> type) {
+      // Equivalent code: return STATIC_LOGICAL_TYPE.toBaseType(value);
+      // where STATIC_LOGICAL_TYPE is one of the JAVA_*_LOGICAL_TYPE static 
fields on
+      // ByteBuddyUtils. The base type is Long (for LocalDate, LocalTime) or 
Row (for the
+      // others); both are reference types so no boxing/casting is needed 
beyond the invoke.
+      StackManipulation stackManipulation =
+          new Compound(
+              loadLogicalType(type.getRawType()),
+              readValue,
+              MethodInvocation.invoke(
+                  LOGICAL_TYPE_TYPE
+                      .getDeclaredMethods()
+                      .filter(
+                          ElementMatchers.named("toBaseType")
+                              .and(ElementMatchers.takesArguments(1)))
+                      .getOnly()));
+      return new ShortCircuitReturnNull(readValue, stackManipulation);
+    }
+
     @Override
     protected StackManipulation convertByteBuffer(TypeDescriptor<?> type) {
       // Generate the following code:
@@ -1361,6 +1466,29 @@ public class ByteBuddyUtils {
       return new ShortCircuitReturnNull(readValue, stackManipulation);
     }
 
+    @Override
+    protected StackManipulation convertLogicalType(TypeDescriptor<?> type) {
+      // Equivalent code: return (JavaType) 
STATIC_LOGICAL_TYPE.toInputType(value);
+      // FromRowUsingCreator already converted the row's input-type value 
(e.g. LocalDate)
+      // to the LogicalType's base value (e.g. Long) before invoking the 
generated creator,
+      // so we receive the base type here and need to project back to the POJO 
field's
+      // Java type.
+      ForLoadedType loadedType = new ForLoadedType(type.getRawType());
+      StackManipulation stackManipulation =
+          new Compound(
+              loadLogicalType(type.getRawType()),
+              readValue,
+              MethodInvocation.invoke(
+                  LOGICAL_TYPE_TYPE
+                      .getDeclaredMethods()
+                      .filter(
+                          ElementMatchers.named("toInputType")
+                              .and(ElementMatchers.takesArguments(1)))
+                      .getOnly()),
+              TypeCasting.to(loadedType));
+      return new ShortCircuitReturnNull(readValue, stackManipulation);
+    }
+
     @Override
     protected StackManipulation convertDefault(TypeDescriptor<?> type) {
       return readValue;
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java
index 196ee6f8659..ffd07071679 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java
@@ -22,17 +22,23 @@ import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Pr
 import java.lang.reflect.ParameterizedType;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
+import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.joda.time.ReadableInstant;
@@ -127,7 +133,8 @@ public class StaticSchemaInference {
     return fieldFromType(type, fieldValueTypeSupplier, new HashMap<>());
   }
 
-  // TODO(https://github.com/apache/beam/issues/21567): support type inference 
for logical types
+  // TODO(https://github.com/apache/beam/issues/21567): support inference for 
additional/custom
+  // logical types
   private static Schema.FieldType fieldFromType(
       TypeDescriptor type,
       FieldValueTypeSupplier fieldValueTypeSupplier,
@@ -177,6 +184,16 @@ public class StaticSchemaInference {
       return FieldType.STRING;
     } else if (type.isSubtypeOf(TypeDescriptor.of(ReadableInstant.class))) {
       return FieldType.DATETIME;
+    } else if (type.getRawType().equals(LocalDate.class)) {
+      return FieldType.logicalType(SqlTypes.DATE);
+    } else if (type.getRawType().equals(LocalTime.class)) {
+      return FieldType.logicalType(SqlTypes.TIME);
+    } else if (type.getRawType().equals(LocalDateTime.class)) {
+      return FieldType.logicalType(SqlTypes.DATETIME);
+    } else if (type.getRawType().equals(java.time.Instant.class)) {
+      return FieldType.logicalType(new NanosInstant());
+    } else if (type.getRawType().equals(UUID.class)) {
+      return FieldType.logicalType(SqlTypes.UUID);
     } else if (type.isSubtypeOf(TypeDescriptor.of(ByteBuffer.class))) {
       return FieldType.BYTES;
     } else if (type.isSubtypeOf(TypeDescriptor.of(Iterable.class))) {
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java
index df6c9cf18e5..de0953a0a08 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java
@@ -24,6 +24,7 @@ import static 
org.apache.beam.sdk.schemas.utils.TestJavaBeans.ARRAY_OF_BYTE_ARRA
 import static 
org.apache.beam.sdk.schemas.utils.TestJavaBeans.CASE_FORMAT_BEAM_SCHEMA;
 import static 
org.apache.beam.sdk.schemas.utils.TestJavaBeans.FIELD_WITH_DESCRIPTION_BEAN_SCHEMA;
 import static 
org.apache.beam.sdk.schemas.utils.TestJavaBeans.ITERABLE_BEAM_SCHEMA;
+import static 
org.apache.beam.sdk.schemas.utils.TestJavaBeans.JAVA_TIME_BEAN_SCHEMA;
 import static 
org.apache.beam.sdk.schemas.utils.TestJavaBeans.NESTED_ARRAYS_BEAM_SCHEMA;
 import static 
org.apache.beam.sdk.schemas.utils.TestJavaBeans.NESTED_ARRAY_BEAN_SCHEMA;
 import static 
org.apache.beam.sdk.schemas.utils.TestJavaBeans.NESTED_BEAN_SCHEMA;
@@ -46,9 +47,15 @@ import java.lang.reflect.Executable;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.schemas.utils.SchemaTestUtils;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.AllNullableBean;
@@ -57,6 +64,7 @@ import 
org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithCaseFormat;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithNoCreateOption;
 import 
org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithRenamedFieldsAndSetters;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.IterableBean;
+import org.apache.beam.sdk.schemas.utils.TestJavaBeans.JavaTimeBean;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.MismatchingNullableBean;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedArrayBean;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedArraysBean;
@@ -179,6 +187,81 @@ public class JavaBeanSchemaTest {
     assertEquals("stringbuilder", bean.getStringBuilder().toString());
   }
 
+  @Test
+  public void testJavaTimeSchema() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    Schema schema = registry.getSchema(JavaTimeBean.class);
+    SchemaTestUtils.assertSchemaEquivalent(JAVA_TIME_BEAN_SCHEMA, schema);
+    Schema icebergStyleSchema =
+        Schema.builder()
+            .addLogicalTypeField("localDate", SqlTypes.DATE)
+            .addLogicalTypeField("localTime", SqlTypes.TIME)
+            .addLogicalTypeField("localDateTime", SqlTypes.DATETIME)
+            .addLogicalTypeField("instant", new NanosInstant())
+            .addLogicalTypeField("uuid", SqlTypes.UUID)
+            .build();
+    assertTrue(schema.assignableToIgnoreNullable(icebergStyleSchema));
+  }
+
+  @Test
+  public void testJavaTimeToRow() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    JavaTimeBean bean = new JavaTimeBean();
+    bean.setLocalDate(LocalDate.of(2024, 1, 15));
+    bean.setLocalTime(LocalTime.of(10, 30, 45));
+    bean.setLocalDateTime(LocalDateTime.of(2024, 1, 15, 10, 30, 45));
+    bean.setInstant(java.time.Instant.ofEpochSecond(1_705_315_845L, 
123_456_789L));
+    bean.setUuid(UUID.fromString("11111111-2222-3333-4444-555555555555"));
+
+    Row row = registry.getToRowFunction(JavaTimeBean.class).apply(bean);
+
+    assertEquals(5, row.getFieldCount());
+    assertEquals(bean.getLocalDate(), row.getLogicalTypeValue("localDate", 
LocalDate.class));
+    assertEquals(bean.getLocalTime(), row.getLogicalTypeValue("localTime", 
LocalTime.class));
+    assertEquals(
+        bean.getLocalDateTime(), row.getLogicalTypeValue("localDateTime", 
LocalDateTime.class));
+    assertEquals(bean.getInstant(), row.getLogicalTypeValue("instant", 
java.time.Instant.class));
+    assertEquals(bean.getUuid(), row.getLogicalTypeValue("uuid", UUID.class));
+  }
+
+  @Test
+  public void testJavaTimeFromRow() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    LocalDate localDate = LocalDate.of(2024, 1, 15);
+    LocalTime localTime = LocalTime.of(10, 30, 45);
+    LocalDateTime localDateTime = LocalDateTime.of(2024, 1, 15, 10, 30, 45);
+    java.time.Instant instant = 
java.time.Instant.ofEpochSecond(1_705_315_845L, 123_456_789L);
+    UUID uuid = UUID.fromString("11111111-2222-3333-4444-555555555555");
+    Row row =
+        Row.withSchema(JAVA_TIME_BEAN_SCHEMA)
+            .addValues(localDate, localTime, localDateTime, instant, uuid)
+            .build();
+
+    JavaTimeBean bean = 
registry.getFromRowFunction(JavaTimeBean.class).apply(row);
+
+    assertEquals(localDate, bean.getLocalDate());
+    assertEquals(localTime, bean.getLocalTime());
+    assertEquals(localDateTime, bean.getLocalDateTime());
+    assertEquals(instant, bean.getInstant());
+    assertEquals(uuid, bean.getUuid());
+  }
+
+  @Test
+  public void testJavaTimeRoundTrip() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    JavaTimeBean original = new JavaTimeBean();
+    original.setLocalDate(LocalDate.of(2024, 1, 15));
+    original.setLocalTime(LocalTime.of(10, 30, 45));
+    original.setLocalDateTime(LocalDateTime.of(2024, 1, 15, 10, 30, 45));
+    original.setInstant(java.time.Instant.ofEpochSecond(1_705_315_845L, 
123_456_789L));
+    original.setUuid(UUID.fromString("11111111-2222-3333-4444-555555555555"));
+
+    Row row = registry.getToRowFunction(JavaTimeBean.class).apply(original);
+    JavaTimeBean roundTripped = 
registry.getFromRowFunction(JavaTimeBean.class).apply(row);
+
+    assertEquals(original, roundTripped);
+  }
+
   @Test
   public void testNullableToRow() throws NoSuchSchemaException {
     SchemaRegistry registry = SchemaRegistry.createDefault();
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java
index 7a66decde01..c80b758adc3 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java
@@ -21,12 +21,14 @@ import static 
org.apache.beam.sdk.schemas.utils.SchemaTestUtils.equivalentTo;
 import static 
org.apache.beam.sdk.schemas.utils.TestPOJOs.ANNOTATED_SIMPLE_POJO_SCHEMA;
 import static 
org.apache.beam.sdk.schemas.utils.TestPOJOs.CASE_FORMAT_POJO_SCHEMA;
 import static org.apache.beam.sdk.schemas.utils.TestPOJOs.ENUMERATION;
+import static 
org.apache.beam.sdk.schemas.utils.TestPOJOs.JAVA_TIME_POJO_SCHEMA;
 import static 
org.apache.beam.sdk.schemas.utils.TestPOJOs.NESTED_ARRAYS_POJO_SCHEMA;
 import static 
org.apache.beam.sdk.schemas.utils.TestPOJOs.NESTED_ARRAY_POJO_SCHEMA;
 import static 
org.apache.beam.sdk.schemas.utils.TestPOJOs.NESTED_MAP_POJO_SCHEMA;
 import static 
org.apache.beam.sdk.schemas.utils.TestPOJOs.NESTED_NULLABLE_SCHEMA;
 import static org.apache.beam.sdk.schemas.utils.TestPOJOs.NESTED_POJO_SCHEMA;
 import static org.apache.beam.sdk.schemas.utils.TestPOJOs.NULLABLES_SCHEMA;
+import static 
org.apache.beam.sdk.schemas.utils.TestPOJOs.NULLABLE_JAVA_TIME_POJO_SCHEMA;
 import static org.apache.beam.sdk.schemas.utils.TestPOJOs.NULLABLE_POJO_SCHEMA;
 import static 
org.apache.beam.sdk.schemas.utils.TestPOJOs.POJO_WITH_ENUM_SCHEMA;
 import static org.apache.beam.sdk.schemas.utils.TestPOJOs.POJO_WITH_ITERABLE;
@@ -46,20 +48,28 @@ import static org.junit.Assert.assertTrue;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import org.apache.beam.sdk.schemas.Schema.Field;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
+import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.schemas.utils.SchemaTestUtils;
 import org.apache.beam.sdk.schemas.utils.TestPOJOs;
 import org.apache.beam.sdk.schemas.utils.TestPOJOs.AnnotatedSimplePojo;
 import org.apache.beam.sdk.schemas.utils.TestPOJOs.FirstCircularNestedPOJO;
+import org.apache.beam.sdk.schemas.utils.TestPOJOs.JavaTimePOJO;
 import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedArrayPOJO;
 import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedArraysPOJO;
 import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedMapPOJO;
 import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedPOJO;
+import org.apache.beam.sdk.schemas.utils.TestPOJOs.NullableJavaTimePOJO;
 import org.apache.beam.sdk.schemas.utils.TestPOJOs.NullablePOJO;
 import org.apache.beam.sdk.schemas.utils.TestPOJOs.POJOWithNestedNullable;
 import org.apache.beam.sdk.schemas.utils.TestPOJOs.POJOWithNullables;
@@ -228,6 +238,118 @@ public class JavaFieldSchemaTest {
     assertEquals("stringbuilder", pojo.stringBuilder.toString());
   }
 
+  @Test
+  public void testJavaTimeSchema() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    Schema schema = registry.getSchema(JavaTimePOJO.class);
+    SchemaTestUtils.assertSchemaEquivalent(JAVA_TIME_POJO_SCHEMA, schema);
+    // Reproduces the failure mode in #37524: the schema inferred for a POJO 
with JSR-310 fields
+    // must be assignable to a row schema (e.g. one produced by IcebergIO) 
that uses the same
+    // logical types.
+    Schema icebergStyleSchema =
+        Schema.builder()
+            .addLogicalTypeField("localDate", SqlTypes.DATE)
+            .addLogicalTypeField("localTime", SqlTypes.TIME)
+            .addLogicalTypeField("localDateTime", SqlTypes.DATETIME)
+            .addLogicalTypeField("instant", new NanosInstant())
+            .addLogicalTypeField("uuid", SqlTypes.UUID)
+            .build();
+    assertTrue(schema.assignableToIgnoreNullable(icebergStyleSchema));
+  }
+
+  @Test
+  public void testJavaTimeToRow() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    LocalDate localDate = LocalDate.of(2024, 1, 15);
+    LocalTime localTime = LocalTime.of(10, 30, 45);
+    LocalDateTime localDateTime = LocalDateTime.of(2024, 1, 15, 10, 30, 45);
+    java.time.Instant instant = 
java.time.Instant.ofEpochSecond(1_705_315_845L, 123_456_789L);
+    UUID uuid = UUID.fromString("11111111-2222-3333-4444-555555555555");
+    JavaTimePOJO pojo = new JavaTimePOJO(localDate, localTime, localDateTime, 
instant, uuid);
+
+    Row row = registry.getToRowFunction(JavaTimePOJO.class).apply(pojo);
+
+    assertEquals(5, row.getFieldCount());
+    assertEquals(localDate, row.getLogicalTypeValue("localDate", 
LocalDate.class));
+    assertEquals(localTime, row.getLogicalTypeValue("localTime", 
LocalTime.class));
+    assertEquals(localDateTime, row.getLogicalTypeValue("localDateTime", 
LocalDateTime.class));
+    assertEquals(instant, row.getLogicalTypeValue("instant", 
java.time.Instant.class));
+    assertEquals(uuid, row.getLogicalTypeValue("uuid", UUID.class));
+  }
+
+  @Test
+  public void testJavaTimeFromRow() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    LocalDate localDate = LocalDate.of(2024, 1, 15);
+    LocalTime localTime = LocalTime.of(10, 30, 45);
+    LocalDateTime localDateTime = LocalDateTime.of(2024, 1, 15, 10, 30, 45);
+    java.time.Instant instant = 
java.time.Instant.ofEpochSecond(1_705_315_845L, 123_456_789L);
+    UUID uuid = UUID.fromString("11111111-2222-3333-4444-555555555555");
+    Row row =
+        Row.withSchema(JAVA_TIME_POJO_SCHEMA)
+            .addValues(localDate, localTime, localDateTime, instant, uuid)
+            .build();
+
+    JavaTimePOJO pojo = 
registry.getFromRowFunction(JavaTimePOJO.class).apply(row);
+
+    assertEquals(localDate, pojo.localDate);
+    assertEquals(localTime, pojo.localTime);
+    assertEquals(localDateTime, pojo.localDateTime);
+    assertEquals(instant, pojo.instant);
+    assertEquals(uuid, pojo.uuid);
+  }
+
+  @Test
+  public void testJavaTimeRoundTrip() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    JavaTimePOJO original =
+        new JavaTimePOJO(
+            LocalDate.of(2024, 1, 15),
+            LocalTime.of(10, 30, 45),
+            LocalDateTime.of(2024, 1, 15, 10, 30, 45),
+            java.time.Instant.ofEpochSecond(1_705_315_845L, 123_456_789L),
+            UUID.fromString("11111111-2222-3333-4444-555555555555"));
+
+    Row row = registry.getToRowFunction(JavaTimePOJO.class).apply(original);
+    JavaTimePOJO roundTripped = 
registry.getFromRowFunction(JavaTimePOJO.class).apply(row);
+
+    assertEquals(original, roundTripped);
+  }
+
+  @Test
+  public void testNullableJavaTimeSchema() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    Schema schema = registry.getSchema(NullableJavaTimePOJO.class);
+    SchemaTestUtils.assertSchemaEquivalent(NULLABLE_JAVA_TIME_POJO_SCHEMA, 
schema);
+  }
+
+  @Test
+  public void testNullableJavaTimeToRow() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    NullableJavaTimePOJO pojo = new NullableJavaTimePOJO();
+    Row row = 
registry.getToRowFunction(NullableJavaTimePOJO.class).apply(pojo);
+
+    assertEquals(5, row.getFieldCount());
+    assertNull(row.getLogicalTypeValue("localDate", LocalDate.class));
+    assertNull(row.getLogicalTypeValue("localTime", LocalTime.class));
+    assertNull(row.getLogicalTypeValue("localDateTime", LocalDateTime.class));
+    assertNull(row.getLogicalTypeValue("instant", java.time.Instant.class));
+    assertNull(row.getLogicalTypeValue("uuid", UUID.class));
+  }
+
+  @Test
+  public void testNullableJavaTimeFromRow() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    Row row = Row.nullRow(NULLABLE_JAVA_TIME_POJO_SCHEMA);
+
+    NullableJavaTimePOJO pojo = 
registry.getFromRowFunction(NullableJavaTimePOJO.class).apply(row);
+    assertNull(pojo.localDate);
+    assertNull(pojo.localTime);
+    assertNull(pojo.localDateTime);
+    assertNull(pojo.instant);
+    assertNull(pojo.uuid);
+  }
+
   @Test
   public void testNullableSchema() throws NoSuchSchemaException {
     SchemaRegistry registry = SchemaRegistry.createDefault();
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/ConvertTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/ConvertTest.java
index 93d6984d47e..c206bd8f61f 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/ConvertTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/ConvertTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.schemas.transforms;
 
+import java.time.LocalDateTime;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.Objects;
@@ -24,6 +25,7 @@ import org.apache.beam.sdk.schemas.JavaFieldSchema;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -211,6 +213,103 @@ public class ConvertTest {
     pipeline.run();
   }
 
+  /** Reproducer for #37524: a Row with a logical-type DateTime field should convert to a POJO. */
+  @DefaultSchema(JavaFieldSchema.class)
+  public static class TimePOJO {
+    public LocalDateTime time;
+
+    public TimePOJO() {}
+
+    public TimePOJO(LocalDateTime time) {
+      this.time = time;
+    }
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof TimePOJO)) {
+        return false;
+      }
+      TimePOJO that = (TimePOJO) o;
+      return Objects.equals(time, that.time);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(time);
+    }
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testFromRowsWithLogicalTypeDateTime() {
+    Schema icebergStyleSchema =
+        Schema.builder().addLogicalTypeField("time", 
SqlTypes.DATETIME).build();
+    LocalDateTime expected = LocalDateTime.of(2024, 1, 15, 10, 30, 0);
+    Row inputRow = 
Row.withSchema(icebergStyleSchema).addValue(expected).build();
+
+    PCollection<TimePOJO> pojos =
+        pipeline
+            .apply(Create.of(inputRow).withRowSchema(icebergStyleSchema))
+            .apply(Convert.fromRows(TimePOJO.class));
+
+    PAssert.that(pojos).containsInAnyOrder(new TimePOJO(expected));
+    pipeline.run();
+  }
+
+  /** POJO mixing a logical-type field with a primitive field. */
+  @DefaultSchema(JavaFieldSchema.class)
+  public static class MixedTimePOJO {
+    public LocalDateTime time;
+    public String name;
+
+    public MixedTimePOJO() {}
+
+    public MixedTimePOJO(LocalDateTime time, String name) {
+      this.time = time;
+      this.name = name;
+    }
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof MixedTimePOJO)) {
+        return false;
+      }
+      MixedTimePOJO that = (MixedTimePOJO) o;
+      return Objects.equals(time, that.time) && Objects.equals(name, 
that.name);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(time, name);
+    }
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testFromRowsWithMixedLogicalAndPrimitiveTypes() {
+    Schema mixedSchema =
+        Schema.builder()
+            .addLogicalTypeField("time", SqlTypes.DATETIME)
+            .addStringField("name")
+            .build();
+    LocalDateTime expectedTime = LocalDateTime.of(2024, 1, 15, 10, 30, 0);
+    Row inputRow = Row.withSchema(mixedSchema).addValues(expectedTime, 
"hello").build();
+
+    PCollection<MixedTimePOJO> pojos =
+        pipeline
+            .apply(Create.of(inputRow).withRowSchema(mixedSchema))
+            .apply(Convert.fromRows(MixedTimePOJO.class));
+
+    PAssert.that(pojos).containsInAnyOrder(new MixedTimePOJO(expectedTime, 
"hello"));
+    pipeline.run();
+  }
+
   @Test
   @Category(NeedsRunner.class)
   public void testGeneralConvert() {
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java
index 7e9cf9a894b..57d4f905e6e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.schemas.utils;
 
 import static 
org.apache.beam.sdk.schemas.utils.TestJavaBeans.BEAN_WITH_BOXED_FIELDS_SCHEMA;
 import static 
org.apache.beam.sdk.schemas.utils.TestJavaBeans.BEAN_WITH_BYTE_ARRAY_SCHEMA;
+import static 
org.apache.beam.sdk.schemas.utils.TestJavaBeans.JAVA_TIME_BEAN_SCHEMA;
 import static 
org.apache.beam.sdk.schemas.utils.TestJavaBeans.NESTED_ARRAY_BEAN_SCHEMA;
 import static 
org.apache.beam.sdk.schemas.utils.TestJavaBeans.NESTED_BEAN_SCHEMA;
 import static 
org.apache.beam.sdk.schemas.utils.TestJavaBeans.NESTED_COLLECTION_BEAN_SCHEMA;
@@ -44,6 +45,7 @@ import org.apache.beam.sdk.schemas.Schema;
 import 
org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.DefaultTypeConversionsFactory;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithBoxedFields;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithByteArray;
+import org.apache.beam.sdk.schemas.utils.TestJavaBeans.JavaTimeBean;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedArrayBean;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedBean;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedCollectionBean;
@@ -79,6 +81,14 @@ public class JavaBeanUtilsTest {
     SchemaTestUtils.assertSchemaEquivalent(SIMPLE_BEAN_SCHEMA, schema);
   }
 
+  @Test
+  public void testJavaTimeBean() {
+    Schema schema =
+        JavaBeanUtils.schemaFromJavaBeanClass(
+            new TypeDescriptor<JavaTimeBean>() {}, 
GetterTypeSupplier.INSTANCE);
+    SchemaTestUtils.assertSchemaEquivalent(JAVA_TIME_BEAN_SCHEMA, schema);
+  }
+
   @Test
   public void testNestedBean() {
     Schema schema =
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java
index 6b9fbcd30a2..72407d965da 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.schemas.utils;
 
+import static 
org.apache.beam.sdk.schemas.utils.TestPOJOs.JAVA_TIME_POJO_SCHEMA;
 import static 
org.apache.beam.sdk.schemas.utils.TestPOJOs.NESTED_ARRAY_POJO_SCHEMA;
 import static 
org.apache.beam.sdk.schemas.utils.TestPOJOs.NESTED_COLLECTION_POJO_SCHEMA;
 import static 
org.apache.beam.sdk.schemas.utils.TestPOJOs.NESTED_MAP_POJO_SCHEMA;
@@ -38,7 +39,11 @@ import java.util.List;
 import org.apache.beam.sdk.schemas.FieldValueGetter;
 import org.apache.beam.sdk.schemas.JavaFieldSchema.JavaFieldTypeSupplier;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant;
 import 
org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.DefaultTypeConversionsFactory;
+import org.apache.beam.sdk.schemas.utils.TestPOJOs.JavaTimePOJO;
 import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedArrayPOJO;
 import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedCollectionPOJO;
 import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedMapPOJO;
@@ -82,6 +87,33 @@ public class POJOUtilsTest {
     assertEquals(SIMPLE_POJO_SCHEMA, schema);
   }
 
+  @Test
+  public void testJavaTimePOJO() {
+    Schema schema =
+        POJOUtils.schemaFromPojoClass(
+            new TypeDescriptor<JavaTimePOJO>() {}, 
JavaFieldTypeSupplier.INSTANCE);
+    assertEquals(JAVA_TIME_POJO_SCHEMA, schema);
+  }
+
+  /**
+   * Regression test for #37524: Joda {@link Instant} must continue to map to {@link
+   * FieldType#DATETIME}, not the new {@code java.time.Instant} logical type. This guards the branch
+   * ordering in {@code StaticSchemaInference.fieldFromType}.
+   */
+  @Test
+  public void testJodaInstantStillInfersAsDatetime() {
+    FieldType jodaInferred =
+        StaticSchemaInference.fieldFromType(
+            TypeDescriptor.of(Instant.class), JavaFieldTypeSupplier.INSTANCE);
+    assertEquals(FieldType.DATETIME, jodaInferred);
+
+    FieldType javaTimeInferred =
+        StaticSchemaInference.fieldFromType(
+            TypeDescriptor.of(java.time.Instant.class), 
JavaFieldTypeSupplier.INSTANCE);
+    assertEquals(TypeName.LOGICAL_TYPE, javaTimeInferred.getTypeName());
+    assertEquals(NanosInstant.IDENTIFIER, 
javaTimeInferred.getLogicalType().getIdentifier());
+  }
+
   @Test
   public void testNestedPOJO() {
     Schema schema =
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java
index 694db05b091..d8ed86f2b55 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java
@@ -19,10 +19,14 @@ package org.apache.beam.sdk.schemas.utils;
 
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.UUID;
 import org.apache.beam.sdk.schemas.JavaBeanSchema;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
@@ -33,6 +37,8 @@ import 
org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
 import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
 import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber;
 import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
+import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -1398,4 +1404,87 @@ public class TestJavaBeans {
               Schema.Field.nullable("value", FieldType.FLOAT)
                   .withDescription("This value is the value stored in the 
object as a float."))
           .build();
+
+  /** A Bean containing JSR-310 date/time types and a UUID, all inferred as Beam logical types. */
+  @DefaultSchema(JavaBeanSchema.class)
+  public static class JavaTimeBean {
+    private LocalDate localDate;
+    private LocalTime localTime;
+    private LocalDateTime localDateTime;
+    private java.time.Instant instant;
+    private UUID uuid;
+
+    public JavaTimeBean() {}
+
+    public LocalDate getLocalDate() {
+      return localDate;
+    }
+
+    public void setLocalDate(LocalDate localDate) {
+      this.localDate = localDate;
+    }
+
+    public LocalTime getLocalTime() {
+      return localTime;
+    }
+
+    public void setLocalTime(LocalTime localTime) {
+      this.localTime = localTime;
+    }
+
+    public LocalDateTime getLocalDateTime() {
+      return localDateTime;
+    }
+
+    public void setLocalDateTime(LocalDateTime localDateTime) {
+      this.localDateTime = localDateTime;
+    }
+
+    public java.time.Instant getInstant() {
+      return instant;
+    }
+
+    public void setInstant(java.time.Instant instant) {
+      this.instant = instant;
+    }
+
+    public UUID getUuid() {
+      return uuid;
+    }
+
+    public void setUuid(UUID uuid) {
+      this.uuid = uuid;
+    }
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof JavaTimeBean)) {
+        return false;
+      }
+      JavaTimeBean that = (JavaTimeBean) o;
+      return Objects.equals(localDate, that.localDate)
+          && Objects.equals(localTime, that.localTime)
+          && Objects.equals(localDateTime, that.localDateTime)
+          && Objects.equals(instant, that.instant)
+          && Objects.equals(uuid, that.uuid);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(localDate, localTime, localDateTime, instant, uuid);
+    }
+  }
+
+  /** The schema for {@link JavaTimeBean}. */
+  public static final Schema JAVA_TIME_BEAN_SCHEMA =
+      Schema.builder()
+          .addLogicalTypeField("localDate", SqlTypes.DATE)
+          .addLogicalTypeField("localTime", SqlTypes.TIME)
+          .addLogicalTypeField("localDateTime", SqlTypes.DATETIME)
+          .addLogicalTypeField("instant", new NanosInstant())
+          .addLogicalTypeField("uuid", SqlTypes.UUID)
+          .build();
 }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java
index b82c4dc0e7e..38ec507480f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java
@@ -19,10 +19,14 @@ package org.apache.beam.sdk.schemas.utils;
 
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.UUID;
 import org.apache.beam.sdk.schemas.JavaFieldSchema;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
@@ -34,6 +38,8 @@ import 
org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
 import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber;
 import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
 import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
+import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -1281,4 +1287,116 @@ public class TestPOJOs {
               Schema.Field.nullable("str", FieldType.STRING)
                   .withDescription("a simple string that is part of this 
field"))
           .build();
+
+  /** A POJO containing JSR-310 date/time types and a UUID, all inferred as Beam logical types. */
+  @DefaultSchema(JavaFieldSchema.class)
+  public static class JavaTimePOJO {
+    public LocalDate localDate;
+    public LocalTime localTime;
+    public LocalDateTime localDateTime;
+    public java.time.Instant instant;
+    public UUID uuid;
+
+    public JavaTimePOJO() {}
+
+    public JavaTimePOJO(
+        LocalDate localDate,
+        LocalTime localTime,
+        LocalDateTime localDateTime,
+        java.time.Instant instant,
+        UUID uuid) {
+      this.localDate = localDate;
+      this.localTime = localTime;
+      this.localDateTime = localDateTime;
+      this.instant = instant;
+      this.uuid = uuid;
+    }
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof JavaTimePOJO)) {
+        return false;
+      }
+      JavaTimePOJO that = (JavaTimePOJO) o;
+      return Objects.equals(localDate, that.localDate)
+          && Objects.equals(localTime, that.localTime)
+          && Objects.equals(localDateTime, that.localDateTime)
+          && Objects.equals(instant, that.instant)
+          && Objects.equals(uuid, that.uuid);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(localDate, localTime, localDateTime, instant, uuid);
+    }
+  }
+
+  /** The schema for {@link JavaTimePOJO}. */
+  public static final Schema JAVA_TIME_POJO_SCHEMA =
+      Schema.builder()
+          .addLogicalTypeField("localDate", SqlTypes.DATE)
+          .addLogicalTypeField("localTime", SqlTypes.TIME)
+          .addLogicalTypeField("localDateTime", SqlTypes.DATETIME)
+          .addLogicalTypeField("instant", new NanosInstant())
+          .addLogicalTypeField("uuid", SqlTypes.UUID)
+          .build();
+
+  /** A POJO with nullable JSR-310 and UUID fields. */
+  @DefaultSchema(JavaFieldSchema.class)
+  public static class NullableJavaTimePOJO {
+    public @Nullable LocalDate localDate;
+    public @Nullable LocalTime localTime;
+    public @Nullable LocalDateTime localDateTime;
+    public java.time.@Nullable Instant instant;
+    public @Nullable UUID uuid;
+
+    public NullableJavaTimePOJO() {}
+
+    public NullableJavaTimePOJO(
+        LocalDate localDate,
+        LocalTime localTime,
+        LocalDateTime localDateTime,
+        java.time.Instant instant,
+        UUID uuid) {
+      this.localDate = localDate;
+      this.localTime = localTime;
+      this.localDateTime = localDateTime;
+      this.instant = instant;
+      this.uuid = uuid;
+    }
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof NullableJavaTimePOJO)) {
+        return false;
+      }
+      NullableJavaTimePOJO that = (NullableJavaTimePOJO) o;
+      return Objects.equals(localDate, that.localDate)
+          && Objects.equals(localTime, that.localTime)
+          && Objects.equals(localDateTime, that.localDateTime)
+          && Objects.equals(instant, that.instant)
+          && Objects.equals(uuid, that.uuid);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(localDate, localTime, localDateTime, instant, uuid);
+    }
+  }
+
+  /** The schema for {@link NullableJavaTimePOJO}. */
+  public static final Schema NULLABLE_JAVA_TIME_POJO_SCHEMA =
+      Schema.builder()
+          .addNullableLogicalTypeField("localDate", SqlTypes.DATE)
+          .addNullableLogicalTypeField("localTime", SqlTypes.TIME)
+          .addNullableLogicalTypeField("localDateTime", SqlTypes.DATETIME)
+          .addNullableLogicalTypeField("instant", new NanosInstant())
+          .addNullableLogicalTypeField("uuid", SqlTypes.UUID)
+          .build();
 }
diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
index 99eb7f96190..854b0d3c8bb 100644
--- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
+++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
@@ -335,15 +335,20 @@ public class AvroUtils {
     }
 
     @Override
-    protected java.lang.reflect.Type convertDefault(TypeDescriptor<?> type) {
+    protected java.lang.reflect.Type convertLogicalType(TypeDescriptor<?> 
type) {
       if (type.isSubtypeOf(TypeDescriptor.of(java.time.Instant.class))
           || type.isSubtypeOf(TypeDescriptor.of(java.time.LocalDate.class))) {
         return convertDateTime(type);
-      } else if (type.isSubtypeOf(TypeDescriptor.of(GenericFixed.class))) {
+      }
+      return super.convertLogicalType(type);
+    }
+
+    @Override
+    protected java.lang.reflect.Type convertDefault(TypeDescriptor<?> type) {
+      if (type.isSubtypeOf(TypeDescriptor.of(GenericFixed.class))) {
         return byte[].class;
-      } else {
-        return super.convertDefault(type);
       }
+      return super.convertDefault(type);
     }
   }
 
@@ -358,18 +363,8 @@ public class AvroUtils {
     }
 
     @Override
-    protected StackManipulation convertDefault(TypeDescriptor<?> type) {
-      if (type.isSubtypeOf(TypeDescriptor.of(GenericFixed.class))) {
-        // Generate the following code:
-        // return value.bytes();
-        return new Compound(
-            readValue,
-            MethodInvocation.invoke(
-                new ForLoadedType(GenericFixed.class)
-                    .getDeclaredMethods()
-                    
.filter(ElementMatchers.named("bytes").and(ElementMatchers.returns(BYTES)))
-                    .getOnly()));
-      } else if (java.time.Instant.class.isAssignableFrom(type.getRawType())) {
+    protected StackManipulation convertLogicalType(TypeDescriptor<?> type) {
+      if (java.time.Instant.class.isAssignableFrom(type.getRawType())) {
         // Generates the following code:
         //   return Instant.ofEpochMilli(value.toEpochMilli())
         StackManipulation onNotNull =
@@ -408,6 +403,22 @@ public class AvroUtils {
                         .getOnly()));
         return shortCircuitReturnNull(readValue, onNotNull);
       }
+      return super.convertLogicalType(type);
+    }
+
+    @Override
+    protected StackManipulation convertDefault(TypeDescriptor<?> type) {
+      if (type.isSubtypeOf(TypeDescriptor.of(GenericFixed.class))) {
+        // Generate the following code:
+        // return value.bytes();
+        return new Compound(
+            readValue,
+            MethodInvocation.invoke(
+                new ForLoadedType(GenericFixed.class)
+                    .getDeclaredMethods()
+                    
.filter(ElementMatchers.named("bytes").and(ElementMatchers.returns(BYTES)))
+                    .getOnly()));
+      }
       return super.convertDefault(type);
     }
   }
@@ -423,25 +434,8 @@ public class AvroUtils {
     }
 
     @Override
-    protected StackManipulation convertDefault(TypeDescriptor<?> type) {
-      if (type.isSubtypeOf(TypeDescriptor.of(GenericFixed.class))) {
-        // Generate the following code:
-        //   return new T((byte[]) value);
-        ForLoadedType loadedType = new ForLoadedType(type.getRawType());
-        return new Compound(
-            TypeCreation.of(loadedType),
-            Duplication.SINGLE,
-            // Load the parameter and cast it to a byte[].
-            readValue,
-            TypeCasting.to(BYTES),
-            // Create a new instance that wraps this byte[].
-            MethodInvocation.invoke(
-                loadedType
-                    .getDeclaredMethods()
-                    .filter(
-                        
ElementMatchers.isConstructor().and(ElementMatchers.takesArguments(BYTES)))
-                    .getOnly()));
-      } else if (java.time.Instant.class.isAssignableFrom(type.getRawType())) {
+    protected StackManipulation convertLogicalType(TypeDescriptor<?> type) {
+      if (java.time.Instant.class.isAssignableFrom(type.getRawType())) {
         // Generates the following code:
         //   return java.time.Instant.ofEpochMilli(value.getMillis())
         StackManipulation onNotNull =
@@ -479,6 +473,29 @@ public class AvroUtils {
                         .getOnly()));
         return shortCircuitReturnNull(readValue, onNotNull);
       }
+      return super.convertLogicalType(type);
+    }
+
+    @Override
+    protected StackManipulation convertDefault(TypeDescriptor<?> type) {
+      if (type.isSubtypeOf(TypeDescriptor.of(GenericFixed.class))) {
+        // Generate the following code:
+        //   return new T((byte[]) value);
+        ForLoadedType loadedType = new ForLoadedType(type.getRawType());
+        return new Compound(
+            TypeCreation.of(loadedType),
+            Duplication.SINGLE,
+            // Load the parameter and cast it to a byte[].
+            readValue,
+            TypeCasting.to(BYTES),
+            // Create a new instance that wraps this byte[].
+            MethodInvocation.invoke(
+                loadedType
+                    .getDeclaredMethods()
+                    .filter(
+                        
ElementMatchers.isConstructor().and(ElementMatchers.takesArguments(BYTES)))
+                    .getOnly()));
+      }
       return super.convertDefault(type);
     }
   }

Reply via email to