This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 8e2431c0e55 Update BQIO to a single scheduled executor service reduce threads (#23234) 8e2431c0e55 is described below commit 8e2431c0e55237af4bd00a9786e4c150e20d4e14 Author: johnjcasey <95318300+johnjca...@users.noreply.github.com> AuthorDate: Fri Oct 14 20:31:55 2022 -0400 Update BQIO to a single scheduled executor service reduce threads (#23234) * Update to a single scheduled executor service to mitigate thread propagation * run spotless * run spotless * Provide separate scheduled executor * try scheduled thread pool again * Attempt to mimic Bigquery default scheduled thread pool * run spotless * update docs * Refactor Scheduled executor service to its own options file * clean up comments, don't accidentally create two scheduled executor services * clean up comments * run spotless * Configure scheduled executor to spin up and down core threads to mimic a dynamic thread pool * Run spotless * add . for checkstyle * Update ExecutorOptions to use @lukecwik's unbounded scheduled executor * revert change to runners/google-cloud-dataflow-java/build.gradle * Update comments on options files * Update sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java Co-authored-by: Lukasz Cwik <lc...@google.com> --- .../apache/beam/sdk/options/ExecutorOptions.java | 59 ++++++++++++++++++++++ .../sdk/extensions/gcp/options/GcsOptions.java | 29 +++++------ .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 27 ++++++++++ 3 files changed, 98 insertions(+), 17 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java new file mode 100644 index 00000000000..2037d217422 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.options; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import java.util.concurrent.ScheduledExecutorService; +import org.apache.beam.sdk.util.UnboundedScheduledExecutorService; + +/** + * Options for configuring the {@link ScheduledExecutorService} used throughout the Java runtime. + */ +public interface ExecutorOptions extends PipelineOptions { + + /** + * The {@link ScheduledExecutorService} instance to use to create threads, can be overridden to + * specify a {@link ScheduledExecutorService} that is compatible with the user's environment. If + * unset, the default is to create an {@link UnboundedScheduledExecutorService}. + */ + @JsonIgnore + @Description( + "The ScheduledExecutorService instance to use to create threads, can be overridden to specify " + + "a ScheduledExecutorService that is compatible with the user's environment. 
If unset, " + + "the default is to create an UnboundedScheduledExecutorService.") + @Default.InstanceFactory(ScheduledExecutorServiceFactory.class) + @Hidden + ScheduledExecutorService getScheduledExecutorService(); + + void setScheduledExecutorService(ScheduledExecutorService value); + + /** Returns the default {@link ScheduledExecutorService} to use within the Apache Beam SDK. */ + class ScheduledExecutorServiceFactory implements DefaultValueFactory<ScheduledExecutorService> { + @Override + public ScheduledExecutorService create(PipelineOptions options) { + /* The SDK requires an unbounded thread pool because a step may create X writers + * each requiring their own thread to perform the writes otherwise a writer may + * block causing deadlock for the step because the writers buffer is full. + * Also, the MapTaskExecutor launches the steps in reverse order and completes + * them in forward order thus requiring enough threads so that each step's writers + * can be active. + */ + return new UnboundedScheduledExecutorService(); + } + } +} diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java index 0b14b244da5..fea7be7f5c7 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java @@ -29,10 +29,10 @@ import org.apache.beam.sdk.options.ApplicationNameOptions; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.ExecutorOptions; import org.apache.beam.sdk.options.Hidden; import org.apache.beam.sdk.options.PipelineOptions; import 
org.apache.beam.sdk.util.InstanceBuilder; -import org.apache.beam.sdk.util.UnboundedScheduledExecutorService; import org.checkerframework.checker.nullness.qual.Nullable; /** Options used to configure Google Cloud Storage. */ @@ -48,20 +48,22 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline /** * The ExecutorService instance to use to create threads, can be overridden to specify an - * ExecutorService that is compatible with the user's environment. If unset, the default is to - * create an ExecutorService with an unbounded number of threads; this is compatible with Google - * AppEngine. + * ExecutorService that is compatible with the user's environment. If unset, the default is to use + * {@link ExecutorOptions#getScheduledExecutorService()}. + * + * @deprecated use {@link ExecutorOptions#getScheduledExecutorService()} instead */ @JsonIgnore - @Description( - "The ExecutorService instance to use to create multiple threads. Can be overridden " - + "to specify an ExecutorService that is compatible with the user's environment. If unset, " - + "the default is to create an ExecutorService with an unbounded number of threads; this " - + "is compatible with Google AppEngine.") @Default.InstanceFactory(ExecutorServiceFactory.class) @Hidden + @Deprecated ExecutorService getExecutorService(); + /** + * @deprecated use {@link ExecutorOptions#setScheduledExecutorService} instead. If set, it may + * result in multiple ExecutorServices, and therefore thread pools, in the runtime. + */ + @Deprecated void setExecutorService(ExecutorService value); /** GCS endpoint to use. If unspecified, uses the default endpoint. 
*/ @@ -132,14 +134,7 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline class ExecutorServiceFactory implements DefaultValueFactory<ExecutorService> { @Override public ExecutorService create(PipelineOptions options) { - /* The SDK requires an unbounded thread pool because a step may create X writers - * each requiring their own thread to perform the writes otherwise a writer may - * block causing deadlock for the step because the writers buffer is full. - * Also, the MapTaskExecutor launches the steps in reverse order and completes - * them in forward order thus requiring enough threads so that each step's writers - * can be active. - */ - return new UnboundedScheduledExecutorService(); + return options.as(ExecutorOptions.class).getScheduledExecutorService(); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 9624f3ddb2a..284fb80d70d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -29,6 +29,7 @@ import com.google.api.client.util.BackOffUtils; import com.google.api.client.util.ExponentialBackOff; import com.google.api.client.util.Sleeper; import com.google.api.core.ApiFuture; +import com.google.api.gax.core.ExecutorProvider; import com.google.api.gax.core.FixedCredentialsProvider; import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.FixedHeaderProvider; @@ -105,6 +106,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; 
import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -120,6 +122,7 @@ import org.apache.beam.sdk.extensions.gcp.util.RetryHttpRequestInitializer; import org.apache.beam.sdk.extensions.gcp.util.Transport; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.options.ExecutorOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.util.FluentBackoff; @@ -1483,12 +1486,36 @@ class BigQueryServicesImpl implements BigQueryServices { return BigQueryWriteClient.create( BigQueryWriteSettings.newBuilder() .setCredentialsProvider(() -> options.as(GcpOptions.class).getGcpCredential()) + .setBackgroundExecutorProvider(new OptionsExecutionProvider(options)) .build()); } catch (Exception e) { throw new RuntimeException(e); } } + /** + * OptionsExecutionProvider is a utility class used to wrap the Pipeline-wide {@link + * ScheduledExecutorService} into a supplier for the {@link BigQueryWriteClient}. + */ + private static class OptionsExecutionProvider implements ExecutorProvider { + + private final BigQueryOptions options; + + public OptionsExecutionProvider(BigQueryOptions options) { + this.options = options; + } + + @Override + public boolean shouldAutoClose() { + return false; + } + + @Override + public ScheduledExecutorService getExecutor() { + return options.as(ExecutorOptions.class).getScheduledExecutorService(); + } + } + public static CustomHttpErrors createBigQueryClientCustomErrors() { CustomHttpErrors.Builder builder = new CustomHttpErrors.Builder(); // 403 errors, to list tables, matching this URL: