dhercher commented on a change in pull request #15067:
URL: https://github.com/apache/beam/pull/15067#discussion_r733196365
##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
##########
@@ -865,104 +866,130 @@ public void deleteDataset(String projectId, String datasetId)
       // Store the longest throttled time across all parallel threads
       final AtomicLong maxThrottlingMsec = new AtomicLong();
-      for (int i = 0; i < rowsToPublish.size(); ++i) {
-        TableRow row = rowsToPublish.get(i).getValue();
-        TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
-        if (idsToPublish != null) {
-          out.setInsertId(idsToPublish.get(i));
-        }
-        out.setJson(row.getUnknownKeys());
-        rows.add(out);
-
+      int rowIndex = 0;
+      while (rowIndex < rowsToPublish.size()) {
+        TableRow row = rowsToPublish.get(rowIndex).getValue();
+        long nextRowSize = 0L;
         try {
-          dataSize += TableRowJsonCoder.of().getEncodedElementByteSize(row);
+          nextRowSize = TableRowJsonCoder.of().getEncodedElementByteSize(row);
         } catch (Exception ex) {
           throw new RuntimeException("Failed to convert the row to JSON", ex);
         }
-        if (dataSize >= maxRowBatchSize
-            || rows.size() >= maxRowsPerBatch
-            || i == rowsToPublish.size() - 1) {
-          TableDataInsertAllRequest content = new TableDataInsertAllRequest();
-          content.setRows(rows);
-          content.setSkipInvalidRows(skipInvalidRows);
-          content.setIgnoreUnknownValues(ignoreUnkownValues);
-
-          final Bigquery.Tabledata.InsertAll insert =
-              client
-                  .tabledata()
-                  .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(), content)
-                  .setPrettyPrint(false);
-
-          // Create final reference (which cannot change).
-          // So the lamba expression can refer to rowsInsertedForRequest to use on error.
-          futures.add(
-              executor.submit(
-                  () -> {
-                    // A backoff for rate limit exceeded errors.
-                    BackOff backoff1 =
-                        BackOffAdapter.toGcpBackOff(rateLimitBackoffFactory.backoff());
-                    long totalBackoffMillis = 0L;
-                    while (true) {
-                      ServiceCallMetric serviceCallMetric = BigQueryUtils.writeCallMetric(ref);
-                      try {
-                        List<TableDataInsertAllResponse.InsertErrors> response =
-                            insert.execute().getInsertErrors();
-                        if (response == null || response.isEmpty()) {
-                          serviceCallMetric.call("ok");
-                        } else {
-                          for (TableDataInsertAllResponse.InsertErrors insertErrors : response) {
-                            for (ErrorProto insertError : insertErrors.getErrors()) {
-                              serviceCallMetric.call(insertError.getReason());
-                            }
+        // The following scenario must be *extremely* rare.
+        // If this row's encoding by itself is larger than the maxRowBatchSize, then it's
+        // impossible to insert into BigQuery, and so we send it out through the dead-letter
+        // queue.
+        if (nextRowSize >= maxRowBatchSize) {
+          errorContainer.add(
+              failedInserts,
+              new InsertErrors()
+                  .setErrors(ImmutableList.of(new ErrorProto().setReason("row too large"))),
+              ref,
+              rowsToPublish.get(rowIndex));
+          rowIndex++;
+          continue;
+        }
+        // If the row fits into the insert buffer, then we add it to the buffer to be inserted
+        // later, and we move onto the next row.
+        if (nextRowSize + dataSize < maxRowBatchSize && rows.size() <= maxRowsPerBatch) {
+          TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
+          if (idsToPublish != null) {
+            out.setInsertId(idsToPublish.get(rowIndex));
+          }
+          out.setJson(row.getUnknownKeys());
+          rows.add(out);
+          rowIndex++;
+          dataSize += nextRowSize;
+          // If we have not reached the last row, then we continue the loop to insert the rows
+          // that follow.
+          // If we have reached the last row, then we do not continue to the next loop iteration,
+          // and instead we allow us to continue down to attempt to insert it.
+          if (rowIndex <= rowsToPublish.size() - 1) {
+            continue;
+          }
+        }
+        // If the row does not fit into the insert buffer, then we take the current buffer,
+        // issue the insert call, and we retry adding the same row to the troublesome buffer.
+        TableDataInsertAllRequest content = new TableDataInsertAllRequest();
+        content.setRows(rows);
+        content.setSkipInvalidRows(skipInvalidRows);
+        content.setIgnoreUnknownValues(ignoreUnkownValues);
+
+        final Bigquery.Tabledata.InsertAll insert =
+            client
+                .tabledata()
+                .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(), content)
+                .setPrettyPrint(false);
+
+        // Create final reference (which cannot change).
+        // So the lamba expression can refer to rowsInsertedForRequest to use on error.
+        futures.add(
+            executor.submit(
+                () -> {
+                  // A backoff for rate limit exceeded errors.
+                  BackOff backoff1 =
+                      BackOffAdapter.toGcpBackOff(rateLimitBackoffFactory.backoff());
+                  long totalBackoffMillis = 0L;
+                  while (true) {
+                    ServiceCallMetric serviceCallMetric = BigQueryUtils.writeCallMetric(ref);
+                    try {
+                      List<TableDataInsertAllResponse.InsertErrors> response =
+                          insert.execute().getInsertErrors();
+                      if (response == null || response.isEmpty()) {
+                        serviceCallMetric.call("ok");
+                      } else {
+                        for (TableDataInsertAllResponse.InsertErrors insertErrors : response) {
+                          for (ErrorProto insertError : insertErrors.getErrors()) {
+                            serviceCallMetric.call(insertError.getReason());
                           }
                         }
-                        return response;
-                      } catch (IOException e) {
-                        GoogleJsonError.ErrorInfo errorInfo = getErrorInfo(e);
-                        if (errorInfo == null) {
-                          serviceCallMetric.call(ServiceCallMetric.CANONICAL_STATUS_UNKNOWN);
-                          throw e;
-                        }
-                        serviceCallMetric.call(errorInfo.getReason());
-                        /**
-                         * TODO(BEAM-10584): Check for QUOTA_EXCEEDED error will be replaced by
-                         * ApiErrorExtractor.INSTANCE.quotaExceeded(e) after the next release of
-                         * GoogleCloudDataproc/hadoop-connectors
-                         */
-                        if (!ApiErrorExtractor.INSTANCE.rateLimited(e)
-                            && !errorInfo.getReason().equals(QUOTA_EXCEEDED)) {
+                      }
+                      return response;
+                    } catch (IOException e) {
+                      GoogleJsonError.ErrorInfo errorInfo = getErrorInfo(e);
+                      if (errorInfo == null) {
+                        serviceCallMetric.call(ServiceCallMetric.CANONICAL_STATUS_UNKNOWN);
+                        throw e;
+                      }
+                      serviceCallMetric.call(errorInfo.getReason());
+                      /**
+                       * TODO(BEAM-10584): Check for QUOTA_EXCEEDED error will be replaced by
+                       * ApiErrorExtractor.INSTANCE.quotaExceeded(e) after the next release of
+                       * GoogleCloudDataproc/hadoop-connectors
+                       */
+                      if (!ApiErrorExtractor.INSTANCE.rateLimited(e)
+                          && !errorInfo.getReason().equals(QUOTA_EXCEEDED)) {
+                        throw e;
+                      }
+                      LOG.info(
+                          String.format(
+                              "BigQuery insertAll error, retrying: %s",
+                              ApiErrorExtractor.INSTANCE.getErrorMessage(e)));
+                      try {
+                        long nextBackOffMillis = backoff1.nextBackOffMillis();
+                        if (nextBackOffMillis == BackOff.STOP) {
                           throw e;
                         }
-                        LOG.info(
-                            String.format(
-                                "BigQuery insertAll error, retrying: %s",
-                                ApiErrorExtractor.INSTANCE.getErrorMessage(e)));
-                        try {
-                          long nextBackOffMillis = backoff1.nextBackOffMillis();
-                          if (nextBackOffMillis == BackOff.STOP) {
-                            throw e;
-                          }
-                          sleeper.sleep(nextBackOffMillis);
-                          totalBackoffMillis += nextBackOffMillis;
-                          final long totalBackoffMillisSoFar = totalBackoffMillis;
-                          maxThrottlingMsec.getAndUpdate(
-                              current -> Math.max(current, totalBackoffMillisSoFar));
-                        } catch (InterruptedException interrupted) {
-                          throw new IOException(
-                              "Interrupted while waiting before retrying insertAll");
-                        }
+                        sleeper.sleep(nextBackOffMillis);
+                        totalBackoffMillis += nextBackOffMillis;
+                        final long totalBackoffMillisSoFar = totalBackoffMillis;
+                        maxThrottlingMsec.getAndUpdate(
+                            current -> Math.max(current, totalBackoffMillisSoFar));
+                      } catch (InterruptedException interrupted) {
+                        throw new IOException(
+                            "Interrupted while waiting before retrying insertAll");
                       }
                     }
-                  }));
-          strideIndices.add(strideIndex);
+                  }
+                }));
+        strideIndices.add(strideIndex);
-          retTotalDataSize += dataSize;
+        retTotalDataSize += dataSize;
-          dataSize = 0L;
-          strideIndex = i + 1;
-          rows = new ArrayList<>();
-        }
+        dataSize = 0L;
+        strideIndex = rowIndex + 1;
Review comment:
Should this be `strideIndex = rowIndex`, since `rowIndex` has already been incremented at this point if the row was accounted for?
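
To make the indexing question concrete, here is a minimal, self-contained sketch of the loop's bookkeeping. This is not the PR code: the `StrideIndexSketch` class, `MAX_ROW_BATCH_SIZE`, and the toy row sizes are invented for illustration, and the dead-letter and `maxRowsPerBatch` checks are dropped; only the `rowIndex` / `strideIndex` interplay mirrors the diff.

```java
import java.util.ArrayList;
import java.util.List;

public class StrideIndexSketch {

  // Flush the buffered rows whenever the next row would push the byte count past this limit.
  static final long MAX_ROW_BATCH_SIZE = 10L;

  // Replays the diff's buffer/flush bookkeeping on fake row sizes and returns the recorded
  // stride indices. plusOne=true mirrors "strideIndex = rowIndex + 1" from the diff;
  // plusOne=false mirrors the "strideIndex = rowIndex" suggestion.
  static List<Integer> strideIndices(long[] rowSizes, boolean plusOne) {
    List<Integer> strides = new ArrayList<>();
    List<Long> buffer = new ArrayList<>();
    long dataSize = 0L;
    int strideIndex = 0;
    int rowIndex = 0;
    while (rowIndex < rowSizes.length) {
      long nextRowSize = rowSizes[rowIndex];
      // Row fits: buffer it and advance rowIndex; only fall through to the flush below
      // when this was the last row (same shape as the loop in the diff).
      if (nextRowSize + dataSize < MAX_ROW_BATCH_SIZE) {
        buffer.add(nextRowSize);
        rowIndex++;
        dataSize += nextRowSize;
        if (rowIndex <= rowSizes.length - 1) {
          continue;
        }
      }
      // Flush: record where this batch started, then note where the next batch starts.
      // rowIndex already points at the first unprocessed row here, so the "+ 1" variant
      // skips one row in the recorded stride.
      strides.add(strideIndex);
      strideIndex = plusOne ? rowIndex + 1 : rowIndex;
      dataSize = 0L;
      buffer = new ArrayList<>();
    }
    return strides;
  }

  public static void main(String[] args) {
    long[] rowSizes = {4, 4, 4, 4, 4}; // two rows fit per batch; batches start at rows 0, 2, 4
    System.out.println("strideIndex = rowIndex + 1 -> " + strideIndices(rowSizes, true));
    System.out.println("strideIndex = rowIndex     -> " + strideIndices(rowSizes, false));
  }
}
```

On this toy input the batches actually start at rows 0, 2, and 4. The `strideIndex = rowIndex` variant reports [0, 2, 4], while the `rowIndex + 1` variant drifts to [0, 3, 5] after the first flush, which is the off-by-one the question above is pointing at.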
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]