This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 87d204f6038 Fix inconsistent data type in GenericRecord and AvroSchema
for AvroWriter (#36839)
87d204f6038 is described below
commit 87d204f60383573287f2c767a376d4206556b255
Author: Yi Hu <[email protected]>
AuthorDate: Tue Nov 18 12:51:26 2025 -0500
Fix inconsistent data type in GenericRecord and AvroSchema for AvroWriter
(#36839)
* Fix inconsistent data type in GenericRecord and AvroSchema for AvroWriter
* clean code
---
.../extensions/avro/schemas/utils/AvroUtils.java | 16 +++++++++++++++
.../providers/PortableBigQueryDestinations.java | 2 +-
...gQueryFileLoadsSchemaTransformProviderTest.java | 23 ++++++++++++++++++----
3 files changed, 36 insertions(+), 5 deletions(-)
diff --git
a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
index 460bfaec4a3..1a8cac7ffb6 100644
---
a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
+++
b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
@@ -36,6 +36,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import net.bytebuddy.description.type.TypeDescription.ForLoadedType;
@@ -97,6 +98,7 @@ import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
@@ -1214,6 +1216,15 @@ public class AvroUtils {
return fieldType.getNullable() ? ReflectData.makeNullable(baseType) :
baseType;
}
+ private static final Map<org.apache.avro.Schema, Function<Number, ? extends
Number>>
+ NUMERIC_CONVERTERS =
+ ImmutableMap.of(
+ org.apache.avro.Schema.create(Type.INT), Number::intValue,
+ org.apache.avro.Schema.create(Type.LONG), Number::longValue,
+ org.apache.avro.Schema.create(Type.FLOAT), Number::floatValue,
+ org.apache.avro.Schema.create(Type.DOUBLE), Number::doubleValue);
+
+ /** Convert a value from Beam Row to a value used for Avro GenericRecord. */
private static @Nullable Object genericFromBeamField(
FieldType fieldType, org.apache.avro.Schema avroSchema, @Nullable Object
value) {
TypeWithNullability typeWithNullability = new
TypeWithNullability(avroSchema);
@@ -1230,6 +1241,11 @@ public class AvroUtils {
return value;
}
+ if (NUMERIC_CONVERTERS.containsKey(typeWithNullability.type)) {
+ return NUMERIC_CONVERTERS.get(typeWithNullability.type).apply((Number)
value);
+ }
+
+ // TODO: should we use Avro Schema as the source-of-truth in general?
switch (fieldType.getTypeName()) {
case BYTE:
case INT16:
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java
index 42eee4f3f03..c927cec3473 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java
@@ -122,7 +122,7 @@ public class PortableBigQueryDestinations extends
DynamicDestinations<Row, Strin
row = checkStateNotNull(row.getRow(RECORD));
}
Row filtered = rowFilter.filter(row);
- return AvroUtils.toGenericRecord(filtered);
+ return AvroUtils.toGenericRecord(filtered, request.getSchema());
};
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProviderTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProviderTest.java
index 168febea9d8..7ba420e5b8c 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProviderTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProviderTest.java
@@ -70,13 +70,28 @@ public class BigQueryFileLoadsSchemaTransformProviderTest {
new
TableReference().setProjectId(PROJECT).setDatasetId(DATASET).setTableId(TABLE_ID);
private static final Schema SCHEMA =
- Schema.of(Field.of("name", FieldType.STRING), Field.of("number",
FieldType.INT64));
+ Schema.of(
+ Field.of("name", FieldType.STRING),
+ Field.of("number", FieldType.INT64),
+ Field.of("age", FieldType.INT32).withNullable(true));
private static final List<Row> ROWS =
Arrays.asList(
- Row.withSchema(SCHEMA).withFieldValue("name",
"a").withFieldValue("number", 1L).build(),
- Row.withSchema(SCHEMA).withFieldValue("name",
"b").withFieldValue("number", 2L).build(),
- Row.withSchema(SCHEMA).withFieldValue("name",
"c").withFieldValue("number", 3L).build());
+ Row.withSchema(SCHEMA)
+ .withFieldValue("name", "a")
+ .withFieldValue("number", 1L)
+ .withFieldValue("age", 10)
+ .build(),
+ Row.withSchema(SCHEMA)
+ .withFieldValue("name", "b")
+ .withFieldValue("number", 2L)
+ .withFieldValue("age", 20)
+ .build(),
+ Row.withSchema(SCHEMA)
+ .withFieldValue("name", "c")
+ .withFieldValue("number", 3L)
+ .withFieldValue("age", null)
+ .build());
private static final BigQueryOptions OPTIONS =
TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);