This is an automated email from the ASF dual-hosted git repository.
xqhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ccf9522b52d [BQ]: Update error message for too big of a BQ tablerow
(#35567)
ccf9522b52d is described below
commit ccf9522b52dc6bebebfa7f10f0ccfc24b9b55ed9
Author: Derrick Williams <[email protected]>
AuthorDate: Mon Jul 21 09:47:10 2025 -0400
[BQ]: Update error message for too big of a BQ tablerow (#35567)
* improve on existing too big of a table row message and update test
* switch to key and value pairs
* update comments and logs
* update to schema only
* add types for row details
* update to surface the schema data better
* update to validate row schema
* fix Ahmed's comments
* clean up comments
* oops ran over previous commits
* fix spotless
---
.../sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 105 ++++++++++++++++++---
.../io/gcp/bigquery/BigQueryServicesImplTest.java | 80 +++++++++++++++-
2 files changed, 172 insertions(+), 13 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 390ffa1aa99..f4303886c7a 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -109,12 +109,14 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import org.apache.beam.fn.harness.logging.QuotaEvent;
import org.apache.beam.fn.harness.logging.QuotaEvent.QuotaEventCloseable;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
@@ -172,7 +174,12 @@ public class BigQueryServicesImpl implements
BigQueryServices {
// The approximate maximum payload of rows for an insertAll request.
// We set it to 9MB, which leaves room for request overhead.
- private static final Integer MAX_BQ_ROW_PAYLOAD = 9 * 1024 * 1024;
+ private static final Integer MAX_BQ_ROW_PAYLOAD_MB = 9;
+
+ private static final Integer MAX_BQ_ROW_PAYLOAD_BYTES =
MAX_BQ_ROW_PAYLOAD_MB * 1024 * 1024;
+
+ private static final String MAX_BQ_ROW_PAYLOAD_DESC =
+ String.format("%sMB", MAX_BQ_ROW_PAYLOAD_MB);
// The initial backoff for polling the status of a BigQuery job.
private static final Duration INITIAL_JOB_STATUS_POLL_BACKOFF =
Duration.standardSeconds(1);
@@ -596,6 +603,7 @@ public class BigQueryServicesImpl implements
BigQueryServices {
private final PipelineOptions options;
private final long maxRowsPerBatch;
private final long maxRowBatchSize;
+ private final Map<String, TableSchema> tableSchemaCache = new
ConcurrentHashMap<>();
// aggregate the total time spent in exponential backoff
private final Counter throttlingMsecs =
Metrics.counter(DatasetServiceImpl.class,
Metrics.THROTTLE_TIME_COUNTER_NAME);
@@ -1149,22 +1157,53 @@ public class BigQueryServicesImpl implements
BigQueryServices {
// If this row's encoding by itself is larger than the maximum row
payload, then it's
// impossible to insert into BigQuery, and so we send it out through
the dead-letter
// queue.
- if (nextRowSize >= MAX_BQ_ROW_PAYLOAD) {
+ if (nextRowSize >= MAX_BQ_ROW_PAYLOAD_BYTES) {
InsertErrors error =
new InsertErrors()
.setErrors(ImmutableList.of(new
ErrorProto().setReason("row-too-large")));
// We verify whether the retryPolicy parameter expects us to
retry. If it does, then
// it will return true. Otherwise it will return false.
- Boolean isRetry = retryPolicy.shouldRetry(new
InsertRetryPolicy.Context(error));
- if (isRetry) {
- throw new RuntimeException(
+ if (retryPolicy.shouldRetry(new InsertRetryPolicy.Context(error)))
{
+ // Obtain table schema
+ TableSchema tableSchema = null;
+ try {
+ String tableSpec = BigQueryHelpers.toTableSpec(ref);
+ if (tableSchemaCache.containsKey(tableSpec)) {
+ tableSchema = tableSchemaCache.get(tableSpec);
+ } else {
+ Table table = getTable(ref);
+ if (table != null) {
+ tableSchema =
+
TableRowToStorageApiProto.schemaToProtoTableSchema(table.getSchema());
+ tableSchemaCache.put(tableSpec, tableSchema);
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to get table schema", e);
+ }
+
+ // Validate row schema
+ String rowDetails = "";
+ if (tableSchema != null) {
+ rowDetails = validateRowSchema(row, tableSchema);
+ }
+
+ // Basic log to return
+ String bqLimitLog =
String.format(
- "We have observed a row that is %s bytes in size and
exceeded BigQueryIO"
- + " limit of 9MB. While BigQuery supports request
sizes up to 10MB,"
- + " BigQueryIO sets the limit at 9MB to leave room
for request"
- + " overhead. You may change your retry strategy to
unblock this"
- + " pipeline, and the row will be output as a failed
insert.",
- nextRowSize));
+ "We have observed a row of size %s bytes exceeding the "
+ + "BigQueryIO limit of %s.",
+ nextRowSize, MAX_BQ_ROW_PAYLOAD_DESC);
+
+ // Add on row schema diff details if present
+ if (!rowDetails.isEmpty()) {
+ bqLimitLog +=
+ String.format(
+ " This is probably due to a schema "
+ + "mismatch. Problematic row had extra schema
fields: %s.",
+ rowDetails);
+ }
+ throw new RuntimeException(bqLimitLog);
} else {
numFailedRows += 1;
errorContainer.add(failedInserts, error, ref,
rowsToPublish.get(rowIndex));
@@ -1177,7 +1216,7 @@ public class BigQueryServicesImpl implements
BigQueryServices {
// If adding the next row will push the request above BQ row limits,
or
// if the current batch of elements is larger than the targeted
request size,
// we immediately go and issue the data insertion.
- if (dataSize + nextRowSize >= MAX_BQ_ROW_PAYLOAD
+ if (dataSize + nextRowSize >= MAX_BQ_ROW_PAYLOAD_BYTES
|| dataSize >= maxRowBatchSize
|| rows.size() + 1 > maxRowsPerBatch) {
// If the row does not fit into the insert buffer, then we take
the current buffer,
@@ -1317,6 +1356,48 @@ public class BigQueryServicesImpl implements
BigQueryServices {
}
}
+    /**
+     * Validates a {@link TableRow} for logging by comparing the provided row against a BigQuery
+     * schema. The formatted string lists the field names (keys of the row) that are present in
+     * the row but are not declared in the schema.
+     *
+     * <p>For example, a {@link TableRow} with a "names" field, where the schema expects "name",
+     * would return:
+     *
+     * <pre>{@code {Unknown fields: names}}</pre>
+     *
+     * <p>If every field of the row is declared in the schema, an empty string is returned so that
+     * callers can skip the schema-mismatch hint. A result longer than 1024 characters is cut off
+     * and terminated with "...}".
+     *
+     * @param row The {@link TableRow} to validate.
+     * @param tableSchema The {@link TableSchema} to check against.
+     * @return The unknown field names formatted as {@code {Unknown fields: a, b}}, or an empty
+     *     string when the row has no fields that are missing from the schema.
+     */
+    private String validateRowSchema(TableRow row, TableSchema tableSchema) {
+      // Names of the fields declared by the BigQuery table schema
+      Set<String> bqSchemaFields =
+          tableSchema.getFieldsList().stream().map(f -> f.getName()).collect(Collectors.toSet());
+
+      // Row field names that the schema does not declare (empty names are skipped)
+      String unknownFields =
+          row.keySet().stream()
+              .filter(fieldName -> !fieldName.isEmpty() && !bqSchemaFields.contains(fieldName))
+              .collect(Collectors.joining(", "));
+
+      // Collectors.joining with a prefix/suffix would emit "{Unknown fields: }" even when nothing
+      // matched, which callers would misread as a schema mismatch; return "" explicitly instead.
+      if (unknownFields.isEmpty()) {
+        return "";
+      }
+
+      String rowDetails = "{Unknown fields: " + unknownFields + "}";
+
+      // Shorten row details if too long for human readability
+      if (rowDetails.length() > 1024) {
+        rowDetails = rowDetails.substring(0, 1024) + "...}";
+      }
+      return rowDetails;
+    }
+
@Override
public <T> long insertAll(
TableReference ref,
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index 01454a50b25..3902fb1fca3 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -1119,6 +1119,11 @@ public class BigQueryServicesImplTest {
public void testInsertWithinRequestByteSizeLimitsErrorsOut() throws
Exception {
TableReference ref =
new
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("tablersl");
+ TableSchema schema =
+ new TableSchema()
+ .setFields(ImmutableList.of(new
TableFieldSchema().setName("rows").setType("STRING")));
+ Table testTable = new Table().setTableReference(ref).setSchema(schema);
+
List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows =
ImmutableList.of(
wrapValue(new TableRow().set("row", Strings.repeat("abcdefghi",
1024 * 1025))),
@@ -1133,11 +1138,81 @@ public class BigQueryServicesImplTest {
when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
when(response.getStatusCode()).thenReturn(200);
when(response.getContent()).thenReturn(toStream(allRowsSucceeded));
+ when(response.getContent()).thenReturn(toStream(testTable));
},
response -> {
when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
when(response.getStatusCode()).thenReturn(200);
when(response.getContent()).thenReturn(toStream(allRowsSucceeded));
+ when(response.getContent()).thenReturn(toStream(testTable));
+ });
+
+ DatasetServiceImpl dataService =
+ new DatasetServiceImpl(
+ bigquery,
PipelineOptionsFactory.fromArgs("--maxStreamingBatchSize=15").create());
+ List<ValueInSingleWindow<TableRow>> failedInserts = Lists.newArrayList();
+ List<ValueInSingleWindow<TableRow>> successfulRows = Lists.newArrayList();
+ RuntimeException e =
+ assertThrows(
+ RuntimeException.class,
+ () ->
+ dataService.<TableRow>insertAll(
+ ref,
+ rows,
+ insertIds,
+ BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()),
+ TEST_BACKOFF,
+ new MockSleeper(),
+ InsertRetryPolicy.alwaysRetry(),
+ failedInserts,
+ ErrorContainer.TABLE_ROW_ERROR_CONTAINER,
+ false,
+ false,
+ false,
+ successfulRows));
+
+ assertThat(e.getMessage(), containsString("exceeding the BigQueryIO
limit"));
+ }
+
+ /**
+ * Tests that {@link DatasetServiceImpl#insertAll} errors out when a row exceeds the request byte
+ * size limit, and that the error message lists the row fields missing from the table schema.
+ */
+ @SuppressWarnings("InlineMeInliner") // inline `Strings.repeat()` - Java 11+
API only
+ @Test
+ public void testInsertWithinRequestByteSizeLimitsWithBadSchemaErrorsOut()
throws Exception {
+ TableReference ref =
+ new
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("tablersl");
+
+ TableSchema schema =
+ new TableSchema()
+ .setFields(ImmutableList.of(new
TableFieldSchema().setName("row").setType("STRING")));
+ Table testTable = new Table().setTableReference(ref).setSchema(schema);
+
+ List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows =
+ ImmutableList.of(
+ wrapValue(
+ new TableRow()
+ .set("row", Strings.repeat("abcdefghi", 1024 * 1025))
+ .set("badField", "goodValue")),
+ wrapValue(new TableRow().set("row", "a")),
+ wrapValue(new TableRow().set("row", "b")));
+ List<String> insertIds = ImmutableList.of("a", "b", "c");
+
+ final TableDataInsertAllResponse allRowsSucceeded = new
TableDataInsertAllResponse();
+
+ setupMockResponses(
+ response -> {
+ when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+ when(response.getStatusCode()).thenReturn(200);
+ when(response.getContent()).thenReturn(toStream(allRowsSucceeded));
+ when(response.getContent()).thenReturn(toStream(testTable));
+ },
+ response -> {
+ when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+ when(response.getStatusCode()).thenReturn(200);
+ when(response.getContent()).thenReturn(toStream(allRowsSucceeded));
+ when(response.getContent()).thenReturn(toStream(testTable));
});
DatasetServiceImpl dataService =
@@ -1164,7 +1239,10 @@ public class BigQueryServicesImplTest {
false,
successfulRows));
- assertThat(e.getMessage(), containsString("exceeded BigQueryIO limit of
9MB."));
+ assertThat(e.getMessage(), containsString("exceeding the BigQueryIO
limit"));
+ assertThat(
+ e.getMessage(),
+ containsString("Problematic row had extra schema fields: {Unknown
fields: badField}"));
}
@SuppressWarnings("InlineMeInliner") // inline `Strings.repeat()` - Java 11+
API only