This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 87d204f6038 Fix inconsistent data type in GenericRecord and AvroSchema 
for AvroWriter (#36839)
87d204f6038 is described below

commit 87d204f60383573287f2c767a376d4206556b255
Author: Yi Hu <[email protected]>
AuthorDate: Tue Nov 18 12:51:26 2025 -0500

    Fix inconsistent data type in GenericRecord and AvroSchema for AvroWriter 
(#36839)
    
    * Fix inconsistent data type in GenericRecord and AvroSchema for AvroWriter
    
    * clean code
---
 .../extensions/avro/schemas/utils/AvroUtils.java   | 16 +++++++++++++++
 .../providers/PortableBigQueryDestinations.java    |  2 +-
 ...gQueryFileLoadsSchemaTransformProviderTest.java | 23 ++++++++++++++++++----
 3 files changed, 36 insertions(+), 5 deletions(-)

diff --git 
a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
 
b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
index 460bfaec4a3..1a8cac7ffb6 100644
--- 
a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
+++ 
b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
@@ -36,6 +36,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 import net.bytebuddy.description.type.TypeDescription.ForLoadedType;
@@ -97,6 +98,7 @@ import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
@@ -1214,6 +1216,15 @@ public class AvroUtils {
     return fieldType.getNullable() ? ReflectData.makeNullable(baseType) : 
baseType;
   }
 
+  private static final Map<org.apache.avro.Schema, Function<Number, ? extends 
Number>>
+      NUMERIC_CONVERTERS =
+          ImmutableMap.of(
+              org.apache.avro.Schema.create(Type.INT), Number::intValue,
+              org.apache.avro.Schema.create(Type.LONG), Number::longValue,
+              org.apache.avro.Schema.create(Type.FLOAT), Number::floatValue,
+              org.apache.avro.Schema.create(Type.DOUBLE), Number::doubleValue);
+
+  /** Convert a value from Beam Row to a value used for Avro GenericRecord. */
   private static @Nullable Object genericFromBeamField(
       FieldType fieldType, org.apache.avro.Schema avroSchema, @Nullable Object 
value) {
     TypeWithNullability typeWithNullability = new 
TypeWithNullability(avroSchema);
@@ -1230,6 +1241,11 @@ public class AvroUtils {
       return value;
     }
 
+    if (NUMERIC_CONVERTERS.containsKey(typeWithNullability.type)) {
+      return NUMERIC_CONVERTERS.get(typeWithNullability.type).apply((Number) 
value);
+    }
+
+    // TODO: should we use Avro Schema as the source-of-truth in general?
     switch (fieldType.getTypeName()) {
       case BYTE:
       case INT16:
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java
index 42eee4f3f03..c927cec3473 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java
@@ -122,7 +122,7 @@ public class PortableBigQueryDestinations extends 
DynamicDestinations<Row, Strin
         row = checkStateNotNull(row.getRow(RECORD));
       }
       Row filtered = rowFilter.filter(row);
-      return AvroUtils.toGenericRecord(filtered);
+      return AvroUtils.toGenericRecord(filtered, request.getSchema());
     };
   }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProviderTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProviderTest.java
index 168febea9d8..7ba420e5b8c 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProviderTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProviderTest.java
@@ -70,13 +70,28 @@ public class BigQueryFileLoadsSchemaTransformProviderTest {
       new 
TableReference().setProjectId(PROJECT).setDatasetId(DATASET).setTableId(TABLE_ID);
 
   private static final Schema SCHEMA =
-      Schema.of(Field.of("name", FieldType.STRING), Field.of("number", 
FieldType.INT64));
+      Schema.of(
+          Field.of("name", FieldType.STRING),
+          Field.of("number", FieldType.INT64),
+          Field.of("age", FieldType.INT32).withNullable(true));
 
   private static final List<Row> ROWS =
       Arrays.asList(
-          Row.withSchema(SCHEMA).withFieldValue("name", 
"a").withFieldValue("number", 1L).build(),
-          Row.withSchema(SCHEMA).withFieldValue("name", 
"b").withFieldValue("number", 2L).build(),
-          Row.withSchema(SCHEMA).withFieldValue("name", 
"c").withFieldValue("number", 3L).build());
+          Row.withSchema(SCHEMA)
+              .withFieldValue("name", "a")
+              .withFieldValue("number", 1L)
+              .withFieldValue("age", 10)
+              .build(),
+          Row.withSchema(SCHEMA)
+              .withFieldValue("name", "b")
+              .withFieldValue("number", 2L)
+              .withFieldValue("age", 20)
+              .build(),
+          Row.withSchema(SCHEMA)
+              .withFieldValue("name", "c")
+              .withFieldValue("number", 3L)
+              .withFieldValue("age", null)
+              .build());
 
   private static final BigQueryOptions OPTIONS =
       TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);

Reply via email to