pabloem commented on a change in pull request #15067:
URL: https://github.com/apache/beam/pull/15067#discussion_r734200008
##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
##########
@@ -865,104 +963,72 @@ public void deleteDataset(String projectId, String datasetId)
       // Store the longest throttled time across all parallel threads
       final AtomicLong maxThrottlingMsec = new AtomicLong();
-      for (int i = 0; i < rowsToPublish.size(); ++i) {
-        TableRow row = rowsToPublish.get(i).getValue();
-        TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
-        if (idsToPublish != null) {
-          out.setInsertId(idsToPublish.get(i));
-        }
-        out.setJson(row.getUnknownKeys());
-        rows.add(out);
-
+      int rowIndex = 0;
+      while (rowIndex < rowsToPublish.size()) {
+        TableRow row = rowsToPublish.get(rowIndex).getValue();
+        long nextRowSize = 0L;
         try {
-          dataSize += TableRowJsonCoder.of().getEncodedElementByteSize(row);
+          nextRowSize = TableRowJsonCoder.of().getEncodedElementByteSize(row);
         } catch (Exception ex) {
           throw new RuntimeException("Failed to convert the row to JSON", ex);
         }
-        if (dataSize >= maxRowBatchSize
-            || rows.size() >= maxRowsPerBatch
-            || i == rowsToPublish.size() - 1) {
-          TableDataInsertAllRequest content = new TableDataInsertAllRequest();
-          content.setRows(rows);
-          content.setSkipInvalidRows(skipInvalidRows);
-          content.setIgnoreUnknownValues(ignoreUnkownValues);
-
-          final Bigquery.Tabledata.InsertAll insert =
-              client
-                  .tabledata()
-                  .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(), content)
-                  .setPrettyPrint(false);
-
-          // Create final reference (which cannot change).
-          // So the lambda expression can refer to rowsInsertedForRequest to use on error.
-          futures.add(
-              executor.submit(
-                  () -> {
-                    // A backoff for rate limit exceeded errors.
-                    BackOff backoff1 =
-                        BackOffAdapter.toGcpBackOff(rateLimitBackoffFactory.backoff());
-                    long totalBackoffMillis = 0L;
-                    while (true) {
-                      ServiceCallMetric serviceCallMetric = BigQueryUtils.writeCallMetric(ref);
-                      try {
-                        List<TableDataInsertAllResponse.InsertErrors> response =
-                            insert.execute().getInsertErrors();
-                        if (response == null || response.isEmpty()) {
-                          serviceCallMetric.call("ok");
-                        } else {
-                          for (TableDataInsertAllResponse.InsertErrors insertErrors : response) {
-                            for (ErrorProto insertError : insertErrors.getErrors()) {
-                              serviceCallMetric.call(insertError.getReason());
-                            }
-                          }
-                        }
-                        return response;
-                      } catch (IOException e) {
-                        GoogleJsonError.ErrorInfo errorInfo = getErrorInfo(e);
-                        if (errorInfo == null) {
-                          serviceCallMetric.call(ServiceCallMetric.CANONICAL_STATUS_UNKNOWN);
-                          throw e;
-                        }
-                        serviceCallMetric.call(errorInfo.getReason());
-                        /**
-                         * TODO(BEAM-10584): Check for QUOTA_EXCEEDED error will be replaced by
-                         * ApiErrorExtractor.INSTANCE.quotaExceeded(e) after the next release of
-                         * GoogleCloudDataproc/hadoop-connectors
-                         */
-                        if (!ApiErrorExtractor.INSTANCE.rateLimited(e)
-                            && !errorInfo.getReason().equals(QUOTA_EXCEEDED)) {
-                          throw e;
-                        }
-                        LOG.info(
-                            String.format(
-                                "BigQuery insertAll error, retrying: %s",
-                                ApiErrorExtractor.INSTANCE.getErrorMessage(e)));
-                        try {
-                          long nextBackOffMillis = backoff1.nextBackOffMillis();
-                          if (nextBackOffMillis == BackOff.STOP) {
-                            throw e;
-                          }
-                          sleeper.sleep(nextBackOffMillis);
-                          totalBackoffMillis += nextBackOffMillis;
-                          final long totalBackoffMillisSoFar = totalBackoffMillis;
-                          maxThrottlingMsec.getAndUpdate(
-                              current -> Math.max(current, totalBackoffMillisSoFar));
-                        } catch (InterruptedException interrupted) {
-                          throw new IOException(
-                              "Interrupted while waiting before retrying insertAll");
-                        }
-                      }
-                    }
-                  }));
-          strideIndices.add(strideIndex);
-
-          retTotalDataSize += dataSize;
-
-          dataSize = 0L;
-          strideIndex = i + 1;
-          rows = new ArrayList<>();
+        // The following scenario must be *extremely* rare.
+        // If this row's encoding by itself is larger than the maxRowBatchSize, then it's
+        // impossible to insert into BigQuery, and so we send it out through the dead-letter
+        // queue.
+        if (nextRowSize >= maxRowBatchSize) {
+          errorContainer.add(
+              failedInserts,
+              new InsertErrors()
+                  .setErrors(ImmutableList.of(new ErrorProto().setReason("row too large"))),
+              ref,
+              rowsToPublish.get(rowIndex));
+          rowIndex++;
+          if (rows.size() == 0) {
Review comment:
You're right. I've simplified the logic to match your pseudocode.
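
For readers following along, the shape of the simplified loop is: measure each row's encoded size, dead-letter any single row that could never fit in one request, and otherwise flush the accumulated batch whenever a size or row-count threshold is hit. Here is a minimal standalone sketch of that shape (illustrative only: the `String` rows, the UTF-8 byte count, and the threshold constants are assumptions standing in for `TableRow`, `TableRowJsonCoder`, and the real `maxRowBatchSize`/`maxRowsPerBatch`):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class BatchingSketch {
  static final long MAX_ROW_BATCH_SIZE = 64 * 1024; // bytes; illustrative value
  static final int MAX_ROWS_PER_BATCH = 500;        // illustrative value

  /** Batches rows by encoded size and count; oversized rows go to the dead-letter list. */
  static List<List<String>> batch(List<String> rows, List<String> deadLetter) {
    List<List<String>> batches = new ArrayList<>();
    List<String> current = new ArrayList<>();
    long currentSize = 0L;
    int rowIndex = 0;
    while (rowIndex < rows.size()) {
      String row = rows.get(rowIndex);
      // Stand-in for TableRowJsonCoder.of().getEncodedElementByteSize(row).
      long rowSize = row.getBytes(StandardCharsets.UTF_8).length;
      if (rowSize >= MAX_ROW_BATCH_SIZE) {
        // A single row too large to ever fit in a request: dead-letter it and move on.
        deadLetter.add(row);
        rowIndex++;
        continue;
      }
      if (currentSize + rowSize >= MAX_ROW_BATCH_SIZE || current.size() >= MAX_ROWS_PER_BATCH) {
        // Flush the accumulated batch before adding this row.
        batches.add(current);
        current = new ArrayList<>();
        currentSize = 0L;
      }
      current.add(row);
      currentSize += rowSize;
      rowIndex++;
    }
    if (!current.isEmpty()) {
      batches.add(current); // final partial batch
    }
    return batches;
  }

  public static void main(String[] args) {
    List<String> dead = new ArrayList<>();
    List<List<String>> batches = batch(List.of("a", "b".repeat(70_000), "c"), dead);
    System.out.println("batches=" + batches.size() + " deadLettered=" + dead.size());
  }
}
```

Keeping the size check before the row is added to the batch is what makes the oversized-row case a clean early exit rather than a post-hoc fixup of an already-built request.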
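The removed lines above also show the retry pattern this code relies on: exponential backoff on rate-limit errors, with a shared `AtomicLong` recording the longest cumulative throttled time across worker threads. A hedged sketch of that pattern, using plain `Thread.sleep` and a doubling delay in place of Beam's `BackOff`/`Sleeper` machinery (the `Call` interface, attempt cap, and constants are illustrative assumptions, not the Beam API):

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

public class BackoffSketch {
  // Longest cumulative backoff seen by any worker thread, mirroring maxThrottlingMsec.
  static final AtomicLong maxThrottlingMsec = new AtomicLong();

  interface Call<T> {
    T execute() throws IOException;
  }

  /** Retries a call on IOException with capped exponential backoff, recording throttled time. */
  static <T> T withRetries(Call<T> call, int maxAttempts)
      throws IOException, InterruptedException {
    long backoffMillis = 200; // initial backoff; illustrative
    long totalBackoffMillis = 0;
    for (int attempt = 1; ; attempt++) {
      try {
        return call.execute();
      } catch (IOException e) {
        if (attempt >= maxAttempts) {
          throw e; // out of retries: surface the error, like BackOff.STOP
        }
        Thread.sleep(backoffMillis);
        totalBackoffMillis += backoffMillis;
        final long soFar = totalBackoffMillis;
        // Atomic max across threads, as in maxThrottlingMsec.getAndUpdate(...) above.
        maxThrottlingMsec.getAndUpdate(current -> Math.max(current, soFar));
        backoffMillis = Math.min(backoffMillis * 2, 60_000); // double, capped at 60s
      }
    }
  }
}
```

The `getAndUpdate` with `Math.max` is the key detail: each thread publishes its own cumulative wait, and the metric keeps only the worst case, so parallel inserts don't overstate total throttling.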
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]