This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0f2e1963987 Replace getElementType with getValueType for MAP in
AvroGenericRecord… (#31653)
0f2e1963987 is described below
commit 0f2e1963987f1fbb3329016d8c862639ed4fbe43
Author: codertimu <[email protected]>
AuthorDate: Sun Jun 23 04:44:13 2024 +0200
Replace getElementType with getValueType for MAP in AvroGenericRecord…
(#31653)
---
.../AvroGenericRecordToStorageApiProto.java | 10 ++--
.../AvroGenericRecordToStorageApiProtoTest.java | 62 ++++++++++++++++++++++
2 files changed, 66 insertions(+), 6 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
index 519f9391db6..27e75176e57 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
@@ -247,17 +247,15 @@ public class AvroGenericRecordToStorageApiProto {
break;
case MAP:
Schema keyType = Schema.create(Schema.Type.STRING);
- Schema valueType =
TypeWithNullability.create(schema.getElementType()).getType();
+ Schema valueType = Schema.create(schema.getValueType().getType());
if (valueType == null) {
throw new RuntimeException("Unexpected null element type!");
}
TableFieldSchema keyFieldSchema =
- fieldDescriptorFromAvroField(
- new Schema.Field("key", keyType, "key of the map entry",
Schema.Field.NULL_VALUE));
+ fieldDescriptorFromAvroField(new Schema.Field("key", keyType, "key
of the map entry"));
TableFieldSchema valueFieldSchema =
fieldDescriptorFromAvroField(
- new Schema.Field(
- "value", valueType, "value of the map entry",
Schema.Field.NULL_VALUE));
+ new Schema.Field("value", valueType, "value of the map
entry"));
builder =
builder
.setType(TableFieldSchema.Type.STRUCT)
@@ -346,7 +344,7 @@ public class AvroGenericRecordToStorageApiProto {
return toProtoValue(fieldDescriptor, type.getType(), value);
case MAP:
Map<CharSequence, Object> map = (Map<CharSequence, Object>) value;
- Schema valueType =
TypeWithNullability.create(avroSchema.getElementType()).getType();
+ Schema valueType = Schema.create(avroSchema.getValueType().getType());
if (valueType == null) {
throw new RuntimeException("Unexpected null element type!");
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
index 3a4dcb02ebd..6f31831db13 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
@@ -32,6 +32,8 @@ import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -266,6 +268,25 @@ public class AvroGenericRecordToStorageApiProtoTest {
.noDefault()
.endRecord();
+ private static final Schema SCHEMA_WITH_MAP;
+
+ static {
+ SCHEMA_WITH_MAP =
+ SchemaBuilder.record("TestMap")
+ .fields()
+ .name("nested")
+ .type()
+ .optional()
+ .type(BASE_SCHEMA)
+ .name("aMap")
+ .type()
+ .map()
+ .values()
+ .stringType()
+ .mapDefault(ImmutableMap.<String, Object>builder().put("key1",
"value1").build())
+ .endRecord();
+ }
+
private static GenericRecord baseRecord;
private static GenericRecord logicalTypesRecord;
private static Map<String, Object> baseProtoExpectedFields;
@@ -505,4 +526,45 @@ public class AvroGenericRecordToStorageApiProtoTest {
assertEquals(7, msg.getAllFields().size());
assertBaseRecord(msg, logicalTypesProtoExpectedFields);
}
+
+ @Test
+ public void testMessageFromGenericRecordWithMap() throws Exception {
+ // Create a GenericRecord with a map field
+ Map<String, String> mapData = new HashMap<>();
+ mapData.put("key1", "value1");
+ mapData.put("key2", "value2");
+ GenericRecord recordWithMap =
+ new GenericRecordBuilder(SCHEMA_WITH_MAP)
+ .set("nested", baseRecord)
+ .set("aMap", mapData)
+ .build();
+
+ Descriptors.Descriptor descriptor =
+ TableRowToStorageApiProto.getDescriptorFromTableSchema(
+
AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(SCHEMA_WITH_MAP),
+ true,
+ false);
+ DynamicMessage msg =
+ AvroGenericRecordToStorageApiProto.messageFromGenericRecord(
+ descriptor, recordWithMap, null, -1);
+
+ assertEquals(2, msg.getAllFields().size());
+
+ Map<String, Descriptors.FieldDescriptor> fieldDescriptors =
+ descriptor.getFields().stream()
+ .collect(Collectors.toMap(Descriptors.FieldDescriptor::getName,
Functions.identity()));
+ DynamicMessage nestedMsg = (DynamicMessage)
msg.getField(fieldDescriptors.get("nested"));
+ assertBaseRecord(nestedMsg, baseProtoExpectedFields);
+
+ // Assert the map field
+ List<DynamicMessage> list = (List<DynamicMessage>)
msg.getField(fieldDescriptors.get("amap"));
+ // Convert the list of DynamicMessages back to a map
+ Map<String, String> actualMap = new HashMap<>();
+ for (DynamicMessage entry : list) {
+ String key = (String)
entry.getField(entry.getDescriptorForType().findFieldByName("key"));
+ String value = (String)
entry.getField(entry.getDescriptorForType().findFieldByName("value"));
+ actualMap.put(key, value);
+ }
+ assertEquals(mapData, actualMap);
+ }
}