[
https://issues.apache.org/jira/browse/BEAM-6183?focusedWorklogId=174313&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-174313
]
ASF GitHub Bot logged work on BEAM-6183:
----------------------------------------
Author: ASF GitHub Bot
Created on: 11/Dec/18 22:10
Start Date: 11/Dec/18 22:10
Worklog Time Spent: 10m
Work Description: ihji closed pull request #7212: [BEAM-6183] BigQuery
insertAll API request rate is not properly controlled
URL: https://github.com/apache/beam/pull/7212
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork once
it has been merged, the full diff is reproduced below for the sake of
provenance:
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 85df41adc63f..70ad5da92fba 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;
+import com.codahale.metrics.Meter;
import com.google.api.services.bigquery.model.Dataset;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfigurationExtract;
@@ -28,11 +29,15 @@
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
+import com.google.common.util.concurrent.RateLimiter;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** An interface for real, mock, or fake implementations of Cloud BigQuery
services. */
public interface BigQueryServices extends Serializable {
@@ -142,6 +147,7 @@ void createDataset(
InsertRetryPolicy retryPolicy,
List<ValueInSingleWindow<T>> failedInserts,
ErrorContainer<T> errorContainer,
+ RateController rateController,
boolean skipInvalidRows,
boolean ignoreUnknownValues)
throws IOException, InterruptedException;
@@ -150,4 +156,55 @@ void createDataset(
Table patchTableDescription(TableReference tableReference, @Nullable
String tableDescription)
throws IOException, InterruptedException;
}
+
+ /**
+ * A class for controlling insertAll submission rate.
+ *
+ * <p>To avoid excessive rate limit error messages from BigQuery API, this
class limits the number
+ * of rows per second that each worker can submit to BigQuery insertAll API.
The threshold is
+ * dynamically changing every minute based on how many rate limit errors the
worker received for
+ * the previous one minute interval. The threshold will be increased by one
percent if there was
+ * no such error and decreased by five percent if existed.
+ */
+ class RateController {
+ private static final Logger LOG =
LoggerFactory.getLogger(RateController.class);
+
+ private final Meter rateMeter;
+ private final RateLimiter rateLimiter;
+ private Instant lastAdjusted;
+
+ private static final long UPDATE_INTERVAL_MILLIS = 60000;
+ private static final double RATE_INCREASE_RATIO = 1.01;
+ private static final double RATE_DECREASE_RATIO = 0.95;
+ private static final double RATE_MAX = 100000.0;
+
+ RateController() {
+ rateMeter = new Meter();
+ rateLimiter =
+ RateLimiter.create(RATE_MAX / 50); // max rate divided by the
default number of shards.
+ lastAdjusted = Instant.now();
+ }
+
+ void mark() {
+ rateMeter.mark();
+ }
+
+ double acquire(int permits) {
+ return rateLimiter.acquire(permits);
+ }
+
+ void adjust() {
+ if (Instant.now().getMillis() - lastAdjusted.getMillis() >
UPDATE_INTERVAL_MILLIS) {
+ double rate = rateLimiter.getRate();
+ if (rateMeter.getOneMinuteRate() > 0.01) {
+ rateLimiter.setRate(Math.max(Math.floor(rate * RATE_DECREASE_RATIO),
1.0));
+ } else {
+ rateLimiter.setRate(Math.min(Math.ceil(rate * RATE_INCREASE_RATIO),
RATE_MAX));
+ }
+ LOG.info(
+ String.format("rate limit adjusted: %.1f -> %.1f rows", rate,
rateLimiter.getRate()));
+ lastAdjusted = Instant.now();
+ }
+ }
+ }
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index cbdb44f8ade3..815f1c398128 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -55,11 +55,11 @@
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer;
-import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.BackOffAdapter;
@@ -672,12 +672,13 @@ public void deleteDataset(String projectId, String
datasetId)
InsertRetryPolicy retryPolicy,
List<ValueInSingleWindow<T>> failedInserts,
ErrorContainer<T> errorContainer,
+ RateController rateController,
boolean skipInvalidRows,
boolean ignoreUnkownValues)
throws IOException, InterruptedException {
checkNotNull(ref, "ref");
if (executor == null) {
- this.executor = options.as(GcsOptions.class).getExecutorService();
+ this.executor = Executors.newSingleThreadExecutor();
}
if (insertIdList != null && rowList.size() != insertIdList.size()) {
throw new AssertionError(
@@ -734,19 +735,29 @@ public void deleteDataset(String projectId, String
datasetId)
BackOffAdapter.toGcpBackOff(RATE_LIMIT_BACKOFF_FACTORY.backoff());
while (true) {
try {
+ if (rateController != null) {
+ rateController.acquire(content.getRows().size());
+ }
return insert.execute().getInsertErrors();
} catch (IOException e) {
- if (new ApiErrorExtractor().rateLimited(e)) {
+ if (ApiErrorExtractor.INSTANCE.rateLimited(e)) {
LOG.info("BigQuery insertAll exceeded rate limit,
retrying");
- try {
- sleeper.sleep(backoff1.nextBackOffMillis());
- } catch (InterruptedException interrupted) {
- throw new IOException(
- "Interrupted while waiting before retrying
insertAll");
- }
+ } else if (ApiErrorExtractor.INSTANCE
+ .getErrorMessage(e)
+ .startsWith("Quota exceeded")) {
+ LOG.info("BigQuery insertAll quota exceeded,
retrying");
} else {
throw e;
}
+ if (rateController != null) {
+ rateController.mark();
+ }
+ try {
+ sleeper.sleep(backoff1.nextBackOffMillis());
+ } catch (InterruptedException interrupted) {
+ throw new IOException(
+ "Interrupted while waiting before retrying
insertAll");
+ }
}
}
}));
@@ -823,6 +834,7 @@ public void deleteDataset(String projectId, String
datasetId)
InsertRetryPolicy retryPolicy,
List<ValueInSingleWindow<T>> failedInserts,
ErrorContainer<T> errorContainer,
+ RateController rateController,
boolean skipInvalidRows,
boolean ignoreUnknownValues)
throws IOException, InterruptedException {
@@ -835,6 +847,7 @@ public void deleteDataset(String projectId, String
datasetId)
retryPolicy,
failedInserts,
errorContainer,
+ rateController,
skipInvalidRows,
ignoreUnknownValues);
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
index c87873bf2f58..63171b23b71a 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
@@ -45,6 +45,7 @@
private final ErrorContainer<ErrorT> errorContainer;
private final boolean skipInvalidRows;
private final boolean ignoreUnknownValues;
+ private BigQueryServices.RateController rateController;
/** JsonTableRows to accumulate BigQuery rows in order to batch writes. */
private transient Map<String, List<ValueInSingleWindow<TableRow>>> tableRows;
@@ -75,6 +76,9 @@
public void startBundle() {
tableRows = new HashMap<>();
uniqueIdsForTableRows = new HashMap<>();
+ if (rateController == null) {
+ rateController = new BigQueryServices.RateController();
+ }
}
/** Accumulates the input into JsonTableRows and uniqueIdsForTableRows. */
@@ -108,6 +112,7 @@ public void finishBundle(FinishBundleContext context)
throws Exception {
}
tableRows.clear();
uniqueIdsForTableRows.clear();
+ rateController.adjust();
for (ValueInSingleWindow<ErrorT> row : failedInserts) {
context.output(failedOutputTag, row.getValue(), row.getTimestamp(),
row.getWindow());
@@ -134,6 +139,7 @@ private void flushRows(
retryPolicy,
failedInserts,
errorContainer,
+ rateController,
skipInvalidRows,
ignoreUnknownValues);
byteCounter.inc(totalBytes);
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index 9f20e5087e92..868ebc9543b7 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -516,6 +516,7 @@ public void testInsertRetry() throws Exception {
InsertRetryPolicy.alwaysRetry(),
null,
null,
+ null,
false,
false);
verify(response, times(2)).getStatusCode();
@@ -565,6 +566,7 @@ public void testInsertRetrySelectRows() throws Exception {
InsertRetryPolicy.alwaysRetry(),
null,
null,
+ null,
false,
false);
verify(response, times(2)).getStatusCode();
@@ -610,6 +612,7 @@ public void testInsertFailsGracefully() throws Exception {
InsertRetryPolicy.alwaysRetry(),
null,
null,
+ null,
false,
false);
fail();
@@ -658,6 +661,7 @@ public void testInsertDoesNotRetry() throws Throwable {
InsertRetryPolicy.alwaysRetry(),
null,
null,
+ null,
false,
false);
fail();
@@ -729,6 +733,7 @@ public void testInsertRetryPolicy() throws
InterruptedException, IOException {
InsertRetryPolicy.retryTransientErrors(),
failedInserts,
ErrorContainer.TABLE_ROW_ERROR_CONTAINER,
+ null,
false,
false);
assertEquals(1, failedInserts.size());
@@ -769,6 +774,7 @@ public void
testSkipInvalidRowsIgnoreUnknownValuesStreaming()
InsertRetryPolicy.neverRetry(),
Lists.newArrayList(),
ErrorContainer.TABLE_ROW_ERROR_CONTAINER,
+ null,
false,
false);
@@ -788,6 +794,7 @@ public void
testSkipInvalidRowsIgnoreUnknownValuesStreaming()
InsertRetryPolicy.neverRetry(),
Lists.newArrayList(),
ErrorContainer.TABLE_ROW_ERROR_CONTAINER,
+ null,
true,
true);
@@ -970,6 +977,7 @@ public void testSimpleErrorRetrieval() throws
InterruptedException, IOException
InsertRetryPolicy.neverRetry(),
failedInserts,
ErrorContainer.TABLE_ROW_ERROR_CONTAINER,
+ null,
false,
false);
@@ -1024,6 +1032,7 @@ public void testExtendedErrorRetrieval() throws
InterruptedException, IOExceptio
InsertRetryPolicy.neverRetry(),
failedInserts,
ErrorContainer.BIG_QUERY_INSERT_ERROR_ERROR_CONTAINER,
+ null,
false,
false);
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
index 12adc2e6b44d..1d5eabd17de6 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
@@ -205,7 +205,7 @@ public void testInsertAll() throws Exception {
try {
totalBytes =
datasetService.insertAll(
- ref, rows, ids, InsertRetryPolicy.alwaysRetry(), null, null,
false, false);
+ ref, rows, ids, InsertRetryPolicy.alwaysRetry(), null, null,
null, false, false);
} finally {
verifyInsertAll(5);
// Each of the 25 rows is 23 bytes: "{f=[{v=foo}, {v=1234}]}"
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
index d305450bd800..5719faeb1eb6 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
@@ -204,7 +204,15 @@ public long insertAll(
PaneInfo.ON_TIME_AND_ONLY_FIRING));
}
return insertAll(
- ref, windowedRows, insertIdList, InsertRetryPolicy.alwaysRetry(),
null, null, false, false);
+ ref,
+ windowedRows,
+ insertIdList,
+ InsertRetryPolicy.alwaysRetry(),
+ null,
+ null,
+ null,
+ false,
+ false);
}
@Override
@@ -215,6 +223,7 @@ public long insertAll(
InsertRetryPolicy retryPolicy,
List<ValueInSingleWindow<T>> failedInserts,
ErrorContainer<T> errorContainer,
+ BigQueryServices.RateController rateController,
boolean skipInvalidRows,
boolean ignoreUnknownValues)
throws IOException, InterruptedException {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 174313)
Time Spent: 1h (was: 50m)
> BigQuery insertAll API request rate is not properly controlled
> --------------------------------------------------------------
>
> Key: BEAM-6183
> URL: https://issues.apache.org/jira/browse/BEAM-6183
> Project: Beam
> Issue Type: Improvement
> Components: io-java-gcp
> Affects Versions: 2.8.0
> Reporter: Heejong Lee
> Assignee: Heejong Lee
> Priority: Major
> Fix For: 2.10.0
>
> Time Spent: 1h
> Remaining Estimate: 0h
>
> BigQuery insertAll API request rate is not properly controlled so it produces
> too many rate limit exceeded error messages in the worker log.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)