This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3838528fde3 [#21368] Clean-up and use the FixedExecutorProvider
(#24952)
3838528fde3 is described below
commit 3838528fde37262ecadf642791e3e0f3a57d7b25
Author: Luke Cwik <[email protected]>
AuthorDate: Mon Jan 9 15:29:46 2023 -0800
[#21368] Clean-up and use the FixedExecutorProvider (#24952)
This is a minor clean-up for https://github.com/apache/beam/pull/24950 that
re-uses the existing FixedExecutorProvider implementation from gax-java.
---
.../sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 30 +++-------------------
1 file changed, 4 insertions(+), 26 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index f56239aedd6..289087c1d46 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -29,8 +29,8 @@ import com.google.api.client.util.BackOffUtils;
import com.google.api.client.util.ExponentialBackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.core.ApiFuture;
-import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.api.gax.rpc.HeaderProvider;
@@ -106,7 +106,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -1494,36 +1493,15 @@ class BigQueryServicesImpl implements BigQueryServices {
return BigQueryWriteClient.create(
BigQueryWriteSettings.newBuilder()
.setCredentialsProvider(() ->
options.as(GcpOptions.class).getGcpCredential())
- .setBackgroundExecutorProvider(new
OptionsExecutionProvider(options))
+ .setBackgroundExecutorProvider(
+ FixedExecutorProvider.create(
+
options.as(ExecutorOptions.class).getScheduledExecutorService()))
.build());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
- /**
- * OptionsExecutionProvider is a utility class used to wrap the
Pipeline-wide {@link
- * ScheduledExecutorService} into a supplier for the {@link
BigQueryWriteClient}.
- */
- private static class OptionsExecutionProvider implements ExecutorProvider {
-
- private final BigQueryOptions options;
-
- public OptionsExecutionProvider(BigQueryOptions options) {
- this.options = options;
- }
-
- @Override
- public boolean shouldAutoClose() {
- return false;
- }
-
- @Override
- public ScheduledExecutorService getExecutor() {
- return options.as(ExecutorOptions.class).getScheduledExecutorService();
- }
- }
-
public static CustomHttpErrors createBigQueryClientCustomErrors() {
CustomHttpErrors.Builder builder = new CustomHttpErrors.Builder();
// 403 errors, to list tables, matching this URL: