This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8e2431c0e55 Update BQIO to a single scheduled executor service reduce
threads (#23234)
8e2431c0e55 is described below
commit 8e2431c0e55237af4bd00a9786e4c150e20d4e14
Author: johnjcasey <[email protected]>
AuthorDate: Fri Oct 14 20:31:55 2022 -0400
Update BQIO to a single scheduled executor service reduce threads (#23234)
* Update to a single scheduled executor service to mitigate thread
propagation
* run spotless
* run spotless
* Provide separate scheduled executor
* try scheduled thread pool again
* Attempt to mimic Bigquery default scheduled thread pool
* run spotless
* update docs
* Refactor Scheduled executor service to its own options file
* clean up comments, don't accidentally create two scheduled executor
services
* clean up comments
* run spotless
* Configure scheduled executor to spin up and down core threads to mimic a
dynamic thread pool
* Run spotless
* add . for checkstyle
* Update ExecutorOptions to use @lukecwik's unbounded scheduled executor
* revert change to runners/google-cloud-dataflow-java/build.gradle
* Update comments on options files
* Update
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
Co-authored-by: Lukasz Cwik <[email protected]>
---
.../apache/beam/sdk/options/ExecutorOptions.java | 59 ++++++++++++++++++++++
.../sdk/extensions/gcp/options/GcsOptions.java | 29 +++++------
.../sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 27 ++++++++++
3 files changed, 98 insertions(+), 17 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java
new file mode 100644
index 00000000000..2037d217422
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.options;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.beam.sdk.util.UnboundedScheduledExecutorService;
+
+/**
+ * Options for configuring the {@link ScheduledExecutorService} used
throughout the Java runtime.
+ */
+public interface ExecutorOptions extends PipelineOptions {
+
+ /**
+ * The {@link ScheduledExecutorService} instance to use to create threads,
can be overridden to
+ * specify a {@link ScheduledExecutorService} that is compatible with the
user's environment. If
+ * unset, the default is to create an {@link
UnboundedScheduledExecutorService}.
+ */
+ @JsonIgnore
+ @Description(
+ "The ScheduledExecutorService instance to use to create threads, can be
overridden to specify "
+ + "a ScheduledExecutorService that is compatible with the user's
environment. If unset, "
+ + "the default is to create an UnboundedScheduledExecutorService.")
+ @Default.InstanceFactory(ScheduledExecutorServiceFactory.class)
+ @Hidden
+ ScheduledExecutorService getScheduledExecutorService();
+
+ void setScheduledExecutorService(ScheduledExecutorService value);
+
+ /** Returns the default {@link ScheduledExecutorService} to use within the
Apache Beam SDK. */
+ class ScheduledExecutorServiceFactory implements
DefaultValueFactory<ScheduledExecutorService> {
+ @Override
+ public ScheduledExecutorService create(PipelineOptions options) {
+ /* The SDK requires an unbounded thread pool because a step may create X
writers
+ * each requiring their own thread to perform the writes otherwise a
writer may
+ * block causing deadlock for the step because the writers buffer is
full.
+ * Also, the MapTaskExecutor launches the steps in reverse order and
completes
+ * them in forward order thus requiring enough threads so that each
step's writers
+ * can be active.
+ */
+ return new UnboundedScheduledExecutorService();
+ }
+ }
+}
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
index 0b14b244da5..fea7be7f5c7 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
@@ -29,10 +29,10 @@ import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.ExecutorOptions;
import org.apache.beam.sdk.options.Hidden;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.InstanceBuilder;
-import org.apache.beam.sdk.util.UnboundedScheduledExecutorService;
import org.checkerframework.checker.nullness.qual.Nullable;
/** Options used to configure Google Cloud Storage. */
@@ -48,20 +48,22 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline
/**
* The ExecutorService instance to use to create threads, can be overridden
to specify an
- * ExecutorService that is compatible with the user's environment. If unset,
the default is to
- * create an ExecutorService with an unbounded number of threads; this is
compatible with Google
- * AppEngine.
+ * ExecutorService that is compatible with the user's environment. If unset,
the default is to use
+ * {@link ExecutorOptions#getScheduledExecutorService()}.
+ *
+ * @deprecated use {@link ExecutorOptions#getScheduledExecutorService()}
instead
*/
@JsonIgnore
- @Description(
- "The ExecutorService instance to use to create multiple threads. Can be
overridden "
- + "to specify an ExecutorService that is compatible with the user's
environment. If unset, "
- + "the default is to create an ExecutorService with an unbounded
number of threads; this "
- + "is compatible with Google AppEngine.")
@Default.InstanceFactory(ExecutorServiceFactory.class)
@Hidden
+ @Deprecated
ExecutorService getExecutorService();
+ /**
+ * @deprecated use {@link ExecutorOptions#setScheduledExecutorService}
instead. If set, it may
+ * result in multiple ExecutorServices, and therefore thread pools, in
the runtime.
+ */
+ @Deprecated
void setExecutorService(ExecutorService value);
/** GCS endpoint to use. If unspecified, uses the default endpoint. */
@@ -132,14 +134,7 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline
class ExecutorServiceFactory implements DefaultValueFactory<ExecutorService>
{
@Override
public ExecutorService create(PipelineOptions options) {
- /* The SDK requires an unbounded thread pool because a step may create X
writers
- * each requiring their own thread to perform the writes otherwise a
writer may
- * block causing deadlock for the step because the writers buffer is
full.
- * Also, the MapTaskExecutor launches the steps in reverse order and
completes
- * them in forward order thus requiring enough threads so that each
step's writers
- * can be active.
- */
- return new UnboundedScheduledExecutorService();
+ return options.as(ExecutorOptions.class).getScheduledExecutorService();
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 9624f3ddb2a..284fb80d70d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -29,6 +29,7 @@ import com.google.api.client.util.BackOffUtils;
import com.google.api.client.util.ExponentialBackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.core.ApiFuture;
+import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.FixedHeaderProvider;
@@ -105,6 +106,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -120,6 +122,7 @@ import org.apache.beam.sdk.extensions.gcp.util.RetryHttpRequestInitializer;
import org.apache.beam.sdk.extensions.gcp.util.Transport;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.ExecutorOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.FluentBackoff;
@@ -1483,12 +1486,36 @@ class BigQueryServicesImpl implements BigQueryServices {
return BigQueryWriteClient.create(
BigQueryWriteSettings.newBuilder()
.setCredentialsProvider(() ->
options.as(GcpOptions.class).getGcpCredential())
+ .setBackgroundExecutorProvider(new
OptionsExecutionProvider(options))
.build());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
+ /**
+ * OptionsExecutionProvider is a utility class used to wrap the
Pipeline-wide {@link
+ * ScheduledExecutorService} into a supplier for the {@link
BigQueryWriteClient}.
+ */
+ private static class OptionsExecutionProvider implements ExecutorProvider {
+
+ private final BigQueryOptions options;
+
+ public OptionsExecutionProvider(BigQueryOptions options) {
+ this.options = options;
+ }
+
+ @Override
+ public boolean shouldAutoClose() {
+ return false;
+ }
+
+ @Override
+ public ScheduledExecutorService getExecutor() {
+ return options.as(ExecutorOptions.class).getScheduledExecutorService();
+ }
+ }
+
public static CustomHttpErrors createBigQueryClientCustomErrors() {
CustomHttpErrors.Builder builder = new CustomHttpErrors.Builder();
// 403 errors, to list tables, matching this URL: