[
https://issues.apache.org/jira/browse/BEAM-5514?focusedWorklogId=173882&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-173882
]
ASF GitHub Bot logged work on BEAM-5514:
----------------------------------------
Author: ASF GitHub Bot
Created on: 11/Dec/18 03:18
Start Date: 11/Dec/18 03:18
Worklog Time Spent: 10m
Work Description: chamikaramj closed pull request #7189: [BEAM-5514]
BigQueryIO doesn't handle quotaExceeded errors properly
URL: https://github.com/apache/beam/pull/7189
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index cbdb44f8ade3..f147634f9cab 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -736,16 +736,15 @@ public void deleteDataset(String projectId, String datasetId)
             try {
               return insert.execute().getInsertErrors();
             } catch (IOException e) {
-              if (new ApiErrorExtractor().rateLimited(e)) {
-                LOG.info("BigQuery insertAll exceeded rate limit, retrying");
-                try {
-                  sleeper.sleep(backoff1.nextBackOffMillis());
-                } catch (InterruptedException interrupted) {
-                  throw new IOException(
-                      "Interrupted while waiting before retrying insertAll");
-                }
-              } else {
-                throw e;
+              LOG.info(
+                  String.format(
+                      "BigQuery insertAll error, retrying: %s",
+                      ApiErrorExtractor.INSTANCE.getErrorMessage(e)));
+              try {
+                sleeper.sleep(backoff1.nextBackOffMillis());
+              } catch (InterruptedException interrupted) {
+                throw new IOException(
+                    "Interrupted while waiting before retrying insertAll");
               }
             }
           }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index 9f20e5087e92..48fd5b9eea41 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -490,9 +490,9 @@ public void testExecuteWithRetries() throws IOException, InterruptedException {
PaneInfo.ON_TIME_AND_ONLY_FIRING);
}
-  /** Tests that {@link DatasetServiceImpl#insertAll} retries quota rate limited attempts. */
+  /** Tests that {@link DatasetServiceImpl#insertAll} retries rate limited attempts. */
@Test
- public void testInsertRetry() throws Exception {
+ public void testInsertRateLimitRetry() throws Exception {
TableReference ref =
        new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
List<ValueInSingleWindow<TableRow>> rows = new ArrayList<>();
@@ -521,8 +521,43 @@ public void testInsertRetry() throws Exception {
verify(response, times(2)).getStatusCode();
verify(response, times(2)).getContent();
verify(response, times(2)).getContentType();
-    expectedLogs.verifyInfo("BigQuery insertAll exceeded rate limit, retrying");
+ expectedLogs.verifyInfo("BigQuery insertAll error, retrying:");
}
+
+  /** Tests that {@link DatasetServiceImpl#insertAll} retries quota exceeded attempts. */
+ @Test
+ public void testInsertQuotaExceededRetry() throws Exception {
+ TableReference ref =
+        new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+ List<ValueInSingleWindow<TableRow>> rows = new ArrayList<>();
+ rows.add(wrapValue(new TableRow()));
+
+ // First response is 403 quota exceeded, second response has valid payload.
+ when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+ when(response.getStatusCode()).thenReturn(403).thenReturn(200);
+ when(response.getContent())
+ .thenReturn(toStream(errorWithReasonAndStatus("quotaExceeded", 403)))
+ .thenReturn(toStream(new TableDataInsertAllResponse()));
+
+ DatasetServiceImpl dataService =
+ new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+ dataService.insertAll(
+ ref,
+ rows,
+ null,
+ BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()),
+ new MockSleeper(),
+ InsertRetryPolicy.alwaysRetry(),
+ null,
+ null,
+ false,
+ false);
+ verify(response, times(2)).getStatusCode();
+ verify(response, times(2)).getContent();
+ verify(response, times(2)).getContentType();
+ expectedLogs.verifyInfo("BigQuery insertAll error, retrying:");
+ }
+
// A BackOff that makes a total of 4 attempts
private static final FluentBackoff TEST_BACKOFF =
FluentBackoff.DEFAULT
@@ -626,15 +661,18 @@ public void testInsertFailsGracefully() throws Exception {
expectedLogs.verifyInfo("Retrying 1 failed inserts to BigQuery");
}
-  /** Tests that {@link DatasetServiceImpl#insertAll} does not retry non-rate-limited attempts. */
+  /**
+   * Tests that {@link DatasetServiceImpl#insertAll} retries other non-rate-limited,
+   * non-quota-exceeded attempts.
+   */
@Test
- public void testInsertDoesNotRetry() throws Throwable {
+ public void testInsertOtherRetry() throws Throwable {
TableReference ref =
        new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
List<ValueInSingleWindow<TableRow>> rows = new ArrayList<>();
rows.add(wrapValue(new TableRow()));
-    // First response is 403 not-rate-limited, second response has valid payload but should not
+    // First response is 403 non-{rate-limited, quota-exceeded}, second response has valid payload but should not
// be invoked.
when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
when(response.getStatusCode()).thenReturn(403).thenReturn(200);
@@ -642,31 +680,23 @@ public void testInsertDoesNotRetry() throws Throwable {
        .thenReturn(toStream(errorWithReasonAndStatus("actually forbidden", 403)))
.thenReturn(toStream(new TableDataInsertAllResponse()));
- thrown.expect(GoogleJsonResponseException.class);
- thrown.expectMessage("actually forbidden");
-
DatasetServiceImpl dataService =
new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
-
- try {
- dataService.insertAll(
- ref,
- rows,
- null,
- BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()),
- new MockSleeper(),
- InsertRetryPolicy.alwaysRetry(),
- null,
- null,
- false,
- false);
- fail();
- } catch (RuntimeException e) {
- verify(response, times(1)).getStatusCode();
- verify(response, times(1)).getContent();
- verify(response, times(1)).getContentType();
- throw e.getCause();
- }
+ dataService.insertAll(
+ ref,
+ rows,
+ null,
+ BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()),
+ new MockSleeper(),
+ InsertRetryPolicy.alwaysRetry(),
+ null,
+ null,
+ false,
+ false);
+ verify(response, times(2)).getStatusCode();
+ verify(response, times(2)).getContent();
+ verify(response, times(2)).getContentType();
+ expectedLogs.verifyInfo("BigQuery insertAll error, retrying:");
}
/**
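
In effect, the change above retries any IOException thrown by insert.execute() with backoff, logging the message extracted via ApiErrorExtractor.INSTANCE.getErrorMessage(e), instead of retrying only when rateLimited(e) is true. A minimal sketch of that retry-with-backoff pattern, assuming a bounded BackOff (illustrative only; the class and method names below are hypothetical, and this is not the actual BigQueryServicesImpl code):

// Illustrative sketch only -- not the actual Beam code. The names RetrySketch and
// executeWithBackoff are hypothetical. Retries the callable on any IOException,
// sleeping per the BackOff, and rethrows once the BackOff returns STOP.
import com.google.api.client.util.BackOff;
import com.google.api.client.util.Sleeper;
import java.io.IOException;
import java.util.concurrent.Callable;

class RetrySketch {
  static <T> T executeWithBackoff(Callable<T> call, BackOff backoff, Sleeper sleeper)
      throws Exception {
    while (true) {
      try {
        return call.call();
      } catch (IOException e) {
        long pauseMillis = backoff.nextBackOffMillis();
        if (pauseMillis == BackOff.STOP) {
          throw e; // retry budget spent; surface the last error
        }
        try {
          sleeper.sleep(pauseMillis);
        } catch (InterruptedException interrupted) {
          throw new IOException("Interrupted while waiting before retrying");
        }
      }
    }
  }
}

In the sketch, a persistent error is rethrown once the backoff returns BackOff.STOP, so non-retryable failures still surface after the retry budget is exhausted.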
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 173882)
Time Spent: 4h (was: 3h 50m)
> BigQueryIO doesn't handle quotaExceeded errors properly
> -------------------------------------------------------
>
> Key: BEAM-5514
> URL: https://issues.apache.org/jira/browse/BEAM-5514
> Project: Beam
> Issue Type: Bug
> Components: io-java-gcp
> Reporter: Kevin Peterson
> Assignee: Heejong Lee
> Priority: Major
> Time Spent: 4h
> Remaining Estimate: 0h
>
> When a pipeline exceeds a streaming quota for BigQuery insertAll requests,
> BigQuery returns a 403 with reason "quotaExceeded".
> The current implementation of BigQueryIO does not consider this to be a
> rate-limited exception, and therefore does not perform exponential backoff
> properly, leading to repeated calls to BQ.
> The actual error is in the
> [ApiErrorExtractor|https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/util/src/main/java/com/google/cloud/hadoop/util/ApiErrorExtractor.java#L263]
> class, which is called from
> [BigQueryServicesImpl|https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java#L739]
> to determine how to retry the failure.
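
For context, the "quotaExceeded" reason arrives in the JSON error payload of the 403 response. A rough sketch of inspecting an insertAll IOException for that reason, using standard google-http-client types (the class and helper names below are hypothetical; the merged fix does not do this check and instead retries all insertAll IOExceptions with backoff, as the diff above shows):

// Rough sketch (not part of the patch): checks whether an IOException is a 403
// whose JSON error payload carries the reason "quotaExceeded". The names
// QuotaErrorCheck and isQuotaExceeded are hypothetical.
import com.google.api.client.googleapis.json.GoogleJsonError;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import java.io.IOException;

class QuotaErrorCheck {
  static boolean isQuotaExceeded(IOException e) {
    if (!(e instanceof GoogleJsonResponseException)) {
      return false;
    }
    GoogleJsonError details = ((GoogleJsonResponseException) e).getDetails();
    return details != null
        && details.getCode() == 403
        && details.getErrors() != null
        && details.getErrors().stream().anyMatch(err -> "quotaExceeded".equals(err.getReason()));
  }
}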
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)