gemini-code-assist[bot] commented on code in PR #36425:
URL: https://github.com/apache/beam/pull/36425#discussion_r2423096861


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -1128,68 +1216,376 @@ private static long toEpochMicros(Instant timestamp) {
 
   @VisibleForTesting
   public static TableRow tableRowFromMessage(
-      Message message, boolean includeCdcColumns, Predicate<String> includeField) {
-    return tableRowFromMessage(message, includeCdcColumns, includeField, "");
+      SchemaInformation schemaInformation,
+      Message message,
+      boolean includeCdcColumns,
+      Predicate<String> includeField) {
+    return tableRowFromMessage(schemaInformation, message, includeCdcColumns, includeField, "");
   }
 
   public static TableRow tableRowFromMessage(
+      SchemaInformation schemaInformation,
+      Message message,
+      boolean includeCdcColumns,
+      Predicate<String> includeField,
+      String namePrefix) {
+    // We first try to create a map-style TableRow for backwards compatibility with existing
+    // usage. However, this will fail if there is a column named "f". If it fails, we instead
+    // create a list-style TableRow.
+    Optional<TableRow> tableRow =
+        tableRowFromMessageNoF(
+            schemaInformation, message, includeCdcColumns, includeField, namePrefix);
+    return tableRow.orElseGet(
+        () ->
+            tableRowFromMessageUseSetF(
+                schemaInformation, message, includeCdcColumns, includeField, ""));
+  }
+
+  private static Optional<TableRow> tableRowFromMessageNoF(
+      SchemaInformation schemaInformation,
       Message message,
       boolean includeCdcColumns,
       Predicate<String> includeField,
       String namePrefix) {
-    // TODO: Would be more correct to generate TableRows using setF.
     TableRow tableRow = new TableRow();
     for (Map.Entry<FieldDescriptor, Object> field : message.getAllFields().entrySet()) {
       StringBuilder fullName = new StringBuilder();
       FieldDescriptor fieldDescriptor = field.getKey();
       String fieldName = fieldNameFromProtoFieldDescriptor(fieldDescriptor);
+      if ("f".equals(fieldName)) {
+        // TableRow.put won't work as expected if the field is named "f". Fail the call, and
+        // force a retry using the setF codepath.
+        return Optional.empty();
+      }
       fullName = fullName.append(namePrefix).append(fieldName);
       Object fieldValue = field.getValue();
      if ((includeCdcColumns || !StorageApiCDC.COLUMNS.contains(fullName.toString()))
           && includeField.test(fieldName)) {
-        tableRow.put(
-            fieldName,
+        SchemaInformation fieldSchemaInformation = schemaInformation.getSchemaForField(fieldName);
+        Object convertedFieldValue =
             jsonValueFromMessageValue(
-                fieldDescriptor, fieldValue, true, includeField, fullName.append(".").toString()));
+                fieldSchemaInformation,
+                fieldDescriptor,
+                fieldValue,
+                true,
+                includeField,
+                fullName.append(".").toString(),
+                false);
+        if (convertedFieldValue instanceof Optional) {
+          Optional<?> optional = (Optional<?>) convertedFieldValue;
+          if (!optional.isPresent()) {
+            // Some nested message had a field named "f". Fail.
+            return Optional.empty();
+          } else {
+            convertedFieldValue = optional.get();
+          }
+        }
+        tableRow.put(fieldName, convertedFieldValue);
+      }
+    }
+    return Optional.of(tableRow);
+  }
+
+  public static TableRow tableRowFromMessageUseSetF(
+      SchemaInformation schemaInformation,
+      Message message,
+      boolean includeCdcColumns,
+      Predicate<String> includeField,
+      String namePrefix) {
+    List<TableCell> tableCells =
+        Lists.newArrayListWithCapacity(message.getDescriptorForType().getFields().size());
+
+    for (FieldDescriptor fieldDescriptor : message.getDescriptorForType().getFields()) {
+      TableCell tableCell = new TableCell();
+      boolean isPresent =
+          (fieldDescriptor.isRepeated() && message.getRepeatedFieldCount(fieldDescriptor) > 0)
+              || (!fieldDescriptor.isRepeated() && message.hasField(fieldDescriptor));
+      if (isPresent) {
+        StringBuilder fullName = new StringBuilder();
+        String fieldName = fieldNameFromProtoFieldDescriptor(fieldDescriptor);
+        fullName = fullName.append(namePrefix).append(fieldName);
+        if ((includeCdcColumns || !StorageApiCDC.COLUMNS.contains(fullName.toString()))
+            && includeField.test(fieldName)) {
+          SchemaInformation fieldSchemaInformation = schemaInformation.getSchemaForField(fieldName);
+          Object fieldValue = message.getField(fieldDescriptor);
+          Object converted =
+              jsonValueFromMessageValue(
+                  fieldSchemaInformation,
+                  fieldDescriptor,
+                  fieldValue,
+                  true,
+                  includeField,
+                  fullName.append(".").toString(),
+                  true);
+          tableCell.setV(converted);
+        }
       }
+      tableCells.add(tableCell);
     }
+
+    TableRow tableRow = new TableRow();
+    tableRow.setF(tableCells);
+
     return tableRow;
   }
 
+  // Our process for generating descriptors modifies the names of nested descriptors for wrapper
+  // types, so we record them here.
+  private static final String FLOAT_VALUE_DESCRIPTOR_NAME = "google_protobuf_FloatValue";
+  private static final String DOUBLE_VALUE_DESCRIPTOR_NAME = "google_protobuf_DoubleValue";
+  private static final String BOOL_VALUE_DESCRIPTOR_NAME = "google_protobuf_BoolValue";
+  private static final String INT32_VALUE_DESCRIPTOR_NAME = "google_protobuf_Int32Value";
+  private static final String INT64_VALUE_DESCRIPTOR_NAME = "google_protobuf_Int64Value";
+  private static final String UINT32_VALUE_DESCRIPTOR_NAME = "google_protobuf_UInt32Value";
+  private static final String UINT64_VALUE_DESCRIPTOR_NAME = "google_protobuf_UInt64Value";
+  private static final String BYTES_VALUE_DESCRIPTOR_NAME = "google_protobuf_BytesValue";
+  private static final String TIMESTAMP_VALUE_DESCRIPTOR_NAME = "google_protobuf_Timestamp";
+
+  // Translate a proto message value into a JSON value. If useSetF==false, this will fail with
+  // Optional.empty() if any fields named "f" are found (due to restrictions on the TableRow
+  // class). In that case, the top level will retry with useSetF==true. We fall back this way
+  // in order to maintain backwards compatibility with existing users.
   public static Object jsonValueFromMessageValue(
+      SchemaInformation schemaInformation,
       FieldDescriptor fieldDescriptor,
       Object fieldValue,
       boolean expandRepeated,
       Predicate<String> includeField,
-      String prefix) {
+      String prefix,
+      boolean useSetF) {
     if (expandRepeated && fieldDescriptor.isRepeated()) {
       List<Object> valueList = (List<Object>) fieldValue;
-      return valueList.stream()
-          .map(v -> jsonValueFromMessageValue(fieldDescriptor, v, false, includeField, prefix))
-          .collect(toList());
+      List<Object> expanded = Lists.newArrayListWithCapacity(valueList.size());
+      for (Object value : valueList) {
+        Object translatedValue =
+            jsonValueFromMessageValue(
+                schemaInformation, fieldDescriptor, value, false, includeField, prefix, useSetF);
+        if (!useSetF && translatedValue instanceof Optional) {
+          Optional<?> optional = (Optional<?>) translatedValue;
+          if (!optional.isPresent()) {
+            // A nested element contained an "f" column. Fail the call.
+            return Optional.empty();
+          }
+          translatedValue = optional.get();
+        }
+        expanded.add(translatedValue);
+      }
+      return expanded;
     }
 
-    switch (fieldDescriptor.getType()) {
-      case GROUP:
-      case MESSAGE:
-        return tableRowFromMessage((Message) fieldValue, false, includeField, prefix);
-      case BYTES:
-        return BaseEncoding.base64().encode(((ByteString) fieldValue).toByteArray());
-      case ENUM:
-        throw new RuntimeException("Enumerations not supported");
-      case INT32:
-      case FLOAT:
-      case BOOL:
+    // BigQueryIO supports direct proto writes - i.e. we allow the user to pass in their own
+    // proto and skip our conversion layer, as long as the proto conforms to the types supported
+    // by the BigQuery Storage Write API. For many schema types, the Storage Write API supports
+    // different proto field types (often with different encodings), so the mapping of schema
+    // type -> proto type is one to many. To read the data out of the proto, we need to examine
+    // both the schema type and the proto field type.
+    switch (schemaInformation.getType()) {
       case DOUBLE:
+        switch (fieldDescriptor.getType()) {
+          case FLOAT:
+          case DOUBLE:
+          case STRING:
+            return BigDecimal.valueOf(Double.parseDouble(fieldValue.toString()))
+                .stripTrailingZeros()
+                .toString();
+          case MESSAGE:
+            // Handle the various number wrapper types.
+            Message doubleMessage = (Message) fieldValue;
+            if (fieldDescriptor.getMessageType().getName().equals(FLOAT_VALUE_DESCRIPTOR_NAME)) {
+              float floatValue =
+                  (float)
+                      doubleMessage.getField(
+                          doubleMessage.getDescriptorForType().findFieldByName("value"));
+
+              return new BigDecimal(Float.toString(floatValue)).stripTrailingZeros().toString();
+            } else if (fieldDescriptor
+                .getMessageType()
+                .getName()
+                .equals(DOUBLE_VALUE_DESCRIPTOR_NAME)) {
+              double doubleValue =
+                  (double)
+                      doubleMessage.getField(
+                          doubleMessage.getDescriptorForType().findFieldByName("value"));
+              return BigDecimal.valueOf(doubleValue).stripTrailingZeros().toString();
+            } else {
+              throw new RuntimeException(
+                  "Not implemented yet "
+                      + fieldDescriptor.getMessageType().getName()
+                      + " PARTIAL NAME "
+                      + fieldDescriptor.getMessageType().getName()
+                      + " FIELD NAME "
+                      + prefix
+                      + " CLASS TYPE "
+                      + fieldValue.getClass());
+            }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The exception message here seems to contain debugging information ("PARTIAL NAME", "FIELD NAME", "CLASS TYPE"). It would be better to provide a cleaner, more user-friendly message. Also, consider using `UnsupportedOperationException` instead of a generic `RuntimeException` for cases that are not implemented. This applies to similar `throw new RuntimeException("Not implemented...")` statements in this method for other types like `BOOL`, `INT64`, `BYTES`, and `TIMESTAMP`.
   
   ```java
   throw new UnsupportedOperationException(
       "Converting BigQuery 'DOUBLE' from a protobuf message of type '"
           + fieldDescriptor.getMessageType().getName()
           + "' is not supported. Field: "
           + prefix);
   ```
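   
   If the same wording is wanted for the other unsupported cases, one option is to centralize it in a small helper; a minimal sketch (the helper name and its placement are illustrative, not part of this PR):
   
   ```java
   // Hypothetical helper: builds a consistent "unsupported conversion" error for a
   // given BigQuery schema type name, proto field descriptor, and field path.
   private static UnsupportedOperationException unsupportedConversion(
       String schemaTypeName, FieldDescriptor fieldDescriptor, String fieldPath) {
     return new UnsupportedOperationException(
         "Converting BigQuery '" + schemaTypeName + "' from a protobuf message of type '"
             + fieldDescriptor.getMessageType().getName() + "' is not supported. Field: "
             + fieldPath);
   }
   ```
   
   Each case could then `throw unsupportedConversion("DOUBLE", fieldDescriptor, prefix);`, and likewise for `BOOL`, `INT64`, `BYTES`, and `TIMESTAMP`.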



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java:
##########
@@ -3663,13 +3675,11 @@ public void testStorageApiErrorsWriteTableRows() throws Exception {
                     .via(BigQueryStorageApiInsertError::getRow));
    PCollection<TableRow> successfulRows = result.getSuccessfulStorageApiInserts();
 
-    List<TableRow> expectedFailedRows =
-        badRows.stream().map(formatRecordOnFailureFunction::apply).collect(Collectors.toList());
+    List<TableRow> expectedFailedRows = Lists.newArrayList(badRows);
     expectedFailedRows.addAll(
-        goodRows.stream()
-            .filter(shouldFailRow::apply)
-            .map(formatRecordOnFailureFunction::apply)
-            .collect(Collectors.toList()));
+        goodRows.stream().filter(shouldFailRow::apply).collect(Collectors.toList()));
+
+    deadRows.apply(ParDo.of(new PrintingDoFn()));

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   This line applies a `PrintingDoFn` that prints failed rows to `System.err`. This seems to be a debugging artifact left in the test. Test verification should be done via assertions (like `PAssert`), and printing to standard error should be avoided in tests as it can pollute logs. This line should be removed.
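   
   If the failed rows should still be checked, an assertion keeps the verification inside the test framework; a minimal sketch, assuming `deadRows` is the `PCollection<TableRow>` of failed rows and `expectedFailedRows` is the list computed above:
   
   ```java
   // Assert on the failed rows instead of printing them to System.err.
   PAssert.that(deadRows).containsInAnyOrder(expectedFailedRows);
   ```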



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java:
##########
@@ -3684,6 +3694,13 @@ public void testStorageApiErrorsWriteTableRows() throws Exception {
                 Iterables.filter(goodRows, r -> !shouldFailRow.apply(r)), TableRow.class)));
   }
 
+  private static class PrintingDoFn extends DoFn<TableRow, Void> {
+    @ProcessElement
+    public void processElement(@Element TableRow element) {
+      System.err.println("FAILED " + element);
+    }
+  }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The `PrintingDoFn` class appears to be a debugging utility that was left in the test code. It prints to `System.err`, which is not ideal for automated tests. This class should be removed along with its usage.
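   
   If row-level visibility is occasionally useful while debugging locally, routing through SLF4J keeps the output controllable instead of unconditionally writing to `System.err`; a sketch of that alternative (names are illustrative, and it assumes `org.slf4j` is on the test classpath):
   
   ```java
   private static class LoggingDoFn extends DoFn<TableRow, Void> {
     private static final Logger LOG = LoggerFactory.getLogger(LoggingDoFn.class);
   
     @ProcessElement
     public void processElement(@Element TableRow element) {
       // DEBUG level keeps normal test runs quiet.
       LOG.debug("Failed row: {}", element);
     }
   }
   ```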



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
