This is an automated email from the ASF dual-hosted git repository.

stankiewicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 66e543997f1 support generics in from row and to row conversions (#37347)
66e543997f1 is described below

commit 66e543997f111d77810bcc6a0417ab612ac43277
Author: Maciej Szwaja <[email protected]>
AuthorDate: Mon Jun 8 12:20:20 2026 +0200

    support generics in from row and to row conversions (#37347)
---
 .../sdk/schemas/FieldValueTypeInformation.java     |  13 +-
 .../beam/sdk/schemas/FromRowUsingCreator.java      |   6 +-
 .../sdk/schemas/GetterBasedSchemaProvider.java     |  98 +++--
 .../beam/sdk/schemas/utils/AutoValueUtils.java     |   9 +-
 .../main/java/org/apache/beam/sdk/values/Row.java  |   6 +-
 .../org/apache/beam/sdk/values/RowWithGetters.java |  13 +-
 .../beam/sdk/schemas/AutoValueSchemaTest.java      | 426 ++++++++++++++++++++-
 .../beam/sdk/schemas/JavaBeanSchemaTest.java       | 167 +++++++-
 .../beam/sdk/schemas/JavaFieldSchemaTest.java      | 196 +++++++++-
 .../beam/sdk/schemas/utils/TestJavaBeans.java      |  35 ++
 .../apache/beam/sdk/schemas/utils/TestPOJOs.java   |  25 ++
 .../beam/sdk/extensions/arrow/ArrowConversion.java |   4 +-
 .../sdk/io/aws2/schemas/AwsSchemaProvider.java     |   7 +-
 .../apache/beam/sdk/io/aws2/schemas/AwsTypes.java  |  16 +
 14 files changed, 952 insertions(+), 69 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java
index 43aac6a5e20..95030eda098 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java
@@ -105,7 +105,11 @@ public abstract class FieldValueTypeInformation implements Serializable {
 
     public abstract Builder setDescription(@Nullable String fieldDescription);
 
-    abstract FieldValueTypeInformation build();
+    public abstract FieldValueTypeInformation build();
+  }
+
+  public static Builder builder() {
+    return new AutoValue_FieldValueTypeInformation.Builder();
   }
 
   public static FieldValueTypeInformation forOneOf(
@@ -311,7 +315,8 @@ public abstract class FieldValueTypeInformation implements Serializable {
     return toBuilder().setName(name).build();
   }
 
-  static @Nullable FieldValueTypeInformation getIterableComponentType(TypeDescriptor<?> valueType) {
+  public static @Nullable FieldValueTypeInformation getIterableComponentType(
+      TypeDescriptor<?> valueType) {
     // TODO: Figure out nullable elements.
     TypeDescriptor<?> componentType = ReflectUtils.getIterableComponentType(valueType);
     if (componentType == null) {
@@ -331,13 +336,13 @@ public abstract class FieldValueTypeInformation implements Serializable {
   }
 
   // If the type is a map type, returns the key type, otherwise returns a null reference.
-  private static @Nullable FieldValueTypeInformation getMapKeyType(
+  public static @Nullable FieldValueTypeInformation getMapKeyType(
       TypeDescriptor<?> typeDescriptor) {
     return getMapType(typeDescriptor, 0);
   }
 
   // If the type is a map type, returns the value type, otherwise returns a null reference.
-  private static @Nullable FieldValueTypeInformation getMapValueType(
+  public static @Nullable FieldValueTypeInformation getMapValueType(
       TypeDescriptor<?> typeDescriptor) {
     return getMapType(typeDescriptor, 1);
   }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
index 464dc00cec7..0f5c47f7259 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
@@ -82,10 +82,10 @@ class FromRowUsingCreator<T> implements SerializableFunction<Row, T>, Function<R
       return null;
     }
     if (row instanceof RowWithGetters) {
-      Object target = ((RowWithGetters) row).getGetterTarget();
-      if (target.getClass().equals(typeDescriptor.getRawType())) {
+      RowWithGetters rowWithGetters = (RowWithGetters) row;
+      if (rowWithGetters.getGetterTargetType().equals(typeDescriptor)) {
         // Efficient path: simply extract the underlying object instead of creating a new one.
-        return (T) target;
+        return (T) rowWithGetters.getGetterTarget();
       }
     }
     if (fieldConverters == null) {
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java
index e08f193d407..7fb9e5a5dee 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java
@@ -20,16 +20,20 @@ package org.apache.beam.sdk.schemas;
 import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.Schema.LogicalType;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
 import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
 import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
+import org.apache.beam.sdk.schemas.utils.ReflectUtils;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -117,9 +121,11 @@ public abstract class GetterBasedSchemaProvider implements 
SchemaProvider {
       implements SerializableFunction<T, Row> {
     private final Schema schema;
     private final Factory<List<FieldValueGetter<T, Object>>> getterFactory;
+    private final TypeDescriptor getterTargetType;
 
-    public ToRowWithValueGetters(Schema schema) {
+    public ToRowWithValueGetters(Schema schema, TypeDescriptor 
getterTargetType) {
       this.schema = schema;
+      this.getterTargetType = getterTargetType;
       // Since we know that this factory is always called from inside the 
lambda with the same
       // schema, return a caching factory that caches the first value seen for 
each class. This
       // prevents having to lookup the getter list each time createGetters is 
called.
@@ -128,13 +134,13 @@ public abstract class GetterBasedSchemaProvider 
implements SchemaProvider {
               (Factory<List<FieldValueGetter<T, Object>>>)
                   (typeDescriptor, schema1) ->
                       (List)
-                          GetterBasedSchemaProvider.this.fieldValueGetters(
-                              typeDescriptor, schema1));
+                          
GetterBasedSchemaProvider.this.fieldValueGetters(typeDescriptor, schema1),
+              GetterBasedSchemaProvider.this::fieldValueTypeInformations);
     }
 
     @Override
     public Row apply(T input) {
-      return Row.withSchema(schema).withFieldValueGetters(getterFactory, 
input);
+      return Row.withSchema(schema).withFieldValueGetters(getterFactory, 
input, getterTargetType);
     }
 
     private GetterBasedSchemaProvider getOuter() {
@@ -172,7 +178,7 @@ public abstract class GetterBasedSchemaProvider implements 
SchemaProvider {
         Verify.verifyNotNull(
             schemaFor(typeDescriptor), "can't create a ToRowFunction with null 
schema");
 
-    return new ToRowWithValueGetters<>(schema);
+    return new ToRowWithValueGetters<>(schema, typeDescriptor);
   }
 
   @Override
@@ -194,16 +200,21 @@ public abstract class GetterBasedSchemaProvider 
implements SchemaProvider {
   private static class RowValueGettersFactory<T extends @NonNull Object>
       implements Factory<List<FieldValueGetter<T, Object>>> {
     private final Factory<List<FieldValueGetter<T, Object>>> gettersFactory;
+    private final Factory<List<FieldValueTypeInformation>> typeInfoFactory;
     private final @NotOnlyInitialized Factory<List<FieldValueGetter<T, 
Object>>>
         cachingGettersFactory;
 
     static <T extends @NonNull Object> Factory<List<FieldValueGetter<T, 
Object>>> of(
-        Factory<List<FieldValueGetter<T, Object>>> gettersFactory) {
-      return new 
RowValueGettersFactory<>(gettersFactory).cachingGettersFactory;
+        Factory<List<FieldValueGetter<T, Object>>> gettersFactory,
+        Factory<List<FieldValueTypeInformation>> typeInfoFactory) {
+      return new RowValueGettersFactory(gettersFactory, 
typeInfoFactory).cachingGettersFactory;
     }
 
-    RowValueGettersFactory(Factory<List<FieldValueGetter<T, Object>>> 
gettersFactory) {
+    RowValueGettersFactory(
+        Factory<List<FieldValueGetter<T, Object>>> gettersFactory,
+        Factory<List<FieldValueTypeInformation>> typeInfoFactory) {
       this.gettersFactory = gettersFactory;
+      this.typeInfoFactory = typeInfoFactory;
       this.cachingGettersFactory = new CachingFactory<>(this);
     }
 
@@ -211,9 +222,17 @@ public abstract class GetterBasedSchemaProvider implements 
SchemaProvider {
     public List<FieldValueGetter<T, Object>> create(
         TypeDescriptor<?> typeDescriptor, Schema schema) {
       List<FieldValueGetter<T, Object>> getters = 
gettersFactory.create(typeDescriptor, schema);
+      Map<String, FieldValueTypeInformation> typeInfoByName =
+          typeInfoFactory.create(typeDescriptor, schema).stream()
+              .collect(Collectors.toMap(FieldValueTypeInformation::getName, 
Function.identity()));
       List<FieldValueGetter<T, Object>> rowGetters = new 
ArrayList<>(getters.size());
       for (int i = 0; i < getters.size(); i++) {
-        rowGetters.add(rowValueGetter(getters.get(i), 
schema.getField(i).getType()));
+        FieldValueGetter getter = Verify.verifyNotNull(getters.get(i));
+        rowGetters.add(
+            rowValueGetter(
+                getter,
+                schema.getField(i).getType(),
+                
Verify.verifyNotNull(typeInfoByName.get(getter.name())).getType()));
       }
       return rowGetters;
     }
@@ -229,26 +248,49 @@ public abstract class GetterBasedSchemaProvider 
implements SchemaProvider {
                   || 
needsConversion(Verify.verifyNotNull(type.getMapValueType()))));
     }
 
-    FieldValueGetter<T, Object> rowValueGetter(FieldValueGetter base, 
FieldType type) {
+    FieldValueGetter<T, Object> rowValueGetter(
+        FieldValueGetter base, FieldType type, @Nullable TypeDescriptor<?> 
getterReturnType) {
       TypeName typeName = type.getTypeName();
       if (!needsConversion(type)) {
         return base;
       }
       if (typeName.equals(TypeName.ROW)) {
-        return new GetRow(base, Verify.verifyNotNull(type.getRowSchema()), 
cachingGettersFactory);
-      } else if (typeName.equals(TypeName.ARRAY)) {
+        return new GetRow(
+            base,
+            getterReturnType,
+            Verify.verifyNotNull(type.getRowSchema()),
+            cachingGettersFactory);
+      } else if (typeName.equals(TypeName.ARRAY) || 
typeName.equals(TypeName.ITERABLE)) {
         FieldType elementType = 
Verify.verifyNotNull(type.getCollectionElementType());
-        return elementType.getTypeName().equals(TypeName.ROW)
-            ? new GetEagerCollection(base, converter(elementType))
-            : new GetCollection(base, converter(elementType));
-      } else if (typeName.equals(TypeName.ITERABLE)) {
-        return new GetIterable(
-            base, 
converter(Verify.verifyNotNull(type.getCollectionElementType())));
+        TypeDescriptor<?> elementTypeDescriptor =
+            Optional.ofNullable(getterReturnType)
+                .map(ReflectUtils::getIterableComponentType)
+                .orElse(null);
+        if (TypeName.ARRAY == typeName) {
+          return TypeName.ROW == elementType.getTypeName()
+              ? new GetEagerCollection(base, converter(elementType, 
elementTypeDescriptor))
+              : new GetCollection(base, converter(elementType, 
elementTypeDescriptor));
+        } else { // TypeName.ITERABLE
+          return new GetIterable(base, converter(elementType, 
elementTypeDescriptor));
+        }
       } else if (typeName.equals(TypeName.MAP)) {
+        @Nullable
+        TypeDescriptor[] resolvedKeyValueTypes =
+            Optional.ofNullable(getterReturnType)
+                .<@Nullable TypeDescriptor[]>map(
+                    getterType ->
+                        Arrays.stream(Map.class.getTypeParameters())
+                            .<@Nullable TypeDescriptor>map(
+                                typeVar -> {
+                                  TypeDescriptor resolved = 
getterType.resolveType(typeVar);
+                                  return resolved.hasUnresolvedParameters() ? 
null : resolved;
+                                })
+                            .<@Nullable 
TypeDescriptor>toArray(TypeDescriptor[]::new))
+                .orElse(new TypeDescriptor[] {null, null});
         return new GetMap(
             base,
-            converter(Verify.verifyNotNull(type.getMapKeyType())),
-            converter(Verify.verifyNotNull(type.getMapValueType())));
+            converter(Verify.verifyNotNull(type.getMapKeyType()), 
resolvedKeyValueTypes[0]),
+            converter(Verify.verifyNotNull(type.getMapValueType()), 
resolvedKeyValueTypes[1]));
       } else if (type.isLogicalType(OneOfType.IDENTIFIER)) {
         OneOfType oneOfType = type.getLogicalType(OneOfType.class);
         Schema oneOfSchema = oneOfType.getOneOfSchema();
@@ -258,7 +300,7 @@ public abstract class GetterBasedSchemaProvider implements 
SchemaProvider {
             Maps.newHashMapWithExpectedSize(values.size());
         for (Map.Entry<String, Integer> kv : values.entrySet()) {
           FieldType fieldType = oneOfSchema.getField(kv.getKey()).getType();
-          FieldValueGetter<?, ?> converter = converter(fieldType);
+          FieldValueGetter<?, ?> converter = converter(fieldType, null);
           converters.put(kv.getValue(), converter);
         }
 
@@ -269,27 +311,35 @@ public abstract class GetterBasedSchemaProvider 
implements SchemaProvider {
       return base;
     }
 
-    FieldValueGetter<?, ?> converter(FieldType type) {
-      return rowValueGetter(IDENTITY, type);
+    FieldValueGetter<?, ?> converter(FieldType type, @Nullable 
TypeDescriptor<?> getterReturnType) {
+      return rowValueGetter(IDENTITY, type, getterReturnType);
     }
 
     static class GetRow<T extends @NonNull Object, V extends @NonNull Object>
         extends Converter<T, V> {
       final Schema schema;
       final Factory<List<FieldValueGetter<V, Object>>> factory;
+      final @Nullable TypeDescriptor<?> valueType;
 
       GetRow(
           FieldValueGetter<T, V> getter,
+          @Nullable TypeDescriptor<?> getterReturnType,
           Schema schema,
           Factory<List<FieldValueGetter<V, Object>>> factory) {
         super(getter);
         this.schema = schema;
         this.factory = factory;
+        this.valueType = getterReturnType;
       }
 
       @Override
       Object convert(V value) {
-        return Row.withSchema(schema).withFieldValueGetters(factory, value);
+        return Row.withSchema(schema)
+            .withFieldValueGetters(
+                factory,
+                value,
+                Optional.ofNullable(valueType)
+                    .orElse((TypeDescriptor) 
TypeDescriptor.of(value.getClass())));
       }
     }
 
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AutoValueUtils.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AutoValueUtils.java
index 78808fdc10c..a9353bcaaac 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AutoValueUtils.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AutoValueUtils.java
@@ -162,7 +162,7 @@ public class AutoValueUtils {
     Optional<Constructor<?>> constructor =
         
Arrays.stream(generatedTypeDescriptor.getRawType().getDeclaredConstructors())
             .filter(c -> !Modifier.isPrivate(c.getModifiers()))
-            .filter(c -> matchConstructor(c, schemaTypes))
+            .filter(c -> matchConstructor(generatedTypeDescriptor, c, 
schemaTypes))
             .findAny();
     return constructor
         .map(
@@ -177,7 +177,9 @@ public class AutoValueUtils {
   }
 
   private static boolean matchConstructor(
-      Constructor<?> constructor, List<FieldValueTypeInformation> getterTypes) 
{
+      TypeDescriptor typeDescriptor,
+      Constructor<?> constructor,
+      List<FieldValueTypeInformation> getterTypes) {
     if (constructor.getParameters().length != getterTypes.size()) {
       return false;
     }
@@ -197,7 +199,8 @@ public class AutoValueUtils {
     // Verify that constructor parameters match (name and type) the inferred 
schema.
     for (Parameter parameter : constructor.getParameters()) {
       FieldValueTypeInformation type = typeMap.get(parameter.getName());
-      if (type == null || type.getRawType() != parameter.getType()) {
+      if (type == null
+          || 
!type.getType().equals(typeDescriptor.resolveType(parameter.getParameterizedType())))
 {
         valid = false;
         break;
       }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
index 11d02be46d2..6eb063f84b6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
@@ -838,9 +838,11 @@ public abstract class Row implements Serializable {
 
     @Internal
     public <T> Row withFieldValueGetters(
-        Factory<List<FieldValueGetter<T, Object>>> fieldValueGetterFactory, T getterTarget) {
+        Factory<List<FieldValueGetter<T, Object>>> fieldValueGetterFactory,
+        T getterTarget,
+        TypeDescriptor<?> getterTargetType) {
       checkState(getterTarget != null, "getters require withGetterTarget.");
-      return new RowWithGetters<>(schema, fieldValueGetterFactory, getterTarget);
+      return new RowWithGetters<>(schema, fieldValueGetterFactory, getterTarget, getterTargetType);
     }
 
     public Row build() {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java
index 35e0ac20d3f..0cbc7bfb992 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java
@@ -44,14 +44,19 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 @SuppressWarnings("rawtypes")
 public class RowWithGetters<T extends @NonNull Object> extends Row {
   private final T getterTarget;
+  private final TypeDescriptor<?> getterTargetType;
   private final List<FieldValueGetter<T, Object>> getters;
   private @Nullable Map<Integer, @Nullable Object> cache = null;
 
   RowWithGetters(
-      Schema schema, Factory<List<FieldValueGetter<T, Object>>> getterFactory, T getterTarget) {
+      Schema schema,
+      Factory<List<FieldValueGetter<T, Object>>> getterFactory,
+      T getterTarget,
+      TypeDescriptor<?> getterTargetType) {
     super(schema);
     this.getterTarget = getterTarget;
-    this.getters = getterFactory.create(TypeDescriptor.of(getterTarget.getClass()), schema);
+    this.getterTargetType = getterTargetType;
+    this.getters = getterFactory.create(getterTargetType, schema);
   }
 
   @Override
@@ -90,6 +95,10 @@ public class RowWithGetters<T extends @NonNull Object> extends Row {
     return (W) fieldValue;
   }
 
+  public TypeDescriptor<?> getGetterTargetType() {
+    return getterTargetType;
+  }
+
   private boolean cacheFieldType(Field field) {
     TypeName typeName = field.getType().getTypeName();
     return typeName.equals(TypeName.MAP)
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AutoValueSchemaTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AutoValueSchemaTest.java
index f4700212511..99b858a4e30 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AutoValueSchemaTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AutoValueSchemaTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.schemas;
 
+import static 
org.apache.beam.sdk.schemas.utils.SchemaTestUtils.assertSchemaEquivalent;
 import static org.apache.beam.sdk.schemas.utils.SchemaTestUtils.equivalentTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertArrayEquals;
@@ -28,6 +29,8 @@ import com.google.auto.value.extension.memoized.Memoized;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
 import org.apache.beam.sdk.schemas.Schema.Field;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
@@ -37,9 +40,13 @@ import 
org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
 import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
 import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber;
 import org.apache.beam.sdk.schemas.utils.SchemaTestUtils;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.joda.time.DateTime;
 import org.joda.time.Instant;
 import org.junit.Test;
@@ -70,7 +77,7 @@ public class AutoValueSchemaTest {
           .build();
   static final Schema OUTER_SCHEMA = Schema.builder().addRowField("inner", 
SIMPLE_SCHEMA).build();
 
-  private Row createSimpleRow(String name) {
+  private static Row createSimpleRow(String name) {
     return Row.withSchema(SIMPLE_SCHEMA)
         .addValues(
             name,
@@ -348,6 +355,50 @@ public class AutoValueSchemaTest {
     }
   }
 
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  abstract static class GenericAutoValue<T> {
+    public abstract T getT();
+
+    GenericAutoValue() {}
+
+    public static <T> GenericAutoValue<T> create(T t) {
+      return new AutoValue_AutoValueSchemaTest_GenericAutoValue<>(t);
+    }
+  }
+
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  abstract static class GenericAutoValueWithBuilder<T> {
+    public abstract T getT();
+
+    GenericAutoValueWithBuilder() {}
+
+    public static <T> Builder<T> builder() {
+      return new 
AutoValue_AutoValueSchemaTest_GenericAutoValueWithBuilder.Builder<>();
+    }
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      public abstract Builder<T> setT(T t);
+
+      public abstract GenericAutoValueWithBuilder<T> build();
+    }
+  }
+
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  abstract static class GenericAutoValueWithCreator<T> {
+    public abstract T getT();
+
+    GenericAutoValueWithCreator() {}
+
+    @SchemaCreate
+    public static <T> GenericAutoValueWithCreator<T> create(T t) {
+      return new 
AutoValue_AutoValueSchemaTest_GenericAutoValueWithCreator<>(t);
+    }
+  }
+
   private void verifyRow(Row row) {
     assertEquals("string", row.getString("str"));
     assertEquals((byte) 1, (Object) row.getByte("aByte"));
@@ -385,6 +436,375 @@ public class AutoValueSchemaTest {
     SchemaTestUtils.assertSchemaEquivalent(SIMPLE_SCHEMA, schema);
   }
 
+  @Test
+  public void testGenericAutoValueSchema() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    Schema actual = registry.getSchema(new 
TypeDescriptor<GenericAutoValue<SimpleSchema>>() {});
+    Schema expected = Schema.builder().addRowField("t", SIMPLE_SCHEMA).build();
+    assertSchemaEquivalent(expected, actual);
+  }
+
+  @Test
+  public void testNestedGenericAutoValueSchema() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    Schema actual =
+        registry.getSchema(
+            new 
TypeDescriptor<GenericAutoValue<GenericAutoValue<SimpleSchema>>>() {});
+    Schema expected =
+        Schema.builder()
+            .addRowField("t", Schema.builder().addRowField("t", 
SIMPLE_SCHEMA).build())
+            .build();
+
+    assertSchemaEquivalent(expected, actual);
+  }
+
+  @Test
+  public void testGenericAutoValueToRow() throws Exception {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    SerializableFunction<GenericAutoValue<SimpleSchema>, Row> toRow =
+        registry.getToRowFunction(new 
TypeDescriptor<GenericAutoValue<SimpleSchema>>() {});
+    Row row =
+        toRow.apply(
+            GenericAutoValue.create(
+                new AutoValue_AutoValueSchemaTest_SimpleAutoValue(
+                    "string",
+                    (byte) 1,
+                    (short) 2,
+                    3,
+                    4L,
+                    true,
+                    DATE,
+                    BYTE_ARRAY,
+                    ByteBuffer.wrap(BYTE_ARRAY),
+                    DATE.toInstant(),
+                    BigDecimal.ONE,
+                    STRING_BUILDER)));
+
+    verifyRow(row.getRow("t"));
+  }
+
+  @Test
+  public void testGenericAutoValueFromRow() throws Exception {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    SerializableFunction<Row, GenericAutoValue<SimpleAutoValue>> fromRow =
+        registry.getFromRowFunction(new 
TypeDescriptor<GenericAutoValue<SimpleAutoValue>>() {});
+
+    Row row =
+        Row.withSchema(Schema.builder().addRowField("t", 
SIMPLE_SCHEMA).build())
+            .withFieldValue("t", createSimpleRow("string"))
+            .build();
+    GenericAutoValue<SimpleAutoValue> actual = fromRow.apply(row);
+    verifyAutoValue(actual.getT());
+  }
+
+  @Test
+  public void testGenericAutoValueWithCreatorFromRow() throws Exception {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    SerializableFunction<Row, GenericAutoValueWithCreator<SimpleAutoValue>> 
fromRow =
+        registry.getFromRowFunction(
+            new TypeDescriptor<GenericAutoValueWithCreator<SimpleAutoValue>>() 
{});
+
+    Row row =
+        Row.withSchema(Schema.builder().addRowField("t", 
SIMPLE_SCHEMA).build())
+            .withFieldValue("t", createSimpleRow("string"))
+            .build();
+    GenericAutoValueWithCreator<SimpleAutoValue> actual = fromRow.apply(row);
+    verifyAutoValue(actual.getT());
+  }
+
+  @Test
+  public void testGenericAutoValueWithBuilderFromRow() throws Exception {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    SerializableFunction<Row, GenericAutoValueWithBuilder<SimpleAutoValue>> 
fromRow =
+        registry.getFromRowFunction(
+            new TypeDescriptor<GenericAutoValueWithBuilder<SimpleAutoValue>>() 
{});
+
+    Row row =
+        Row.withSchema(Schema.builder().addRowField("t", 
SIMPLE_SCHEMA).build())
+            .withFieldValue("t", createSimpleRow("string"))
+            .build();
+    GenericAutoValueWithBuilder<SimpleAutoValue> actual = fromRow.apply(row);
+    verifyAutoValue(actual.getT());
+  }
+
+  @Test
+  public void testGenericAutoValueBuilderOfMapOfCreatorsFromRow() throws 
Exception {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    SerializableFunction<
+            Row, GenericAutoValueWithBuilder<Map<String, 
GenericAutoValueWithCreator<String>>>>
+        fromRow =
+            registry.getFromRowFunction(
+                new TypeDescriptor<
+                    GenericAutoValueWithBuilder<
+                        Map<String, GenericAutoValueWithCreator<String>>>>() 
{});
+
+    Schema mapValueSchema = Schema.builder().addField("t", 
FieldType.STRING).build();
+
+    Row row =
+        Row.withSchema(
+                Schema.builder()
+                    .addMapField("t", FieldType.STRING, 
FieldType.row(mapValueSchema))
+                    .build())
+            .withFieldValue(
+                "t",
+                ImmutableMap.<String, Row>builder()
+                    .put("k1", 
Row.withSchema(mapValueSchema).withFieldValue("t", "v1").build())
+                    .put("k2", 
Row.withSchema(mapValueSchema).withFieldValue("t", "v2").build())
+                    .build())
+            .build();
+
+    GenericAutoValueWithBuilder<Map<String, 
GenericAutoValueWithCreator<String>>> actual =
+        fromRow.apply(row);
+    GenericAutoValueWithCreator<String> genericAutoValue1 =
+        GenericAutoValueWithCreator.create("v1");
+    GenericAutoValueWithCreator<String> genericAutoValue2 =
+        GenericAutoValueWithCreator.create("v2");
+
+    assertEquals(genericAutoValue1, actual.getT().get("k1"));
+    assertEquals(genericAutoValue2, actual.getT().get("k2"));
+  }
+
+  @Test
+  public void testGenericAutoValueCreatorOfMapOfBuildersFromRow() throws 
Exception {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    SerializableFunction<
+            Row, GenericAutoValueWithCreator<Map<String, 
GenericAutoValueWithBuilder<String>>>>
+        fromRow =
+            registry.getFromRowFunction(
+                new TypeDescriptor<
+                    GenericAutoValueWithCreator<
+                        Map<String, GenericAutoValueWithBuilder<String>>>>() 
{});
+
+    Schema mapValueSchema = Schema.builder().addField("t", 
FieldType.STRING).build();
+
+    Row row =
+        Row.withSchema(
+                Schema.builder()
+                    .addMapField("t", FieldType.STRING, 
FieldType.row(mapValueSchema))
+                    .build())
+            .withFieldValue(
+                "t",
+                ImmutableMap.<String, Row>builder()
+                    .put("k1", 
Row.withSchema(mapValueSchema).withFieldValue("t", "v1").build())
+                    .put("k2", 
Row.withSchema(mapValueSchema).withFieldValue("t", "v2").build())
+                    .build())
+            .build();
+
+    GenericAutoValueWithCreator<Map<String, 
GenericAutoValueWithBuilder<String>>> actual =
+        fromRow.apply(row);
+    GenericAutoValueWithBuilder<String> genericAutoValue1 =
+        GenericAutoValueWithBuilder.<String>builder().setT("v1").build();
+    GenericAutoValueWithBuilder<String> genericAutoValue2 =
+        GenericAutoValueWithBuilder.<String>builder().setT("v2").build();
+
+    assertEquals(genericAutoValue1, actual.getT().get("k1"));
+    assertEquals(genericAutoValue2, actual.getT().get("k2"));
+  }
+
+  /**
+   * Checks from-row conversion for a builder-based generic AutoValue whose type argument is a
+   * list of creator-based generic AutoValues.
+   */
+  @Test
+  public void testGenericAutoValueBuilderOfListOfCreatorsFromRow() throws Exception {
+    Schema elementSchema = Schema.builder().addField("t", FieldType.STRING).build();
+    Schema outerSchema = Schema.builder().addArrayField("t", FieldType.row(elementSchema)).build();
+    Row firstElement = Row.withSchema(elementSchema).withFieldValue("t", "v1").build();
+    Row secondElement = Row.withSchema(elementSchema).withFieldValue("t", "v2").build();
+    Row input =
+        Row.withSchema(outerSchema)
+            .withFieldValue("t", ImmutableList.of(firstElement, secondElement))
+            .build();
+
+    SerializableFunction<
+            Row, GenericAutoValueWithBuilder<List<GenericAutoValueWithCreator<String>>>>
+        converter =
+            SchemaRegistry.createDefault()
+                .getFromRowFunction(
+                    new TypeDescriptor<
+                        GenericAutoValueWithBuilder<
+                            List<GenericAutoValueWithCreator<String>>>>() {});
+    List<GenericAutoValueWithCreator<String>> converted = converter.apply(input).getT();
+
+    // Each element row must have been turned into an equal creator-based value, in order.
+    assertEquals(GenericAutoValueWithCreator.create("v1"), converted.get(0));
+    assertEquals(GenericAutoValueWithCreator.create("v2"), converted.get(1));
+  }
+
+  /**
+   * Checks from-row conversion for a creator-based generic AutoValue whose type argument is a
+   * list of builder-based generic AutoValues.
+   */
+  @Test
+  public void testGenericAutoValueCreatorOfListOfBuildersFromRow() throws Exception {
+    Schema elementSchema = Schema.builder().addField("t", FieldType.STRING).build();
+    Schema outerSchema = Schema.builder().addArrayField("t", FieldType.row(elementSchema)).build();
+    Row firstElement = Row.withSchema(elementSchema).withFieldValue("t", "v1").build();
+    Row secondElement = Row.withSchema(elementSchema).withFieldValue("t", "v2").build();
+    Row input =
+        Row.withSchema(outerSchema)
+            .withFieldValue("t", ImmutableList.of(firstElement, secondElement))
+            .build();
+
+    SerializableFunction<
+            Row, GenericAutoValueWithCreator<List<GenericAutoValueWithBuilder<String>>>>
+        converter =
+            SchemaRegistry.createDefault()
+                .getFromRowFunction(
+                    new TypeDescriptor<
+                        GenericAutoValueWithCreator<
+                            List<GenericAutoValueWithBuilder<String>>>>() {});
+    List<GenericAutoValueWithBuilder<String>> converted = converter.apply(input).getT();
+
+    GenericAutoValueWithBuilder<String> expectedFirst =
+        GenericAutoValueWithBuilder.<String>builder().setT("v1").build();
+    GenericAutoValueWithBuilder<String> expectedSecond =
+        GenericAutoValueWithBuilder.<String>builder().setT("v2").build();
+    assertEquals(expectedFirst, converted.get(0));
+    assertEquals(expectedSecond, converted.get(1));
+  }
+
+  /**
+   * Checks from-row conversion for a builder-based generic AutoValue whose type argument is an
+   * array of creator-based generic AutoValues.
+   */
+  @Test
+  public void testGenericAutoValueBuilderOfArrayOfCreatorsFromRow() throws Exception {
+    Schema elementSchema = Schema.builder().addField("t", FieldType.STRING).build();
+    Schema outerSchema = Schema.builder().addArrayField("t", FieldType.row(elementSchema)).build();
+    Row firstElement = Row.withSchema(elementSchema).withFieldValue("t", "v1").build();
+    Row secondElement = Row.withSchema(elementSchema).withFieldValue("t", "v2").build();
+    Row input =
+        Row.withSchema(outerSchema)
+            .withFieldValue("t", ImmutableList.of(firstElement, secondElement))
+            .build();
+
+    SerializableFunction<Row, GenericAutoValueWithBuilder<GenericAutoValueWithCreator<String>[]>>
+        converter =
+            SchemaRegistry.createDefault()
+                .getFromRowFunction(
+                    new TypeDescriptor<
+                        GenericAutoValueWithBuilder<GenericAutoValueWithCreator<String>[]>>() {});
+    GenericAutoValueWithCreator<String>[] converted = converter.apply(input).getT();
+
+    // The array field of the row maps onto a Java array of converted elements, in order.
+    assertEquals(GenericAutoValueWithCreator.create("v1"), converted[0]);
+    assertEquals(GenericAutoValueWithCreator.create("v2"), converted[1]);
+  }
+
+  /**
+   * Checks from-row conversion for a creator-based generic AutoValue whose type argument is an
+   * array of builder-based generic AutoValues.
+   */
+  @Test
+  public void testGenericAutoValueCreatorOfArrayOfBuildersFromRow() throws Exception {
+    Schema elementSchema = Schema.builder().addField("t", FieldType.STRING).build();
+    Schema outerSchema = Schema.builder().addArrayField("t", FieldType.row(elementSchema)).build();
+    Row firstElement = Row.withSchema(elementSchema).withFieldValue("t", "v1").build();
+    Row secondElement = Row.withSchema(elementSchema).withFieldValue("t", "v2").build();
+    Row input =
+        Row.withSchema(outerSchema)
+            .withFieldValue("t", ImmutableList.of(firstElement, secondElement))
+            .build();
+
+    SerializableFunction<Row, GenericAutoValueWithCreator<GenericAutoValueWithBuilder<String>[]>>
+        converter =
+            SchemaRegistry.createDefault()
+                .getFromRowFunction(
+                    new TypeDescriptor<
+                        GenericAutoValueWithCreator<GenericAutoValueWithBuilder<String>[]>>() {});
+    GenericAutoValueWithBuilder<String>[] converted = converter.apply(input).getT();
+
+    GenericAutoValueWithBuilder<String> expectedFirst =
+        GenericAutoValueWithBuilder.<String>builder().setT("v1").build();
+    GenericAutoValueWithBuilder<String> expectedSecond =
+        GenericAutoValueWithBuilder.<String>builder().setT("v2").build();
+    assertEquals(expectedFirst, converted[0]);
+    assertEquals(expectedSecond, converted[1]);
+  }
+
+  /**
+   * Checks to-row conversion of a generic AutoValue that wraps a map whose values are themselves
+   * generic AutoValues.
+   */
+  @Test
+  public void testGenericAutoValueWithMapToRow() throws Exception {
+    GenericAutoValue<String> firstValue = GenericAutoValue.create("v1");
+    GenericAutoValue<String> secondValue = GenericAutoValue.create("v2");
+    Map<String, GenericAutoValue<String>> payload =
+        ImmutableMap.of("k1", firstValue, "k2", secondValue);
+
+    SerializableFunction<GenericAutoValue<Map<String, GenericAutoValue<String>>>, Row> converter =
+        SchemaRegistry.createDefault()
+            .getToRowFunction(
+                new TypeDescriptor<GenericAutoValue<Map<String, GenericAutoValue<String>>>>() {});
+    Row converted = converter.apply(GenericAutoValue.create(payload));
+
+    // Every map value becomes a nested row exposing the wrapped string under "t".
+    Map<String, Row> valueRows = converted.getMap("t");
+    assertEquals("v1", valueRows.get("k1").getString("t"));
+    assertEquals("v2", valueRows.get("k2").getString("t"));
+  }
+
+  /**
+   * Checks to-row conversion of a generic AutoValue that wraps a list of generic AutoValues.
+   */
+  @Test
+  public void testGenericAutoValueWithListToRow() throws Exception {
+    GenericAutoValue<String> firstValue = GenericAutoValue.create("v1");
+    GenericAutoValue<String> secondValue = GenericAutoValue.create("v2");
+    List<GenericAutoValue<String>> payload = ImmutableList.of(firstValue, secondValue);
+
+    SerializableFunction<GenericAutoValue<List<GenericAutoValue<String>>>, Row> converter =
+        SchemaRegistry.createDefault()
+            .getToRowFunction(
+                new TypeDescriptor<GenericAutoValue<List<GenericAutoValue<String>>>>() {});
+    Row converted = converter.apply(GenericAutoValue.create(payload));
+
+    // Copy the array field into a Row[] so elements can be checked by position.
+    Row[] elementRows = converted.<Row>getArray("t").toArray(new Row[0]);
+    assertEquals("v1", elementRows[0].getString("t"));
+    assertEquals("v2", elementRows[1].getString("t"));
+  }
+
+  /**
+   * Checks to-row conversion of a generic AutoValue that wraps an array of generic AutoValues.
+   */
+  @Test
+  public void testGenericAutoValueWithArrayToRow() throws Exception {
+    GenericAutoValue<String> firstValue = GenericAutoValue.create("v1");
+    GenericAutoValue<String> secondValue = GenericAutoValue.create("v2");
+    SerializableFunction<GenericAutoValue<GenericAutoValue<String>[]>, Row> converter =
+        SchemaRegistry.createDefault()
+            .getToRowFunction(
+                new TypeDescriptor<GenericAutoValue<GenericAutoValue<String>[]>>() {});
+
+    // A generic array cannot be created directly, so a raw array is used and the resulting
+    // unchecked conversion is suppressed on this declaration only.
+    @SuppressWarnings("unchecked")
+    Row converted =
+        converter.apply(
+            GenericAutoValue.create(new GenericAutoValue[] {firstValue, secondValue}));
+
+    Row[] elementRows = converted.<Row>getArray("t").toArray(new Row[0]);
+    assertEquals("v1", elementRows[0].getString("t"));
+    assertEquals("v2", elementRows[1].getString("t"));
+  }
+
+  @Test
+  public void testNestedGenericAutoValueToRow() throws Exception {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    
SerializableFunction<GenericAutoValue<GenericAutoValue<GenericAutoValue<String>>>,
 Row> toRow =
+        registry.getToRowFunction(
+            new 
TypeDescriptor<GenericAutoValue<GenericAutoValue<GenericAutoValue<String>>>>() 
{});
+
+    Row row =
+        toRow.apply(
+            
GenericAutoValue.create(GenericAutoValue.create(GenericAutoValue.create("v1"))));
+
+    assertEquals("v1", row.getRow("t").getRow("t").getString("t"));
+  }
+
   @Test
   public void testToRowConstructor() throws NoSuchSchemaException {
     SchemaRegistry registry = SchemaRegistry.createDefault();
@@ -402,6 +822,7 @@ public class AutoValueSchemaTest {
             DATE.toInstant(),
             BigDecimal.ONE,
             STRING_BUILDER);
+
     Row row = registry.getToRowFunction(SimpleAutoValue.class).apply(value);
     verifyRow(row);
   }
@@ -444,6 +865,7 @@ public class AutoValueSchemaTest {
             DATE.toInstant(),
             BigDecimal.ONE,
             STRING_BUILDER);
+
     Row row = registry.getToRowFunction(MemoizedAutoValue.class).apply(value);
     verifyRow(row);
   }
@@ -571,6 +993,7 @@ public class AutoValueSchemaTest {
             DATE.toInstant(),
             BigDecimal.ONE,
             STRING_BUILDER);
+
     AutoValueOuter outer = new 
AutoValue_AutoValueSchemaTest_AutoValueOuter(inner);
     Row row = registry.getToRowFunction(AutoValueOuter.class).apply(outer);
     verifyRow(row.getRow("inner"));
@@ -675,6 +1098,7 @@ public class AutoValueSchemaTest {
         Instant instant,
         BigDecimal bigDecimal,
         StringBuilder stringBuilder) {
+
       return new 
AutoValue_AutoValueSchemaTest_SimpleAutoValueWithStaticFactory(
           str,
           aByte,
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java
index de0953a0a08..21c6dee1369 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.schemas;
 
+import static 
org.apache.beam.sdk.schemas.utils.SchemaTestUtils.assertSchemaEquivalent;
 import static org.apache.beam.sdk.schemas.utils.SchemaTestUtils.equivalentTo;
 import static 
org.apache.beam.sdk.schemas.utils.TestJavaBeans.ALL_NULLABLE_BEAN_SCHEMA;
 import static 
org.apache.beam.sdk.schemas.utils.TestJavaBeans.ANNOTATED_SIMPLE_BEAN_SCHEMA;
@@ -33,6 +34,7 @@ import static 
org.apache.beam.sdk.schemas.utils.TestJavaBeans.PARAMETER_NULLABLE
 import static 
org.apache.beam.sdk.schemas.utils.TestJavaBeans.PRIMITIVE_ARRAY_BEAN_SCHEMA;
 import static 
org.apache.beam.sdk.schemas.utils.TestJavaBeans.RENAMED_FIELDS_AND_SETTERS_BEAM_SCHEMA;
 import static 
org.apache.beam.sdk.schemas.utils.TestJavaBeans.SIMPLE_BEAN_SCHEMA;
+import static 
org.apache.beam.sdk.schemas.utils.TestJavaBeans.genericBeanSchema;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
@@ -63,6 +65,7 @@ import 
org.apache.beam.sdk.schemas.utils.TestJavaBeans.ArrayOfByteArray;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithCaseFormat;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithNoCreateOption;
 import 
org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithRenamedFieldsAndSetters;
+import org.apache.beam.sdk.schemas.utils.TestJavaBeans.GenericBean;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.IterableBean;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.JavaTimeBean;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.MismatchingNullableBean;
@@ -76,9 +79,11 @@ import 
org.apache.beam.sdk.schemas.utils.TestJavaBeans.SimpleBean;
 import 
org.apache.beam.sdk.schemas.utils.TestJavaBeans.SimpleBeanWithAnnotations;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Streams;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Ints;
 import org.joda.time.DateTime;
 import org.junit.Ignore;
@@ -139,6 +144,16 @@ public class JavaBeanSchemaTest {
         .build();
   }
 
+  /** Builds a new {@link GenericBean} and stores {@code t} in it through its setter. */
+  private <T> GenericBean<T> createGeneric(T t) {
+    GenericBean<T> bean = new GenericBean<>();
+    bean.setT(t);
+    return bean;
+  }
+
+  /**
+   * Builds a row with the generic bean schema for {@code tFieldType}, holding {@code tFieldValue}
+   * in its "t" field.
+   */
+  private Row createGenericRow(Schema.FieldType tFieldType, Object tFieldValue) {
+    Schema rowSchema = genericBeanSchema(tFieldType);
+    return Row.withSchema(rowSchema).withFieldValue("t", tFieldValue).build();
+  }
+
   @Test
   public void testSchema() throws NoSuchSchemaException {
     SchemaRegistry registry = SchemaRegistry.createDefault();
@@ -146,14 +161,9 @@ public class JavaBeanSchemaTest {
     SchemaTestUtils.assertSchemaEquivalent(SIMPLE_BEAN_SCHEMA, schema);
   }
 
-  @Test
-  public void testToRow() throws NoSuchSchemaException {
-    SchemaRegistry registry = SchemaRegistry.createDefault();
-    SimpleBean bean = createSimple("string");
-    Row row = registry.getToRowFunction(SimpleBean.class).apply(bean);
-
+  private static void verifyRow(String expectedStrField, Row row) {
     assertEquals(12, row.getFieldCount());
-    assertEquals("string", row.getString("str"));
+    assertEquals(expectedStrField, row.getString("str"));
     assertEquals((byte) 1, (Object) row.getByte("aByte"));
     assertEquals((short) 2, (Object) row.getInt16("aShort"));
     assertEquals((int) 3, (Object) row.getInt32("anInt"));
@@ -167,13 +177,8 @@ public class JavaBeanSchemaTest {
     assertEquals("stringbuilder", row.getString("stringBuilder"));
   }
 
-  @Test
-  public void testFromRow() throws NoSuchSchemaException {
-    SchemaRegistry registry = SchemaRegistry.createDefault();
-    Row row = createSimpleRow("string");
-
-    SimpleBean bean = registry.getFromRowFunction(SimpleBean.class).apply(row);
-    assertEquals("string", bean.getStr());
+  private static void verifySimpleBean(String expectedStrField, SimpleBean 
bean) {
+    assertEquals(expectedStrField, bean.getStr());
     assertEquals((byte) 1, bean.getaByte());
     assertEquals((short) 2, bean.getaShort());
     assertEquals((int) 3, bean.getAnInt());
@@ -262,6 +267,23 @@ public class JavaBeanSchemaTest {
     assertEquals(original, roundTripped);
   }
 
+  /** Converts a simple bean to a row and checks the result with {@code verifyRow}. */
+  @Test
+  public void testToRow() throws NoSuchSchemaException {
+    Row converted =
+        SchemaRegistry.createDefault()
+            .getToRowFunction(SimpleBean.class)
+            .apply(createSimple("string"));
+
+    verifyRow("string", converted);
+  }
+
+  /** Converts a simple row to a bean and checks the result with {@code verifySimpleBean}. */
+  @Test
+  public void testFromRow() throws NoSuchSchemaException {
+    SimpleBean converted =
+        SchemaRegistry.createDefault()
+            .getFromRowFunction(SimpleBean.class)
+            .apply(createSimpleRow("string"));
+
+    verifySimpleBean("string", converted);
+  }
+
   @Test
   public void testNullableToRow() throws NoSuchSchemaException {
     SchemaRegistry registry = SchemaRegistry.createDefault();
@@ -708,4 +730,121 @@ public class JavaBeanSchemaTest {
     assertEquals(
         registry.getFromRowFunction(BeanWithCaseFormat.class).apply(row), 
beanWithCaseFormat);
   }
+
+  /**
+   * The schema looked up for {@code GenericBean<SimpleBean>} must be equivalent to the generic
+   * bean schema whose "t" field is a row of the simple bean schema.
+   */
+  @Test
+  public void testGenericBeamSchema() throws Exception {
+    TypeDescriptor<GenericBean<SimpleBean>> beanType =
+        new TypeDescriptor<GenericBean<SimpleBean>>() {};
+    Schema inferred = SchemaRegistry.createDefault().getSchema(beanType);
+
+    assertSchemaEquivalent(genericBeanSchema(Schema.FieldType.row(SIMPLE_BEAN_SCHEMA)), inferred);
+  }
+
+  /** Converts a doubly nested generic bean to a row and verifies the innermost simple row. */
+  @Test
+  public void testGenericBeamSchemaToRow() throws Exception {
+    TypeDescriptor<GenericBean<GenericBean<SimpleBean>>> beanType =
+        new TypeDescriptor<GenericBean<GenericBean<SimpleBean>>>() {};
+    GenericBean<SimpleBean> inner = createGeneric(createSimple("string"));
+    GenericBean<GenericBean<SimpleBean>> outer = createGeneric(inner);
+
+    Row converted = SchemaRegistry.createDefault().getToRowFunction(beanType).apply(outer);
+
+    Row innerRow = converted.getRow("t");
+    verifyRow("string", innerRow.getRow("t"));
+  }
+
+  /** Converts a doubly nested generic row to beans and verifies the innermost simple bean. */
+  @Test
+  public void testGenericBeamSchemaFromRow() throws Exception {
+    Schema.FieldType simpleRowType = Schema.FieldType.row(SIMPLE_BEAN_SCHEMA);
+    Schema.FieldType innerGenericType = Schema.FieldType.row(genericBeanSchema(simpleRowType));
+    Row innerRow = createGenericRow(simpleRowType, createSimpleRow("string"));
+    Row outerRow = createGenericRow(innerGenericType, innerRow);
+
+    TypeDescriptor<GenericBean<GenericBean<SimpleBean>>> beanType =
+        new TypeDescriptor<GenericBean<GenericBean<SimpleBean>>>() {};
+    GenericBean<GenericBean<SimpleBean>> converted =
+        SchemaRegistry.createDefault().getFromRowFunction(beanType).apply(outerRow);
+
+    verifySimpleBean("string", converted.getT().getT());
+  }
+
+  /** Converts a generic bean wrapping a map of generic beans to a row with a map of rows. */
+  @Test
+  public void testGenericBeamSchemaMapToRow() throws Exception {
+    TypeDescriptor<GenericBean<Map<String, GenericBean<String>>>> beanType =
+        new TypeDescriptor<GenericBean<Map<String, GenericBean<String>>>>() {};
+    GenericBean<String> firstValue = createGeneric("v1");
+    GenericBean<String> secondValue = createGeneric("v2");
+    Map<String, GenericBean<String>> payload =
+        ImmutableMap.of("k1", firstValue, "k2", secondValue);
+
+    Row converted =
+        SchemaRegistry.createDefault().getToRowFunction(beanType).apply(createGeneric(payload));
+
+    Map<String, Row> valueRows = converted.getMap("t");
+    assertEquals("v1", valueRows.get("k1").getString("t"));
+    assertEquals("v2", valueRows.get("k2").getString("t"));
+  }
+
+  /** Converts a row holding a map of generic rows into a generic bean wrapping a map of beans. */
+  @Test
+  public void testGenericBeamSchemaMapFromRow() throws Exception {
+    Schema.FieldType valueType = Schema.FieldType.row(genericBeanSchema(Schema.FieldType.STRING));
+    Schema.FieldType mapType = Schema.FieldType.map(Schema.FieldType.STRING, valueType);
+    Map<String, Row> valueRows =
+        ImmutableMap.of(
+            "k1", createGenericRow(Schema.FieldType.STRING, "v1"),
+            "k2", createGenericRow(Schema.FieldType.STRING, "v2"));
+    Row input = createGenericRow(mapType, valueRows);
+
+    TypeDescriptor<GenericBean<Map<String, GenericBean<String>>>> beanType =
+        new TypeDescriptor<GenericBean<Map<String, GenericBean<String>>>>() {};
+    GenericBean<Map<String, GenericBean<String>>> converted =
+        SchemaRegistry.createDefault().getFromRowFunction(beanType).apply(input);
+
+    Map<String, GenericBean<String>> beans = converted.getT();
+    assertEquals("v1", beans.get("k1").getT());
+    assertEquals("v2", beans.get("k2").getT());
+  }
+
+  /** Converts a generic bean wrapping an iterable of generic beans to a row of element rows. */
+  @Test
+  public void testGenericBeamSchemaIterableToRow() throws Exception {
+    TypeDescriptor<GenericBean<Iterable<GenericBean<String>>>> beanType =
+        new TypeDescriptor<GenericBean<Iterable<GenericBean<String>>>>() {};
+    GenericBean<String> firstElement = createGeneric("v1");
+    GenericBean<String> secondElement = createGeneric("v2");
+    Iterable<GenericBean<String>> payload = ImmutableList.of(firstElement, secondElement);
+
+    Row converted =
+        SchemaRegistry.createDefault().getToRowFunction(beanType).apply(createGeneric(payload));
+
+    // Materialize the iterable field so elements can be checked by position.
+    Iterable<Row> elementRows = converted.<Row>getIterable("t");
+    Row[] rows = Streams.stream(elementRows).toArray(Row[]::new);
+    assertEquals("v1", rows[0].getString("t"));
+    assertEquals("v2", rows[1].getString("t"));
+  }
+
+  /** Converts a row holding an array of generic rows into a bean wrapping an iterable of beans. */
+  @Test
+  public void testGenericBeamSchemaIterableFromRow() throws Exception {
+    Schema.FieldType elementType =
+        Schema.FieldType.row(genericBeanSchema(Schema.FieldType.STRING));
+    Row firstElement = createGenericRow(Schema.FieldType.STRING, "v1");
+    Row secondElement = createGenericRow(Schema.FieldType.STRING, "v2");
+    Row input =
+        createGenericRow(
+            Schema.FieldType.array(elementType), ImmutableList.of(firstElement, secondElement));
+
+    TypeDescriptor<GenericBean<Iterable<GenericBean<String>>>> beanType =
+        new TypeDescriptor<GenericBean<Iterable<GenericBean<String>>>>() {};
+    GenericBean<Iterable<GenericBean<String>>> converted =
+        SchemaRegistry.createDefault().getFromRowFunction(beanType).apply(input);
+
+    // Materialize the iterable so elements can be checked by position.
+    Iterable<GenericBean<String>> elements = converted.getT();
+    GenericBean<String>[] beans = Streams.stream(elements).toArray(GenericBean[]::new);
+    assertEquals("v1", beans[0].getT());
+    assertEquals("v2", beans[1].getT());
+  }
 }
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java
index c80b758adc3..d4f94f021c3 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.schemas;
 
+import static 
org.apache.beam.sdk.schemas.utils.SchemaTestUtils.assertSchemaEquivalent;
 import static org.apache.beam.sdk.schemas.utils.SchemaTestUtils.equivalentTo;
 import static 
org.apache.beam.sdk.schemas.utils.TestPOJOs.ANNOTATED_SIMPLE_POJO_SCHEMA;
 import static 
org.apache.beam.sdk.schemas.utils.TestPOJOs.CASE_FORMAT_POJO_SCHEMA;
@@ -36,6 +37,7 @@ import static 
org.apache.beam.sdk.schemas.utils.TestPOJOs.POJO_WITH_NESTED_ARRAY
 import static 
org.apache.beam.sdk.schemas.utils.TestPOJOs.PRIMITIVE_ARRAY_POJO_SCHEMA;
 import static org.apache.beam.sdk.schemas.utils.TestPOJOs.SIMPLE_POJO_SCHEMA;
 import static 
org.apache.beam.sdk.schemas.utils.TestPOJOs.SIMPLE_POJO_WITH_DESCRIPTION_SCHEMA;
+import static org.apache.beam.sdk.schemas.utils.TestPOJOs.genericPOJOSchema;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
 import static org.junit.Assert.assertArrayEquals;
@@ -64,6 +66,7 @@ import org.apache.beam.sdk.schemas.utils.SchemaTestUtils;
 import org.apache.beam.sdk.schemas.utils.TestPOJOs;
 import org.apache.beam.sdk.schemas.utils.TestPOJOs.AnnotatedSimplePojo;
 import org.apache.beam.sdk.schemas.utils.TestPOJOs.FirstCircularNestedPOJO;
+import org.apache.beam.sdk.schemas.utils.TestPOJOs.GenericPOJO;
 import org.apache.beam.sdk.schemas.utils.TestPOJOs.JavaTimePOJO;
 import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedArrayPOJO;
 import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedArraysPOJO;
@@ -86,9 +89,11 @@ import 
org.apache.beam.sdk.schemas.utils.TestPOJOs.StaticCreationSimplePojo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Streams;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Ints;
 import org.joda.time.DateTime;
 import org.joda.time.Instant;
@@ -190,6 +195,10 @@ public class JavaFieldSchemaTest {
         .build();
   }
 
+  /**
+   * Builds a row with the generic POJO schema for {@code tFieldType}, holding {@code tFieldValue}
+   * in its "t" field.
+   */
+  private static Row createGenericRow(FieldType tFieldType, Object tFieldValue) {
+    Schema rowSchema = genericPOJOSchema(tFieldType);
+    return Row.withSchema(rowSchema).withFieldValue("t", tFieldValue).build();
+  }
+
   @Test
   public void testSchema() throws NoSuchSchemaException {
     SchemaRegistry registry = SchemaRegistry.createDefault();
@@ -197,14 +206,9 @@ public class JavaFieldSchemaTest {
     SchemaTestUtils.assertSchemaEquivalent(SIMPLE_POJO_SCHEMA, schema);
   }
 
-  @Test
-  public void testToRow() throws NoSuchSchemaException {
-    SchemaRegistry registry = SchemaRegistry.createDefault();
-    SimplePOJO pojo = createSimple("string");
-    Row row = registry.getToRowFunction(SimplePOJO.class).apply(pojo);
-
+  private static void verifySimpleRow(String expectedStrField, Row row) {
     assertEquals(12, row.getFieldCount());
-    assertEquals("string", row.getString("str"));
+    assertEquals(expectedStrField, row.getString("str"));
     assertEquals((byte) 1, (Object) row.getByte("aByte"));
     assertEquals((short) 2, (Object) row.getInt16("aShort"));
     assertEquals((int) 3, (Object) row.getInt32("anInt"));
@@ -218,13 +222,8 @@ public class JavaFieldSchemaTest {
     assertEquals("stringbuilder", row.getString("stringBuilder"));
   }
 
-  @Test
-  public void testFromRow() throws NoSuchSchemaException {
-    SchemaRegistry registry = SchemaRegistry.createDefault();
-    Row row = createSimpleRow("string");
-
-    SimplePOJO pojo = registry.getFromRowFunction(SimplePOJO.class).apply(row);
-    assertEquals("string", pojo.str);
+  private static void verifySimplePOJO(String expectedStrField, SimplePOJO 
pojo) {
+    assertEquals(expectedStrField, pojo.str);
     assertEquals((byte) 1, pojo.aByte);
     assertEquals((short) 2, pojo.aShort);
     assertEquals((int) 3, pojo.anInt);
@@ -350,6 +349,23 @@ public class JavaFieldSchemaTest {
     assertNull(pojo.uuid);
   }
 
+  /** Converts a simple POJO to a row and checks the result with {@code verifySimpleRow}. */
+  @Test
+  public void testToRow() throws NoSuchSchemaException {
+    Row converted =
+        SchemaRegistry.createDefault()
+            .getToRowFunction(SimplePOJO.class)
+            .apply(createSimple("string"));
+
+    verifySimpleRow("string", converted);
+  }
+
+  /** Converts a simple row to a POJO and checks the result with {@code verifySimplePOJO}. */
+  @Test
+  public void testFromRow() throws NoSuchSchemaException {
+    SimplePOJO converted =
+        SchemaRegistry.createDefault()
+            .getFromRowFunction(SimplePOJO.class)
+            .apply(createSimpleRow("string"));
+
+    verifySimplePOJO("string", converted);
+  }
+
   @Test
   public void testNullableSchema() throws NoSuchSchemaException {
     SchemaRegistry registry = SchemaRegistry.createDefault();
@@ -903,4 +919,156 @@ public class JavaFieldSchemaTest {
         thrown.getMessage(),
         containsString("TestPOJOs$FirstCircularNestedPOJO"));
   }
+
+  /**
+   * The schema looked up for a doubly nested generic POJO must be equivalent to two levels of
+   * the generic POJO schema wrapped around the simple POJO schema.
+   */
+  @Test
+  public void testGenericPOJOSchema() throws Exception {
+    TypeDescriptor<GenericPOJO<GenericPOJO<SimplePOJO>>> pojoType =
+        new TypeDescriptor<GenericPOJO<GenericPOJO<SimplePOJO>>>() {};
+    Schema inferred = SchemaRegistry.createDefault().getSchema(pojoType);
+
+    Schema innerSchema = genericPOJOSchema(FieldType.row(SIMPLE_POJO_SCHEMA));
+    Schema outerSchema = genericPOJOSchema(FieldType.row(innerSchema));
+    assertSchemaEquivalent(outerSchema, inferred);
+  }
+
+  /** Converts a doubly nested generic POJO to a row and verifies the innermost simple row. */
+  @Test
+  public void testGenericPOJOToRow() throws Exception {
+    TypeDescriptor<GenericPOJO<GenericPOJO<SimplePOJO>>> pojoType =
+        new TypeDescriptor<GenericPOJO<GenericPOJO<SimplePOJO>>>() {};
+    GenericPOJO<SimplePOJO> inner = GenericPOJO.create(createSimple("string"));
+    GenericPOJO<GenericPOJO<SimplePOJO>> outer = GenericPOJO.create(inner);
+
+    Row converted = SchemaRegistry.createDefault().getToRowFunction(pojoType).apply(outer);
+
+    Row innerRow = converted.getRow("t");
+    verifySimpleRow("string", innerRow.getRow("t"));
+  }
+
+  /** Converts a doubly nested generic row to POJOs and verifies the innermost simple POJO. */
+  @Test
+  public void testGenericPOJOFromRow() throws Exception {
+    FieldType simpleRowType = FieldType.row(SIMPLE_POJO_SCHEMA);
+    FieldType innerGenericType = FieldType.row(genericPOJOSchema(simpleRowType));
+    Row innerRow = createGenericRow(simpleRowType, createSimpleRow("string"));
+    Row outerRow = createGenericRow(innerGenericType, innerRow);
+
+    TypeDescriptor<GenericPOJO<GenericPOJO<SimplePOJO>>> pojoType =
+        new TypeDescriptor<GenericPOJO<GenericPOJO<SimplePOJO>>>() {};
+    GenericPOJO<GenericPOJO<SimplePOJO>> converted =
+        SchemaRegistry.createDefault().getFromRowFunction(pojoType).apply(outerRow);
+
+    verifySimplePOJO("string", converted.t.t);
+  }
+
+  /** Converts a generic POJO wrapping a map of generic POJOs to a row with a map of rows. */
+  @Test
+  public void testGenericPOJOSchemaMapToRow() throws Exception {
+    TypeDescriptor<GenericPOJO<Map<String, GenericPOJO<String>>>> pojoType =
+        new TypeDescriptor<GenericPOJO<Map<String, GenericPOJO<String>>>>() {};
+    GenericPOJO<String> firstValue = GenericPOJO.create("v1");
+    GenericPOJO<String> secondValue = GenericPOJO.create("v2");
+    Map<String, GenericPOJO<String>> payload =
+        ImmutableMap.of("k1", firstValue, "k2", secondValue);
+
+    Row converted =
+        SchemaRegistry.createDefault()
+            .getToRowFunction(pojoType)
+            .apply(GenericPOJO.create(payload));
+
+    Map<String, Row> valueRows = converted.getMap("t");
+    assertEquals("v1", valueRows.get("k1").getString("t"));
+    assertEquals("v2", valueRows.get("k2").getString("t"));
+  }
+
+  /** Converts a row holding a map of generic rows into a generic POJO wrapping a map of POJOs. */
+  @Test
+  public void testGenericPOJOSchemaMapFromRow() throws Exception {
+    Schema.FieldType valueType = Schema.FieldType.row(genericPOJOSchema(Schema.FieldType.STRING));
+    Schema.FieldType mapType = Schema.FieldType.map(Schema.FieldType.STRING, valueType);
+    Map<String, Row> valueRows =
+        ImmutableMap.of(
+            "k1", createGenericRow(Schema.FieldType.STRING, "v1"),
+            "k2", createGenericRow(Schema.FieldType.STRING, "v2"));
+    Row input = createGenericRow(mapType, valueRows);
+
+    TypeDescriptor<GenericPOJO<Map<String, GenericPOJO<String>>>> pojoType =
+        new TypeDescriptor<GenericPOJO<Map<String, GenericPOJO<String>>>>() {};
+    GenericPOJO<Map<String, GenericPOJO<String>>> converted =
+        SchemaRegistry.createDefault().getFromRowFunction(pojoType).apply(input);
+
+    Map<String, GenericPOJO<String>> pojos = converted.t;
+    assertEquals("v1", pojos.get("k1").t);
+    assertEquals("v2", pojos.get("k2").t);
+  }
+
+  /** Converts a generic POJO wrapping an iterable of generic POJOs to a row of element rows. */
+  @Test
+  public void testGenericBeamSchemaIterableToRow() throws Exception {
+    TypeDescriptor<GenericPOJO<Iterable<GenericPOJO<String>>>> pojoType =
+        new TypeDescriptor<GenericPOJO<Iterable<GenericPOJO<String>>>>() {};
+    GenericPOJO<String> firstElement = GenericPOJO.create("v1");
+    GenericPOJO<String> secondElement = GenericPOJO.create("v2");
+    Iterable<GenericPOJO<String>> payload = ImmutableList.of(firstElement, secondElement);
+
+    Row converted =
+        SchemaRegistry.createDefault()
+            .getToRowFunction(pojoType)
+            .apply(GenericPOJO.create(payload));
+
+    // Materialize the iterable field so elements can be checked by position.
+    Iterable<Row> elementRows = converted.<Row>getIterable("t");
+    Row[] rows = Streams.stream(elementRows).toArray(Row[]::new);
+    assertEquals("v1", rows[0].getString("t"));
+    assertEquals("v2", rows[1].getString("t"));
+  }
+
+  /** Converts a row holding an array of generic rows into a POJO wrapping an iterable of POJOs. */
+  @Test
+  public void testGenericBeamSchemaIterableFromRow() throws Exception {
+    Schema.FieldType elementType =
+        Schema.FieldType.row(genericPOJOSchema(Schema.FieldType.STRING));
+    Row firstElement = createGenericRow(Schema.FieldType.STRING, "v1");
+    Row secondElement = createGenericRow(Schema.FieldType.STRING, "v2");
+    Row input =
+        createGenericRow(
+            Schema.FieldType.array(elementType), ImmutableList.of(firstElement, secondElement));
+
+    TypeDescriptor<GenericPOJO<Iterable<GenericPOJO<String>>>> pojoType =
+        new TypeDescriptor<GenericPOJO<Iterable<GenericPOJO<String>>>>() {};
+    GenericPOJO<Iterable<GenericPOJO<String>>> converted =
+        SchemaRegistry.createDefault().getFromRowFunction(pojoType).apply(input);
+
+    // Materialize the iterable so elements can be checked by position.
+    Iterable<GenericPOJO<String>> elements = converted.t;
+    GenericPOJO<String>[] pojos = Streams.stream(elements).toArray(GenericPOJO[]::new);
+    assertEquals("v1", pojos[0].t);
+    assertEquals("v2", pojos[1].t);
+  }
+
+  /** Converts a generic POJO wrapping an array of generic POJOs to a row of element rows. */
+  @Test
+  public void testGenericBeamSchemaArrayToRow() throws Exception {
+    SerializableFunction<GenericPOJO<GenericPOJO<String>[]>, Row> converter =
+        SchemaRegistry.createDefault()
+            .getToRowFunction(new TypeDescriptor<GenericPOJO<GenericPOJO<String>[]>>() {});
+
+    // A generic array cannot be created directly, so the payload is built as a raw array.
+    Row converted =
+        converter.apply(
+            GenericPOJO.create(
+                new GenericPOJO[] {GenericPOJO.create("v1"), GenericPOJO.create("v2")}));
+
+    Iterable<Row> elementRows = converted.<Row>getIterable("t");
+    Row[] rows = Streams.stream(elementRows).toArray(Row[]::new);
+    assertEquals("v1", rows[0].getString("t"));
+    assertEquals("v2", rows[1].getString("t"));
+  }
+
+  /** Converts a row holding an array of generic rows into a POJO wrapping an array of POJOs. */
+  @Test
+  public void testGenericBeamSchemaArrayFromRow() throws Exception {
+    Schema.FieldType elementType =
+        Schema.FieldType.row(genericPOJOSchema(Schema.FieldType.STRING));
+    Row firstElement = createGenericRow(Schema.FieldType.STRING, "v1");
+    Row secondElement = createGenericRow(Schema.FieldType.STRING, "v2");
+    Row input =
+        createGenericRow(
+            Schema.FieldType.array(elementType), ImmutableList.of(firstElement, secondElement));
+
+    TypeDescriptor<GenericPOJO<GenericPOJO<String>[]>> pojoType =
+        new TypeDescriptor<GenericPOJO<GenericPOJO<String>[]>>() {};
+    GenericPOJO<GenericPOJO<String>[]> converted =
+        SchemaRegistry.createDefault().getFromRowFunction(pojoType).apply(input);
+
+    GenericPOJO<String>[] elements = converted.t;
+    assertEquals("v1", elements[0].t);
+    assertEquals("v2", elements[1].t);
+  }
 }
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java
index d8ed86f2b55..72db48ccf97 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java
@@ -1487,4 +1487,39 @@ public class TestJavaBeans {
           .addLogicalTypeField("instant", new NanosInstant())
           .addLogicalTypeField("uuid", SqlTypes.UUID)
           .build();
+
+  /**
+   * Java bean with a single nullable property {@code t} of generic type {@code T}. Equality and
+   * hashing are based solely on {@code t}.
+   */
+  @DefaultSchema(JavaBeanSchema.class)
+  public static class GenericBean<T> {
+    @Nullable private T t;
+
+    /** Returns the current value of {@code t}, which may be null. */
+    @Nullable
+    public T getT() {
+      return t;
+    }
+
+    /** Replaces the value of {@code t}; null is accepted. */
+    public void setT(@Nullable T t) {
+      this.t = t;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o == this) {
+        return true;
+      }
+      // Any GenericBean is comparable here, whatever its type argument.
+      return o instanceof GenericBean && Objects.equals(t, ((GenericBean<?>) o).t);
+    }
+
+    @Override
+    public int hashCode() {
+      return t == null ? 0 : t.hashCode();
+    }
+  }
+
+  public static Schema genericBeanSchema(FieldType genericFieldType) {
+    return Schema.builder().addNullableField("t", genericFieldType).build();
+  }
 }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java
index 38ec507480f..97f1bb20f42 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java
@@ -1399,4 +1399,29 @@ public class TestPOJOs {
           .addNullableLogicalTypeField("instant", new NanosInstant())
           .addNullableLogicalTypeField("uuid", SqlTypes.UUID)
           .build();
+
+  /**
+   * POJO with a single public nullable field {@code t} of generic type {@code T}, populated
+   * through the constructor annotated with {@code @SchemaCreate}.
+   */
+  @DefaultSchema(JavaFieldSchema.class)
+  public static class GenericPOJOWithCreator<T> {
+    @Nullable public T t;
+
+    /** Stores {@code t} in the field of the same name; null is accepted. */
+    @SchemaCreate
+    public GenericPOJOWithCreator(@Nullable T t) {
+      this.t = t;
+    }
+  }
+
+  /** POJO with a single public nullable field {@code t} of generic type {@code T}. */
+  @DefaultSchema(JavaFieldSchema.class)
+  public static class GenericPOJO<T> {
+    @Nullable public T t;
+
+    /** Returns a new instance whose field {@code t} is set to the given value. */
+    public static <T> GenericPOJO<T> create(T t) {
+      GenericPOJO<T> instance = new GenericPOJO<T>();
+      instance.t = t;
+      return instance;
+    }
+  }
+
+  public static Schema genericPOJOSchema(FieldType tFieldType) {
+    return Schema.builder().addNullableField("t", tFieldType).build();
+  }
 }
diff --git a/sdks/java/extensions/arrow/src/main/java/org/apache/beam/sdk/extensions/arrow/ArrowConversion.java b/sdks/java/extensions/arrow/src/main/java/org/apache/beam/sdk/extensions/arrow/ArrowConversion.java
index 3e22bad6a41..bf7078abc58 100644
--- a/sdks/java/extensions/arrow/src/main/java/org/apache/beam/sdk/extensions/arrow/ArrowConversion.java
+++ b/sdks/java/extensions/arrow/src/main/java/org/apache/beam/sdk/extensions/arrow/ArrowConversion.java
@@ -533,7 +533,9 @@ public class ArrowConversion {
         throw new IllegalStateException("There are no more Rows.");
       }
       Row result =
-          Row.withSchema(schema).withFieldValueGetters(this.fieldValueGetters, 
this.currRowIndex);
+          Row.withSchema(schema)
+              .withFieldValueGetters(
+                  this.fieldValueGetters, this.currRowIndex, 
TypeDescriptor.of(Integer.class));
       this.currRowIndex += 1;
       return result;
     }
diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/schemas/AwsSchemaProvider.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/schemas/AwsSchemaProvider.java
index 21b5c7bcf97..6b3ff6cac86 100644
--- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/schemas/AwsSchemaProvider.java
+++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/schemas/AwsSchemaProvider.java
@@ -31,6 +31,7 @@ import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
 import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.io.aws2.schemas.AwsSchemaUtils.SdkBuilderSetter;
 import org.apache.beam.sdk.io.aws2.schemas.AwsTypes.ConverterFactory;
 import org.apache.beam.sdk.schemas.CachingFactory;
@@ -205,7 +206,11 @@ public class AwsSchemaProvider extends 
GetterBasedSchemaProviderV2 {
   @Override
   public List<FieldValueTypeInformation> fieldValueTypeInformations(
       TypeDescriptor<?> targetTypeDescriptor, Schema schema) {
-    throw new UnsupportedOperationException("FieldValueTypeInformation not 
available");
+    List<SdkField<?>> sdkFieldList = sdkFields((Class) 
targetTypeDescriptor.getRawType());
+
+    return sdkFieldList.stream()
+        .map(AwsTypes::fieldValueTypeInformationFor)
+        .collect(Collectors.toList());
   }
 
   @Override
diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/schemas/AwsTypes.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/schemas/AwsTypes.java
index a0fc0c8e91c..f5b06d3cd1c 100644
--- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/schemas/AwsTypes.java
+++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/schemas/AwsTypes.java
@@ -27,12 +27,14 @@ import static 
software.amazon.awssdk.core.protocol.MarshallingType.SDK_BYTES;
 import static software.amazon.awssdk.core.protocol.MarshallingType.SDK_POJO;
 
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.BiConsumer;
 import org.apache.beam.sdk.schemas.Factory;
+import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.Field;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
@@ -91,6 +93,20 @@ public class AwsTypes {
         String.format("Type %s of field %s is unknown.", type, 
normalizedNameOf(field)));
   }
 
+  /**
+   * Builds the {@link FieldValueTypeInformation} describing an AWS SDK field.
+   *
+   * <p>The name is the normalized field name. Both the type and the raw type are derived from the
+   * target class of the field's marshalling type; element, map key and map value types are derived
+   * from that type descriptor. The result has no one-of types and is always marked nullable.
+   *
+   * @param sdkField the SDK field to describe
+   * @return the type information for {@code sdkField}
+   */
+  static FieldValueTypeInformation fieldValueTypeInformationFor(SdkField<?> sdkField) {
+    Class<?> targetClass = sdkField.marshallingType().getTargetClass();
+    TypeDescriptor<?> type = TypeDescriptor.of(targetClass);
+    return FieldValueTypeInformation.builder()
+        .setName(normalizedNameOf(sdkField))
+        .setType(type)
+        // The raw type must be the class of the field's values so that it agrees with the type
+        // set above; calling getClass() on the marshalling type would instead record the runtime
+        // class of the MarshallingType object itself.
+        .setRawType(targetClass)
+        .setElementType(FieldValueTypeInformation.getIterableComponentType(type))
+        .setMapKeyType(FieldValueTypeInformation.getMapKeyType(type))
+        .setMapValueType(FieldValueTypeInformation.getMapValueType(type))
+        .setOneOfTypes(Collections.emptyMap())
+        .setNullable(true)
+        .build();
+  }
+  }
+
   private static Schema schemaFor(List<SdkField<?>> fields, Set<Class<?>> 
seen) {
     Schema.Builder builder = Schema.builder();
     for (SdkField<?> sdkField : fields) {

Reply via email to