dhercher commented on a change in pull request #15067:
URL: https://github.com/apache/beam/pull/15067#discussion_r734112128



##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
##########
@@ -865,104 +963,72 @@ public void deleteDataset(String projectId, String 
datasetId)
         // Store the longest throttled time across all parallel threads
         final AtomicLong maxThrottlingMsec = new AtomicLong();
 
-        for (int i = 0; i < rowsToPublish.size(); ++i) {
-          TableRow row = rowsToPublish.get(i).getValue();
-          TableDataInsertAllRequest.Rows out = new 
TableDataInsertAllRequest.Rows();
-          if (idsToPublish != null) {
-            out.setInsertId(idsToPublish.get(i));
-          }
-          out.setJson(row.getUnknownKeys());
-          rows.add(out);
-
+        int rowIndex = 0;
+        while (rowIndex < rowsToPublish.size()) {
+          TableRow row = rowsToPublish.get(rowIndex).getValue();
+          long nextRowSize = 0L;
           try {
-            dataSize += TableRowJsonCoder.of().getEncodedElementByteSize(row);
+            nextRowSize = 
TableRowJsonCoder.of().getEncodedElementByteSize(row);
           } catch (Exception ex) {
             throw new RuntimeException("Failed to convert the row to JSON", 
ex);
           }
 
-          if (dataSize >= maxRowBatchSize
-              || rows.size() >= maxRowsPerBatch
-              || i == rowsToPublish.size() - 1) {
-            TableDataInsertAllRequest content = new 
TableDataInsertAllRequest();
-            content.setRows(rows);
-            content.setSkipInvalidRows(skipInvalidRows);
-            content.setIgnoreUnknownValues(ignoreUnkownValues);
-
-            final Bigquery.Tabledata.InsertAll insert =
-                client
-                    .tabledata()
-                    .insertAll(ref.getProjectId(), ref.getDatasetId(), 
ref.getTableId(), content)
-                    .setPrettyPrint(false);
-
-            // Create final reference (which cannot change).
-            // So the lamba expression can refer to rowsInsertedForRequest to 
use on error.
-            futures.add(
-                executor.submit(
-                    () -> {
-                      // A backoff for rate limit exceeded errors.
-                      BackOff backoff1 =
-                          
BackOffAdapter.toGcpBackOff(rateLimitBackoffFactory.backoff());
-                      long totalBackoffMillis = 0L;
-                      while (true) {
-                        ServiceCallMetric serviceCallMetric = 
BigQueryUtils.writeCallMetric(ref);
-                        try {
-                          List<TableDataInsertAllResponse.InsertErrors> 
response =
-                              insert.execute().getInsertErrors();
-                          if (response == null || response.isEmpty()) {
-                            serviceCallMetric.call("ok");
-                          } else {
-                            for (TableDataInsertAllResponse.InsertErrors 
insertErrors : response) {
-                              for (ErrorProto insertError : 
insertErrors.getErrors()) {
-                                
serviceCallMetric.call(insertError.getReason());
-                              }
-                            }
-                          }
-                          return response;
-                        } catch (IOException e) {
-                          GoogleJsonError.ErrorInfo errorInfo = 
getErrorInfo(e);
-                          if (errorInfo == null) {
-                            
serviceCallMetric.call(ServiceCallMetric.CANONICAL_STATUS_UNKNOWN);
-                            throw e;
-                          }
-                          serviceCallMetric.call(errorInfo.getReason());
-                          /**
-                           * TODO(BEAM-10584): Check for QUOTA_EXCEEDED error 
will be replaced by
-                           * ApiErrorExtractor.INSTANCE.quotaExceeded(e) after 
the next release of
-                           * GoogleCloudDataproc/hadoop-connectors
-                           */
-                          if (!ApiErrorExtractor.INSTANCE.rateLimited(e)
-                              && 
!errorInfo.getReason().equals(QUOTA_EXCEEDED)) {
-                            throw e;
-                          }
-                          LOG.info(
-                              String.format(
-                                  "BigQuery insertAll error, retrying: %s",
-                                  
ApiErrorExtractor.INSTANCE.getErrorMessage(e)));
-                          try {
-                            long nextBackOffMillis = 
backoff1.nextBackOffMillis();
-                            if (nextBackOffMillis == BackOff.STOP) {
-                              throw e;
-                            }
-                            sleeper.sleep(nextBackOffMillis);
-                            totalBackoffMillis += nextBackOffMillis;
-                            final long totalBackoffMillisSoFar = 
totalBackoffMillis;
-                            maxThrottlingMsec.getAndUpdate(
-                                current -> Math.max(current, 
totalBackoffMillisSoFar));
-                          } catch (InterruptedException interrupted) {
-                            throw new IOException(
-                                "Interrupted while waiting before retrying 
insertAll");
-                          }
-                        }
-                      }
-                    }));
-            strideIndices.add(strideIndex);
-
-            retTotalDataSize += dataSize;
-
-            dataSize = 0L;
-            strideIndex = i + 1;
-            rows = new ArrayList<>();
+          // The following scenario must be *extremely* rare.
+          // If this row's encoding by itself is larger than the 
maxRowBatchSize, then it's
+          // impossible to insert into BigQuery, and so we send it out through 
the dead-letter
+          // queue.
+          if (nextRowSize >= maxRowBatchSize) {
+            errorContainer.add(
+                failedInserts,
+                new InsertErrors()
+                    .setErrors(ImmutableList.of(new 
ErrorProto().setReason("row too large"))),
+                ref,
+                rowsToPublish.get(rowIndex));
+            rowIndex++;
+            if (rows.size() == 0) {

Review comment:
       Need to check that `rowIndex < rowsToPublish.size()` before continuing.  
I'm also worried about the idea of enforcing this limit (which saves an error 
API call, but strictly enforces a limit which BigQuery might change or be able 
to increase).
   
   Perhaps the flow could be:
   
   ```
   while (nextRow) {
       if (nextRow is going to break our limits) {
           publishBatch()
       }
   
       addRow()
   }
   publishBatch()
   ```
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to