This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3314fe9 [BEAM-11994] Instantiate a new ServiceCallMetric before each
request to avoid concurrent modification exception
new 75ea7ea Merge pull request #15420 from [BEAM-11994] Instantiate a new
ServiceCallMetric before each request to avoid concurrent modification exception
3314fe9 is described below
commit 3314fe9f6547537a1f69d7e5e3a6d827c3f7fcc9
Author: Alex Amato <[email protected]>
AuthorDate: Mon Aug 30 10:30:14 2021 -0700
[BEAM-11994] Instantiate a new ServiceCallMetric before each request to
avoid concurrent modification exception
---
.../java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 951f1fd..afa5ae7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -843,8 +843,6 @@ class BigQueryServicesImpl implements BigQueryServices {
idsToPublish = insertIdList;
}
- ServiceCallMetric serviceCallMetric = BigQueryUtils.writeCallMetric(ref);
-
while (true) {
List<FailsafeValueInSingleWindow<TableRow, TableRow>> retryRows = new ArrayList<>();
List<String> retryIds = (idsToPublish != null) ? new ArrayList<>() : null;
@@ -898,6 +896,7 @@ class BigQueryServicesImpl implements BigQueryServices {
BackOffAdapter.toGcpBackOff(rateLimitBackoffFactory.backoff());
long totalBackoffMillis = 0L;
while (true) {
+ ServiceCallMetric serviceCallMetric = BigQueryUtils.writeCallMetric(ref);
try {
List<TableDataInsertAllResponse.InsertErrors> response =
insert.execute().getInsertErrors();