This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1fea5f797e9 Support unknown repeated STRUCTs (#31447)
1fea5f797e9 is described below
commit 1fea5f797e99f2651fd9b2fe147ce93a25f8a60d
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Tue Jun 4 18:09:04 2024 -0400
Support unknown repeated STRUCTs (#31447)
* clean up TableRow of unknowns for structs
* set nested unknown value as an arraylist if it's repeated
* spotless
* add another test
---
.../io/gcp/bigquery/TableRowToStorageApiProto.java | 35 +++++++---
.../bigquery/TableRowToStorageApiProtoTest.java | 77 ++++++++++++++++++++--
2 files changed, 96 insertions(+), 16 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
index 07457e72050..c1f452ba93f 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
@@ -50,6 +50,7 @@ import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.format.DateTimeParseException;
import java.util.AbstractMap;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -484,11 +485,11 @@ public class TableRowToStorageApiProto {
throws SchemaConversionException {
DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
for (final Map.Entry<String, Object> entry : map.entrySet()) {
- @Nullable
- FieldDescriptor fieldDescriptor =
descriptor.findFieldByName(entry.getKey().toLowerCase());
+ String key = entry.getKey().toLowerCase();
+ @Nullable FieldDescriptor fieldDescriptor =
descriptor.findFieldByName(key);
if (fieldDescriptor == null) {
if (unknownFields != null) {
- unknownFields.set(entry.getKey().toLowerCase(), entry.getValue());
+ unknownFields.set(key, entry.getValue());
}
if (ignoreUnknownValues) {
continue;
@@ -505,12 +506,19 @@ public class TableRowToStorageApiProto {
schemaInformation.getSchemaForField(entry.getKey());
try {
Supplier<@Nullable TableRow> getNestedUnknown =
- () ->
- (unknownFields == null)
- ? null
- : (TableRow)
- unknownFields.computeIfAbsent(
- entry.getKey().toLowerCase(), k -> new TableRow());
+ () -> {
+ if (unknownFields == null) {
+ return null;
+ }
+ TableRow nestedUnknown = new TableRow();
+ if (fieldDescriptor.isRepeated()) {
+ ((List<TableRow>)
+ (unknownFields.computeIfAbsent(key, k -> new
ArrayList<TableRow>())))
+ .add(nestedUnknown);
+ return nestedUnknown;
+ }
+ return (TableRow) unknownFields.computeIfAbsent(key, k ->
nestedUnknown);
+ };
@Nullable
Object value =
@@ -524,6 +532,15 @@ public class TableRowToStorageApiProto {
if (value != null) {
builder.setField(fieldDescriptor, value);
}
+ // For STRUCT fields, we add a placeholder to unknownFields using the
getNestedUnknown
+ // supplier (in case we encounter unknown nested fields). If the
placeholder comes out
+ // to be empty, we should clean it up
+ if
(fieldSchemaInformation.getType().equals(TableFieldSchema.Type.STRUCT)
+ && unknownFields != null
+ && unknownFields.get(key) instanceof Map
+ && ((Map<?, ?>) unknownFields.get(key)).isEmpty()) {
+ unknownFields.remove(key);
+ }
} catch (Exception e) {
throw new SchemaDoesntMatchException(
"Problem converting field "
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
index 847ede9df36..6a98dad55a6 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
@@ -40,6 +40,8 @@ import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -1577,17 +1579,21 @@ public class TableRowToStorageApiProtoTest {
@Test
public void testIgnoreUnknownNestedField() throws Exception {
+ TableRow rowNoFWithUnknowns = new TableRow();
+ rowNoFWithUnknowns.putAll(BASE_TABLE_ROW_NO_F);
+ rowNoFWithUnknowns.set("unknown", "foobar");
+ TableRow rowWithFWithUnknowns = new TableRow();
+ List<TableCell> cellsWithUnknowns =
Lists.newArrayList(BASE_TABLE_ROW.getF());
+ cellsWithUnknowns.add(new TableCell().setV("foobar"));
+ rowWithFWithUnknowns.setF(cellsWithUnknowns);
+ // Nested records with no unknowns should not show up
TableRow rowNoF = new TableRow();
rowNoF.putAll(BASE_TABLE_ROW_NO_F);
- rowNoF.set("unknown", "foobar");
- TableRow rowWithF = new TableRow();
- List<TableCell> cells = Lists.newArrayList(BASE_TABLE_ROW.getF());
- cells.add(new TableCell().setV("foobar"));
- rowWithF.setF(cells);
TableRow topRow =
new TableRow()
- .set("nestedValueNoF1", rowNoF)
- .set("nestedValue1", rowWithF)
+ .set("nestedValueNoF1", rowNoFWithUnknowns)
+ .set("nestedValue1", rowWithFWithUnknowns)
+ .set("nestedValueNoF2", rowNoF)
.set("unknowntop", "foobar");
Descriptor descriptor =
@@ -1608,6 +1614,63 @@ public class TableRowToStorageApiProtoTest {
assertEquals("foobar", ((TableRow)
unknown.get("nestedvaluenof1")).get("unknown"));
}
+ @Test
+ public void testIgnoreUnknownRepeatedNestedField() throws Exception {
+ TableRow doublyNestedRowNoFWithUnknowns = new TableRow();
+ doublyNestedRowNoFWithUnknowns.putAll(BASE_TABLE_ROW_NO_F);
+ doublyNestedRowNoFWithUnknowns.put("unknown_doubly_nested",
"foobar_doubly_nested");
+ TableRow nestedRow =
+ new TableRow()
+ .set("nested_struct", doublyNestedRowNoFWithUnknowns)
+ .set("unknown_repeated_struct", "foobar_repeated_struct");
+ TableRow repeatedRow =
+ new TableRow()
+ .set("repeated_struct", Collections.singletonList(nestedRow))
+ .set("unknown_top", "foobar_top");
+
+ TableSchema schema =
+ new TableSchema()
+ .setFields(
+ Arrays.asList(
+ new TableFieldSchema()
+ .setName("repeated_struct")
+ .setType("STRUCT")
+ .setMode("REPEATED")
+ .setFields(
+ Arrays.asList(
+ new TableFieldSchema()
+ .setName("nested_struct")
+ .setType("STRUCT")
+
.setFields(BASE_TABLE_SCHEMA_NO_F.getFields())))));
+
+ Descriptor descriptor =
+ TableRowToStorageApiProto.getDescriptorFromTableSchema(schema, true,
false);
+ TableRowToStorageApiProto.SchemaInformation schemaInformation =
+ TableRowToStorageApiProto.SchemaInformation.fromTableSchema(schema);
+
+ TableRow unknown = new TableRow();
+ TableRowToStorageApiProto.messageFromTableRow(
+ schemaInformation, descriptor, repeatedRow, true, false, unknown,
null, -1);
+
+ // unknowns at top level: "unknown_top" plus the nested unknowns under "repeated_struct"
+ assertEquals(2, unknown.size());
+ assertEquals("foobar_top", unknown.get("unknown_top"));
+
+ // unknown in a repeated struct
+ List<TableRow> unknownRepeatedStruct = ((List<TableRow>)
unknown.get("repeated_struct"));
+ // one entry per element of "repeated_struct", holding that element's unknowns
+ assertEquals(1, unknownRepeatedStruct.size());
+ assertEquals(2, unknownRepeatedStruct.get(0).size());
+ assertEquals(
+ "foobar_repeated_struct",
unknownRepeatedStruct.get(0).get("unknown_repeated_struct"));
+
+ // unknown in a STRUCT nested inside a repeated struct
+ TableRow unknownDoublyNestedStruct =
+ (TableRow) unknownRepeatedStruct.get(0).get("nested_struct");
+ assertEquals(1, unknownDoublyNestedStruct.size());
+ assertEquals("foobar_doubly_nested",
unknownDoublyNestedStruct.get("unknown_doubly_nested"));
+ }
+
@Test
public void testCdcFields() throws Exception {
TableRow tableRow =