lukecwik commented on code in PR #23234:
URL: https://github.com/apache/beam/pull/23234#discussion_r983792955
##########
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java:
##########
@@ -157,6 +178,30 @@ public ExecutorService create(PipelineOptions options) {
}
}
+ /**
+ * Returns the default {@link ExecutorService} to use within the Apache Beam SDK. The {@link
+ * ExecutorService} is compatible with AppEngine.
+ */
+ class ScheduledExecutorServiceFactory implements DefaultValueFactory<ScheduledExecutorService> {
+ @SuppressWarnings("deprecation") // IS_APP_ENGINE is deprecated for internal use only.
+ @Override
+ public ScheduledExecutorService create(PipelineOptions options) {
+ ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
+ threadFactoryBuilder.setThreadFactory(MoreExecutors.platformThreadFactory());
+ threadFactoryBuilder.setDaemon(true);
+ /* The SDK requires an unbounded thread pool because a step may create X writers
+ * each requiring their own thread to perform the writes otherwise a writer may
+ * block causing deadlock for the step because the writers buffer is full.
+ * Also, the MapTaskExecutor launches the steps in reverse order and completes
+ * them in forward order thus requiring enough threads so that each step's writers
+ * can be active.
+ */
+
+ return Executors.newScheduledThreadPool(
+ Math.max(4, Runtime.getRuntime().availableProcessors()), threadFactoryBuilder.build());
Review Comment:
What was your reasoning around choosing this as the minimum?
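For context on the question above, here is a minimal sketch of what that sizing expression yields, using only JDK classes. The class and method names (`PoolSizingSketch`, `corePoolSize`, `newPool`) are hypothetical and not part of the PR, and a plain daemon thread factory stands in for the Guava `ThreadFactoryBuilder` used in the diff. One point worth keeping in mind: per the JDK documentation, a `ScheduledThreadPoolExecutor` behaves as a fixed-size pool of `corePoolSize` threads with an unbounded queue, so the value passed here is effectively the thread cap as well as the minimum.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;

public class PoolSizingSketch {
  /** Mirrors the sizing expression from the diff for a given processor count. */
  public static int corePoolSize(int availableProcessors) {
    return Math.max(4, availableProcessors);
  }

  /** Builds a scheduled pool with daemon threads (assumption: JDK-only stand-in). */
  public static ScheduledThreadPoolExecutor newPool(int availableProcessors) {
    ThreadFactory daemonFactory =
        runnable -> {
          Thread thread = Executors.defaultThreadFactory().newThread(runnable);
          thread.setDaemon(true);
          return thread;
        };
    return new ScheduledThreadPoolExecutor(corePoolSize(availableProcessors), daemonFactory);
  }

  public static void main(String[] args) {
    ScheduledThreadPoolExecutor pool = newPool(Runtime.getRuntime().availableProcessors());
    // The core pool size is fixed at construction; it does not grow under load.
    System.out.println("core pool size: " + pool.getCorePoolSize());
    pool.shutdownNow();
  }
}
```

On a machine with fewer than four processors this gives four threads; otherwise it gives one thread per processor.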
##########
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java:
##########
@@ -68,6 +70,24 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline
Review Comment:
Update the ExecutorServiceFactory class to return the scheduled executor
service by default.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java:
##########
@@ -1482,12 +1484,32 @@ private static BigQueryWriteClient newBigQueryWriteClient(BigQueryOptions option
return BigQueryWriteClient.create(
BigQueryWriteSettings.newBuilder()
.setCredentialsProvider(() -> options.as(GcpOptions.class).getGcpCredential())
+ .setBackgroundExecutorProvider(new OptionsExecutionProvider(options))
.build());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
+ private static class OptionsExecutionProvider implements ExecutorProvider {
Review Comment:
Please add a class comment.
##########
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java:
##########
@@ -68,6 +70,24 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline
void setExecutorService(ExecutorService value);
Review Comment:
Mark getExecutorService and setExecutorService as deprecated, telling people
to use the scheduled executor service variant instead.
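As an illustration of the deprecation pattern being requested, here is a minimal sketch. `ExampleOptions` is a hypothetical stand-in interface, not Beam's actual `GcsOptions`, and the Javadoc wording is only a suggestion; the `isDeprecated` helper exists solely to show that the annotation is visible at runtime.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

public class DeprecationSketch {
  /** Hypothetical stand-in for the options interface; not Beam's GcsOptions. */
  public interface ExampleOptions {
    /** @deprecated use {@link #getScheduledExecutorService()} instead. */
    @Deprecated
    ExecutorService getExecutorService();

    /** @deprecated use {@link #setScheduledExecutorService} instead. */
    @Deprecated
    void setExecutorService(ExecutorService value);

    ScheduledExecutorService getScheduledExecutorService();

    void setScheduledExecutorService(ScheduledExecutorService value);
  }

  /** Reports whether the named method on ExampleOptions carries @Deprecated. */
  public static boolean isDeprecated(String methodName, Class<?>... parameterTypes) {
    try {
      return ExampleOptions.class
          .getMethod(methodName, parameterTypes)
          .isAnnotationPresent(Deprecated.class);
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException("No such method: " + methodName, e);
    }
  }

  public static void main(String[] args) {
    System.out.println(isDeprecated("getExecutorService"));
    System.out.println(isDeprecated("getScheduledExecutorService"));
  }
}
```

Pairing the `@Deprecated` annotation with a `@deprecated` Javadoc tag gives callers both a compiler warning and a pointer to the replacement.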
##########
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java:
##########
@@ -68,6 +70,24 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline
void setExecutorService(ExecutorService value);
+ /**
+ * The ScheduledExecutorService instance to use to create threads, can be overridden to specify a
+ * ScheduledExecutorService that is compatible with the user's environment. If unset, the default
+ * is to create an ScheduledExecutorService with a core number of threads equal to Math.max(4,
+ * Runtime.getRuntime().availableProcessors())
+ */
+ @JsonIgnore
+ @Description(
+ "The ScheduledExecutorService instance to use to create threads, can be overridden to specify "
Review Comment:
It makes a lot of sense not to declare this pipeline option in GcsOptions.
The executor option was placed in GcsOptions a long time ago because GCS
needed it, but it is now used in many places.
Since it is used during both pipeline construction and pipeline execution, it
makes sense to move it to its own options class in
`org/apache/beam/sdk/options/ExecutorOptions.java` in `sdks/java/core`.