This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fea5f797e9 Support unknown repeated STRUCTs (#31447)
1fea5f797e9 is described below

commit 1fea5f797e99f2651fd9b2fe147ce93a25f8a60d
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Tue Jun 4 18:09:04 2024 -0400

    Support unknown repeated STRUCTs (#31447)
    
    * cleanup TableRow of unknowns for structs
    
    * set nested unknown value as an arraylist if it's repeated
    
    * spotless
    
    * add another test
---
 .../io/gcp/bigquery/TableRowToStorageApiProto.java | 35 +++++++---
 .../bigquery/TableRowToStorageApiProtoTest.java    | 77 ++++++++++++++++++++--
 2 files changed, 96 insertions(+), 16 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
index 07457e72050..c1f452ba93f 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
@@ -50,6 +50,7 @@ import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeFormatterBuilder;
 import java.time.format.DateTimeParseException;
 import java.util.AbstractMap;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -484,11 +485,11 @@ public class TableRowToStorageApiProto {
       throws SchemaConversionException {
     DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
     for (final Map.Entry<String, Object> entry : map.entrySet()) {
-      @Nullable
-      FieldDescriptor fieldDescriptor = 
descriptor.findFieldByName(entry.getKey().toLowerCase());
+      String key = entry.getKey().toLowerCase();
+      @Nullable FieldDescriptor fieldDescriptor = 
descriptor.findFieldByName(key);
       if (fieldDescriptor == null) {
         if (unknownFields != null) {
-          unknownFields.set(entry.getKey().toLowerCase(), entry.getValue());
+          unknownFields.set(key, entry.getValue());
         }
         if (ignoreUnknownValues) {
           continue;
@@ -505,12 +506,19 @@ public class TableRowToStorageApiProto {
           schemaInformation.getSchemaForField(entry.getKey());
       try {
         Supplier<@Nullable TableRow> getNestedUnknown =
-            () ->
-                (unknownFields == null)
-                    ? null
-                    : (TableRow)
-                        unknownFields.computeIfAbsent(
-                            entry.getKey().toLowerCase(), k -> new TableRow());
+            () -> {
+              if (unknownFields == null) {
+                return null;
+              }
+              TableRow nestedUnknown = new TableRow();
+              if (fieldDescriptor.isRepeated()) {
+                ((List<TableRow>)
+                        (unknownFields.computeIfAbsent(key, k -> new 
ArrayList<TableRow>())))
+                    .add(nestedUnknown);
+                return nestedUnknown;
+              }
+              return (TableRow) unknownFields.computeIfAbsent(key, k -> 
nestedUnknown);
+            };
 
         @Nullable
         Object value =
@@ -524,6 +532,15 @@ public class TableRowToStorageApiProto {
         if (value != null) {
           builder.setField(fieldDescriptor, value);
         }
+        // For STRUCT fields, we add a placeholder to unknownFields using the 
getNestedUnknown
+        // supplier (in case we encounter unknown nested fields). If the 
placeholder comes out
+        // to be empty, we should clean it up
+        if 
(fieldSchemaInformation.getType().equals(TableFieldSchema.Type.STRUCT)
+            && unknownFields != null
+            && unknownFields.get(key) instanceof Map
+            && ((Map<?, ?>) unknownFields.get(key)).isEmpty()) {
+          unknownFields.remove(key);
+        }
       } catch (Exception e) {
         throw new SchemaDoesntMatchException(
             "Problem converting field "
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
index 847ede9df36..6a98dad55a6 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
@@ -40,6 +40,8 @@ import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.charset.StandardCharsets;
 import java.time.LocalDate;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -1577,17 +1579,21 @@ public class TableRowToStorageApiProtoTest {
 
   @Test
   public void testIgnoreUnknownNestedField() throws Exception {
+    TableRow rowNoFWithUnknowns = new TableRow();
+    rowNoFWithUnknowns.putAll(BASE_TABLE_ROW_NO_F);
+    rowNoFWithUnknowns.set("unknown", "foobar");
+    TableRow rowWithFWithUnknowns = new TableRow();
+    List<TableCell> cellsWithUnknowns = 
Lists.newArrayList(BASE_TABLE_ROW.getF());
+    cellsWithUnknowns.add(new TableCell().setV("foobar"));
+    rowWithFWithUnknowns.setF(cellsWithUnknowns);
+    // Nested records with no unknowns should not show up
     TableRow rowNoF = new TableRow();
     rowNoF.putAll(BASE_TABLE_ROW_NO_F);
-    rowNoF.set("unknown", "foobar");
-    TableRow rowWithF = new TableRow();
-    List<TableCell> cells = Lists.newArrayList(BASE_TABLE_ROW.getF());
-    cells.add(new TableCell().setV("foobar"));
-    rowWithF.setF(cells);
     TableRow topRow =
         new TableRow()
-            .set("nestedValueNoF1", rowNoF)
-            .set("nestedValue1", rowWithF)
+            .set("nestedValueNoF1", rowNoFWithUnknowns)
+            .set("nestedValue1", rowWithFWithUnknowns)
+            .set("nestedValueNoF2", rowNoF)
             .set("unknowntop", "foobar");
 
     Descriptor descriptor =
@@ -1608,6 +1614,63 @@ public class TableRowToStorageApiProtoTest {
     assertEquals("foobar", ((TableRow) 
unknown.get("nestedvaluenof1")).get("unknown"));
   }
 
+  @Test
+  public void testIgnoreUnknownRepeatedNestedField() throws Exception {
+    TableRow doublyNestedRowNoFWithUnknowns = new TableRow();
+    doublyNestedRowNoFWithUnknowns.putAll(BASE_TABLE_ROW_NO_F);
+    doublyNestedRowNoFWithUnknowns.put("unknown_doubly_nested", 
"foobar_doubly_nested");
+    TableRow nestedRow =
+        new TableRow()
+            .set("nested_struct", doublyNestedRowNoFWithUnknowns)
+            .set("unknown_repeated_struct", "foobar_repeated_struct");
+    TableRow repeatedRow =
+        new TableRow()
+            .set("repeated_struct", Collections.singletonList(nestedRow))
+            .set("unknown_top", "foobar_top");
+
+    TableSchema schema =
+        new TableSchema()
+            .setFields(
+                Arrays.asList(
+                    new TableFieldSchema()
+                        .setName("repeated_struct")
+                        .setType("STRUCT")
+                        .setMode("REPEATED")
+                        .setFields(
+                            Arrays.asList(
+                                new TableFieldSchema()
+                                    .setName("nested_struct")
+                                    .setType("STRUCT")
+                                    
.setFields(BASE_TABLE_SCHEMA_NO_F.getFields())))));
+
+    Descriptor descriptor =
+        TableRowToStorageApiProto.getDescriptorFromTableSchema(schema, true, 
false);
+    TableRowToStorageApiProto.SchemaInformation schemaInformation =
+        TableRowToStorageApiProto.SchemaInformation.fromTableSchema(schema);
+
+    TableRow unknown = new TableRow();
+    TableRowToStorageApiProto.messageFromTableRow(
+        schemaInformation, descriptor, repeatedRow, true, false, unknown, 
null, -1);
+    System.out.println(unknown);
+    // unknown at top level
+    assertEquals(2, unknown.size());
+    assertEquals("foobar_top", unknown.get("unknown_top"));
+
+    // unknown in a repeated struct
+    List<TableRow> unknownRepeatedStruct = ((List<TableRow>) 
unknown.get("repeated_struct"));
+    System.out.println(unknownRepeatedStruct.get(0));
+    assertEquals(1, unknownRepeatedStruct.size());
+    assertEquals(2, unknownRepeatedStruct.get(0).size());
+    assertEquals(
+        "foobar_repeated_struct", 
unknownRepeatedStruct.get(0).get("unknown_repeated_struct"));
+
+    // unknown in a double nested repeated struct
+    TableRow unknownDoublyNestedStruct =
+        (TableRow) unknownRepeatedStruct.get(0).get("nested_struct");
+    assertEquals(1, unknownDoublyNestedStruct.size());
+    assertEquals("foobar_doubly_nested", 
unknownDoublyNestedStruct.get("unknown_doubly_nested"));
+  }
+
   @Test
   public void testCdcFields() throws Exception {
     TableRow tableRow =

Reply via email to