This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 911c525348b Fix nullable array issue 31674 in AvroGenericRecordToStorageApiProto (#31675)
911c525348b is described below
commit 911c525348b81afd48a414018ba579c3b78e3262
Author: codertimu <[email protected]>
AuthorDate: Tue Jun 25 22:30:18 2024 +0200
Fix nullable array issue 31674 in AvroGenericRecordToStorageApiProto (#31675)
* Handle nullable array in fieldDescriptorFromAvroField function.
* rename test method.
* apply spotless
* Add test case with null array. Allow null value for union with a null type.
---------
Co-authored-by: Ahmet Timucin <[email protected]>
---
.../AvroGenericRecordToStorageApiProto.java | 5 +-
.../AvroGenericRecordToStorageApiProtoTest.java | 99 ++++++++++++++++++----
2 files changed, 86 insertions(+), 18 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
index 27e75176e57..0b7e17b8909 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
@@ -277,6 +277,7 @@ public class AvroGenericRecordToStorageApiProto {
builder =
builder
.setType(unionFieldSchema.getType())
+ .setMode(unionFieldSchema.getMode())
.addAllFields(unionFieldSchema.getFieldsList());
break;
default:
@@ -311,7 +312,9 @@ public class AvroGenericRecordToStorageApiProto {
FieldDescriptor fieldDescriptor, Schema.Field avroField, String name,
GenericRecord record) {
@Nullable Object value = record.get(name);
if (value == null) {
- if (fieldDescriptor.isOptional()) {
+ if (fieldDescriptor.isOptional()
+ || avroField.schema().getTypes().stream()
+ .anyMatch(t -> t.getType() == Schema.Type.NULL)) {
return null;
} else {
throw new IllegalArgumentException(
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
index 6f31831db13..6a59afeed82 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
@@ -31,6 +31,7 @@ import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
@@ -268,24 +269,32 @@ public class AvroGenericRecordToStorageApiProtoTest {
.noDefault()
.endRecord();
- private static final Schema SCHEMA_WITH_MAP;
+ private static final Schema SCHEMA_WITH_MAP =
+ SchemaBuilder.record("TestMap")
+ .fields()
+ .name("nested")
+ .type()
+ .optional()
+ .type(BASE_SCHEMA)
+ .name("aMap")
+ .type()
+ .map()
+ .values()
+ .stringType()
+ .mapDefault(ImmutableMap.<String, Object>builder().put("key1", "value1").build())
+ .endRecord();
- static {
- SCHEMA_WITH_MAP =
- SchemaBuilder.record("TestMap")
- .fields()
- .name("nested")
- .type()
- .optional()
- .type(BASE_SCHEMA)
- .name("aMap")
- .type()
- .map()
- .values()
- .stringType()
- .mapDefault(ImmutableMap.<String, Object>builder().put("key1", "value1").build())
- .endRecord();
- }
+ private static final Schema SCHEMA_WITH_NULLABLE_ARRAY =
+ SchemaBuilder.record("TestNullableArray")
+ .fields()
+ .name("aNullableArray")
+ .type()
+ .nullable()
+ .array()
+ .items()
+ .stringType()
+ .noDefault()
+ .endRecord();
private static GenericRecord baseRecord;
private static GenericRecord logicalTypesRecord;
@@ -567,4 +576,60 @@ public class AvroGenericRecordToStorageApiProtoTest {
}
assertEquals(mapData, actualMap);
}
+
+ @Test
+ public void testMessageFromGenericRecordWithNullableArrayWithNonNullValue() throws Exception {
+ ImmutableList<String> aList = ImmutableList.of("one", "two", "red", "blue");
+ GenericRecord recordWithArray =
+ new GenericRecordBuilder(AvroGenericRecordToStorageApiProtoTest.SCHEMA_WITH_NULLABLE_ARRAY)
+ .set("aNullableArray", aList)
+ .build();
+
+ Descriptors.Descriptor descriptor =
+ TableRowToStorageApiProto.getDescriptorFromTableSchema(
+ AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(
+ SCHEMA_WITH_NULLABLE_ARRAY),
+ true,
+ false);
+ DynamicMessage msg =
+ AvroGenericRecordToStorageApiProto.messageFromGenericRecord(
+ descriptor, recordWithArray, null, -1);
+
+ assertEquals(1, msg.getAllFields().size());
+
+ Map<String, Descriptors.FieldDescriptor> fieldDescriptors =
+ descriptor.getFields().stream()
+ .collect(Collectors.toMap(Descriptors.FieldDescriptor::getName, Functions.identity()));
+
+ List<String> list = (List<String>) msg.getField(fieldDescriptors.get("anullablearray"));
+ assertEquals(aList, list);
+ }
+
+ @Test
+ public void testMessageFromGenericRecordWithNullableArrayWithNullValue() throws Exception {
+ ImmutableList<String> aNullList = null;
+ GenericRecord recordWithNullArray =
+ new GenericRecordBuilder(AvroGenericRecordToStorageApiProtoTest.SCHEMA_WITH_NULLABLE_ARRAY)
+ .set("aNullableArray", aNullList)
+ .build();
+
+ Descriptors.Descriptor descriptor =
+ TableRowToStorageApiProto.getDescriptorFromTableSchema(
+ AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(
+ SCHEMA_WITH_NULLABLE_ARRAY),
+ true,
+ false);
+ DynamicMessage msg =
+ AvroGenericRecordToStorageApiProto.messageFromGenericRecord(
+ descriptor, recordWithNullArray, null, -1);
+
+ assertEquals(0, msg.getAllFields().size());
+
+ Map<String, Descriptors.FieldDescriptor> fieldDescriptors =
+ descriptor.getFields().stream()
+ .collect(Collectors.toMap(Descriptors.FieldDescriptor::getName, Functions.identity()));
+
+ List<String> list = (List<String>) msg.getField(fieldDescriptors.get("anullablearray"));
+ assertEquals(Collections.emptyList(), list);
+ }
}