This is an automated email from the ASF dual-hosted git repository.

xqhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ccf9522b52d [BQ]: Update error message for too big of a BQ tablerow (#35567)
ccf9522b52d is described below

commit ccf9522b52dc6bebebfa7f10f0ccfc24b9b55ed9
Author: Derrick Williams <[email protected]>
AuthorDate: Mon Jul 21 09:47:10 2025 -0400

    [BQ]: Update error message for too big of a BQ tablerow (#35567)
    
    * improve the existing "table row too big" error message and update the test
    
    * switch to key and value pairs
    
    * update comments and logs
    
    * update to schema only
    
    * add types for row details
    
    * update to surface the schema data better
    
    * update to validate row schema
    
    * address Ahmed's review comments
    
    * clean up comments
    
    * oops ran over previous commits
    
    * fix spotless
---
 .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java  | 105 ++++++++++++++++++---
 .../io/gcp/bigquery/BigQueryServicesImplTest.java  |  80 +++++++++++++++-
 2 files changed, 172 insertions(+), 13 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 390ffa1aa99..f4303886c7a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -109,12 +109,14 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 import org.apache.beam.fn.harness.logging.QuotaEvent;
 import org.apache.beam.fn.harness.logging.QuotaEvent.QuotaEventCloseable;
 import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
@@ -172,7 +174,12 @@ public class BigQueryServicesImpl implements BigQueryServices {
 
   // The approximate maximum payload of rows for an insertAll request.
   // We set it to 9MB, which leaves room for request overhead.
-  private static final Integer MAX_BQ_ROW_PAYLOAD = 9 * 1024 * 1024;
+  private static final Integer MAX_BQ_ROW_PAYLOAD_MB = 9;
+
+  private static final Integer MAX_BQ_ROW_PAYLOAD_BYTES = 
MAX_BQ_ROW_PAYLOAD_MB * 1024 * 1024;
+
+  private static final String MAX_BQ_ROW_PAYLOAD_DESC =
+      String.format("%sMB", MAX_BQ_ROW_PAYLOAD_MB);
 
   // The initial backoff for polling the status of a BigQuery job.
   private static final Duration INITIAL_JOB_STATUS_POLL_BACKOFF = 
Duration.standardSeconds(1);
@@ -596,6 +603,7 @@ public class BigQueryServicesImpl implements BigQueryServices {
     private final PipelineOptions options;
     private final long maxRowsPerBatch;
     private final long maxRowBatchSize;
+    private final Map<String, TableSchema> tableSchemaCache = new 
ConcurrentHashMap<>();
     // aggregate the total time spent in exponential backoff
     private final Counter throttlingMsecs =
         Metrics.counter(DatasetServiceImpl.class, 
Metrics.THROTTLE_TIME_COUNTER_NAME);
@@ -1149,22 +1157,53 @@ public class BigQueryServicesImpl implements BigQueryServices {
           // If this row's encoding by itself is larger than the maximum row 
payload, then it's
           // impossible to insert into BigQuery, and so we send it out through 
the dead-letter
           // queue.
-          if (nextRowSize >= MAX_BQ_ROW_PAYLOAD) {
+          if (nextRowSize >= MAX_BQ_ROW_PAYLOAD_BYTES) {
             InsertErrors error =
                 new InsertErrors()
                     .setErrors(ImmutableList.of(new 
ErrorProto().setReason("row-too-large")));
             // We verify whether the retryPolicy parameter expects us to 
retry. If it does, then
             // it will return true. Otherwise it will return false.
-            Boolean isRetry = retryPolicy.shouldRetry(new 
InsertRetryPolicy.Context(error));
-            if (isRetry) {
-              throw new RuntimeException(
+            if (retryPolicy.shouldRetry(new InsertRetryPolicy.Context(error))) 
{
+              // Obtain table schema
+              TableSchema tableSchema = null;
+              try {
+                String tableSpec = BigQueryHelpers.toTableSpec(ref);
+                if (tableSchemaCache.containsKey(tableSpec)) {
+                  tableSchema = tableSchemaCache.get(tableSpec);
+                } else {
+                  Table table = getTable(ref);
+                  if (table != null) {
+                    tableSchema =
+                        
TableRowToStorageApiProto.schemaToProtoTableSchema(table.getSchema());
+                    tableSchemaCache.put(tableSpec, tableSchema);
+                  }
+                }
+              } catch (Exception e) {
+                LOG.warn("Failed to get table schema", e);
+              }
+
+              // Validate row schema
+              String rowDetails = "";
+              if (tableSchema != null) {
+                rowDetails = validateRowSchema(row, tableSchema);
+              }
+
+              // Basic log to return
+              String bqLimitLog =
                   String.format(
-                      "We have observed a row that is %s bytes in size and 
exceeded BigQueryIO"
-                          + " limit of 9MB. While BigQuery supports request 
sizes up to 10MB,"
-                          + " BigQueryIO sets the limit at 9MB to leave room 
for request"
-                          + " overhead. You may change your retry strategy to 
unblock this"
-                          + " pipeline, and the row will be output as a failed 
insert.",
-                      nextRowSize));
+                      "We have observed a row of size %s bytes exceeding the "
+                          + "BigQueryIO limit of %s.",
+                      nextRowSize, MAX_BQ_ROW_PAYLOAD_DESC);
+
+              // Add on row schema diff details if present
+              if (!rowDetails.isEmpty()) {
+                bqLimitLog +=
+                    String.format(
+                        " This is probably due to a schema "
+                            + "mismatch. Problematic row had extra schema 
fields: %s.",
+                        rowDetails);
+              }
+              throw new RuntimeException(bqLimitLog);
             } else {
               numFailedRows += 1;
               errorContainer.add(failedInserts, error, ref, 
rowsToPublish.get(rowIndex));
@@ -1177,7 +1216,7 @@ public class BigQueryServicesImpl implements BigQueryServices {
           // If adding the next row will push the request above BQ row limits, 
or
           // if the current batch of elements is larger than the targeted 
request size,
           // we immediately go and issue the data insertion.
-          if (dataSize + nextRowSize >= MAX_BQ_ROW_PAYLOAD
+          if (dataSize + nextRowSize >= MAX_BQ_ROW_PAYLOAD_BYTES
               || dataSize >= maxRowBatchSize
               || rows.size() + 1 > maxRowsPerBatch) {
             // If the row does not fit into the insert buffer, then we take 
the current buffer,
@@ -1317,6 +1356,48 @@ public class BigQueryServicesImpl implements BigQueryServices {
       }
     }
 
+    /**
+     * Validates a {@link TableRow} for logging, comparing the provided row 
against a BigQuery
+     * schema. The formatted string shows the field names in the row 
indicating any mismatches
+     * unknown entries.
+     *
+     * <p>For example, a {@link TableRow} with a "names" field, where the 
schema expects
+     * "name" would return "names".
+     *
+     * <pre>{@code {'name': java.lang.String}</pre>
+     *
+     * <p>If a field exists in the row but not in the schema,
+     * "Unknown fields" is prefixed to the log.</p>
+     *
+     * @param row The {@link TableRow} to validate.
+     * @param tableSchema The {@link TableSchema} to check against.
+     * @return A string representation of the row, indicating any schema 
mismatches.
+     */
+    private String validateRowSchema(TableRow row, TableSchema tableSchema) {
+      // Creates bqSchemaFields containing field names
+      Set<String> bqSchemaFields =
+          tableSchema.getFieldsList().stream().map(f -> 
f.getName()).collect(Collectors.toSet());
+
+      // Validate
+      String rowDetails =
+          row.keySet().stream()
+              .map(
+                  fieldName -> {
+                    if (!bqSchemaFields.contains(fieldName)) {
+                      return fieldName;
+                    }
+                    return "";
+                  })
+              .filter(s -> !s.isEmpty())
+              .collect(Collectors.joining(", ", "{Unknown fields: ", "}"));
+
+      // Shorten row details if too long for human readability
+      if (rowDetails.length() > 1024) {
+        rowDetails = rowDetails.substring(0, 1024) + "...}";
+      }
+      return rowDetails;
+    }
+
     @Override
     public <T> long insertAll(
         TableReference ref,
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index 01454a50b25..3902fb1fca3 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -1119,6 +1119,11 @@ public class BigQueryServicesImplTest {
   public void testInsertWithinRequestByteSizeLimitsErrorsOut() throws 
Exception {
     TableReference ref =
         new 
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("tablersl");
+    TableSchema schema =
+        new TableSchema()
+            .setFields(ImmutableList.of(new 
TableFieldSchema().setName("rows").setType("STRING")));
+    Table testTable = new Table().setTableReference(ref).setSchema(schema);
+
     List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows =
         ImmutableList.of(
             wrapValue(new TableRow().set("row", Strings.repeat("abcdefghi", 
1024 * 1025))),
@@ -1133,11 +1138,81 @@ public class BigQueryServicesImplTest {
           when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
           when(response.getStatusCode()).thenReturn(200);
           when(response.getContent()).thenReturn(toStream(allRowsSucceeded));
+          when(response.getContent()).thenReturn(toStream(testTable));
         },
         response -> {
           when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
           when(response.getStatusCode()).thenReturn(200);
           when(response.getContent()).thenReturn(toStream(allRowsSucceeded));
+          when(response.getContent()).thenReturn(toStream(testTable));
+        });
+
+    DatasetServiceImpl dataService =
+        new DatasetServiceImpl(
+            bigquery, 
PipelineOptionsFactory.fromArgs("--maxStreamingBatchSize=15").create());
+    List<ValueInSingleWindow<TableRow>> failedInserts = Lists.newArrayList();
+    List<ValueInSingleWindow<TableRow>> successfulRows = Lists.newArrayList();
+    RuntimeException e =
+        assertThrows(
+            RuntimeException.class,
+            () ->
+                dataService.<TableRow>insertAll(
+                    ref,
+                    rows,
+                    insertIds,
+                    BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()),
+                    TEST_BACKOFF,
+                    new MockSleeper(),
+                    InsertRetryPolicy.alwaysRetry(),
+                    failedInserts,
+                    ErrorContainer.TABLE_ROW_ERROR_CONTAINER,
+                    false,
+                    false,
+                    false,
+                    successfulRows));
+
+    assertThat(e.getMessage(), containsString("exceeding the BigQueryIO 
limit"));
+  }
+
+  /**
+   * Tests that {@link DatasetServiceImpl#insertAll} does not go over limit of 
rows per request and
+   * schema difference check.
+   */
+  @SuppressWarnings("InlineMeInliner") // inline `Strings.repeat()` - Java 11+ 
API only
+  @Test
+  public void testInsertWithinRequestByteSizeLimitsWithBadSchemaErrorsOut() 
throws Exception {
+    TableReference ref =
+        new 
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("tablersl");
+
+    TableSchema schema =
+        new TableSchema()
+            .setFields(ImmutableList.of(new 
TableFieldSchema().setName("row").setType("STRING")));
+    Table testTable = new Table().setTableReference(ref).setSchema(schema);
+
+    List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows =
+        ImmutableList.of(
+            wrapValue(
+                new TableRow()
+                    .set("row", Strings.repeat("abcdefghi", 1024 * 1025))
+                    .set("badField", "goodValue")),
+            wrapValue(new TableRow().set("row", "a")),
+            wrapValue(new TableRow().set("row", "b")));
+    List<String> insertIds = ImmutableList.of("a", "b", "c");
+
+    final TableDataInsertAllResponse allRowsSucceeded = new 
TableDataInsertAllResponse();
+
+    setupMockResponses(
+        response -> {
+          when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+          when(response.getStatusCode()).thenReturn(200);
+          when(response.getContent()).thenReturn(toStream(allRowsSucceeded));
+          when(response.getContent()).thenReturn(toStream(testTable));
+        },
+        response -> {
+          when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+          when(response.getStatusCode()).thenReturn(200);
+          when(response.getContent()).thenReturn(toStream(allRowsSucceeded));
+          when(response.getContent()).thenReturn(toStream(testTable));
         });
 
     DatasetServiceImpl dataService =
@@ -1164,7 +1239,10 @@ public class BigQueryServicesImplTest {
                     false,
                     successfulRows));
 
-    assertThat(e.getMessage(), containsString("exceeded BigQueryIO limit of 
9MB."));
+    assertThat(e.getMessage(), containsString("exceeding the BigQueryIO 
limit"));
+    assertThat(
+        e.getMessage(),
+        containsString("Problematic row had extra schema fields: {Unknown 
fields: badField}"));
   }
 
   @SuppressWarnings("InlineMeInliner") // inline `Strings.repeat()` - Java 11+ 
API only

Reply via email to