ibzib commented on a change in pull request #14942:
URL: https://github.com/apache/beam/pull/14942#discussion_r646907646
##########
File path:
runners/portability/java/src/main/java/org/apache/beam/runners/portability/ExternalWorkerService.java
##########
@@ -38,6 +44,7 @@
public class ExternalWorkerService extends BeamFnExternalWorkerPoolImplBase implements FnService {
private static final Logger LOG = LoggerFactory.getLogger(ExternalWorkerService.class);
+ private static final String PIPELINE_OPTIONS = "PIPELINE_OPTIONS";
Review comment:
Nit: we can rename this variable to make its usage more obvious.
```suggestion
private static final String PIPELINE_OPTIONS_ENV_VAR = "PIPELINE_OPTIONS";
```
##########
File path:
runners/portability/java/src/main/java/org/apache/beam/runners/portability/ExternalWorkerService.java
##########
@@ -90,10 +97,55 @@ public void stopWorker(
public void close() {}
public GrpcFnServer<ExternalWorkerService> start() throws Exception {
- GrpcFnServer<ExternalWorkerService> server =
- GrpcFnServer.allocatePortAndCreateFor(this, serverFactory);
+ final String externalServiceAddress =
+ Environments.getExternalServiceAddress(options.as(PortablePipelineOptions.class));
+ GrpcFnServer<ExternalWorkerService> server;
+ if (externalServiceAddress.isEmpty()) {
+ server = GrpcFnServer.allocatePortAndCreateFor(this, serverFactory);
+ } else {
+ server =
+ GrpcFnServer.create(
+ this,
+ Endpoints.ApiServiceDescriptor.newBuilder().setUrl(externalServiceAddress).build(),
+ serverFactory);
+ }
LOG.debug(
"Listening for worker start requests at {}.",
server.getApiServiceDescriptor().getUrl());
return server;
}
+
+ /**
+ * Worker pool entry point.
+ *
+ * <p>The worker pool exposes an RPC service that is used with EXTERNAL environment to start and
+ * stop the SDK workers.
+ *
+ * <p>The worker pool uses threads for parallelism;
+ *
+ * <p>This entry point is used by the Java SDK container in worker pool mode.
+ */
+ public static void main(String[] args) throws Exception {
+ main(System::getenv);
+ }
+
+ public static void main(Function<String, String> environmentVarGetter) throws Exception {
+ System.out.format("Starting external worker service%n");
Review comment:
Why `System.out` instead of the logger?
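For illustration, a minimal sketch of routing the startup messages through a
logger rather than `System.out`. It uses `java.util.logging` purely as a
stand-in so the snippet runs without dependencies; the file under review
declares an SLF4J `LOG` field, which would be the natural choice there. The
class name `StartupLogging` and the method `logStartup` are hypothetical.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class StartupLogging {
  // Assumption: stdlib logger used only to keep this sketch self-contained.
  // It stands in for the SLF4J LOG field already declared in the class under review.
  static final Logger LOG = Logger.getLogger(StartupLogging.class.getName());

  // Emits the two startup messages as log records instead of writing to stdout.
  public static void logStartup(String pipelineOptionsJson) {
    LOG.info("Starting external worker service");
    // The value is passed as a parameter, so it is not parsed as a format pattern.
    LOG.log(Level.INFO, "Pipeline options {0}", pipelineOptionsJson);
  }

  public static void main(String[] args) {
    // Reads the same environment variable the PR uses; logs "null" if it is unset.
    logStartup(System.getenv("PIPELINE_OPTIONS"));
  }
}
```

Going through the logger would let operators control verbosity and output
destination with the usual logging configuration.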
##########
File path:
runners/portability/java/src/main/java/org/apache/beam/runners/portability/ExternalWorkerService.java
##########
@@ -90,10 +97,55 @@ public void stopWorker(
public void close() {}
public GrpcFnServer<ExternalWorkerService> start() throws Exception {
- GrpcFnServer<ExternalWorkerService> server =
- GrpcFnServer.allocatePortAndCreateFor(this, serverFactory);
+ final String externalServiceAddress =
+ Environments.getExternalServiceAddress(options.as(PortablePipelineOptions.class));
+ GrpcFnServer<ExternalWorkerService> server;
+ if (externalServiceAddress.isEmpty()) {
+ server = GrpcFnServer.allocatePortAndCreateFor(this, serverFactory);
+ } else {
+ server =
+ GrpcFnServer.create(
+ this,
+ Endpoints.ApiServiceDescriptor.newBuilder().setUrl(externalServiceAddress).build(),
+ serverFactory);
+ }
LOG.debug(
"Listening for worker start requests at {}.",
server.getApiServiceDescriptor().getUrl());
return server;
}
+
+ /**
+ * Worker pool entry point.
+ *
+ * <p>The worker pool exposes an RPC service that is used with EXTERNAL environment to start and
+ * stop the SDK workers.
+ *
+ * <p>The worker pool uses threads for parallelism;
+ *
+ * <p>This entry point is used by the Java SDK container in worker pool mode.
+ */
+ public static void main(String[] args) throws Exception {
+ main(System::getenv);
+ }
+
+ public static void main(Function<String, String> environmentVarGetter) throws Exception {
+ System.out.format("Starting external worker service%n");
+ System.out.format("Pipeline options %s%n", environmentVarGetter.apply(PIPELINE_OPTIONS));
+ PipelineOptions options =
+ PipelineOptionsTranslation.fromJson(environmentVarGetter.apply(PIPELINE_OPTIONS));
+
+ try (GrpcFnServer<ExternalWorkerService> server = new ExternalWorkerService(options).start()) {
+ System.out.format(
+ "External worker service started at address: %s",
+ server.getApiServiceDescriptor().getUrl());
+ while (true) {
+ // Wait indefinitely to keep ExternalWorkerService running
+ Sleeper.DEFAULT.sleep(60 * 60 * 24 * 1000);
Review comment:
Any reason to use a `while` loop with a 24-hour sleep instead of simply a
single very long sleep?
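One possible alternative, sketched under assumptions: block the main thread on
a `CountDownLatch` that is released only when shutdown is requested, which
avoids both the loop and the arbitrary sleep duration. The class
`BlockUntilShutdown` and its method names are hypothetical and not part of the
PR.

```java
import java.util.concurrent.CountDownLatch;

public class BlockUntilShutdown {
  // Released exactly once, when shutdown is requested.
  private final CountDownLatch shutdown = new CountDownLatch(1);

  // Unblocks every thread currently waiting in awaitShutdown().
  public void requestShutdown() {
    shutdown.countDown();
  }

  // Blocks the calling thread until requestShutdown() is called or the thread is interrupted.
  public void awaitShutdown() throws InterruptedException {
    shutdown.await();
  }
}
```

In `main`, one might register the release as a JVM shutdown hook with
`Runtime.getRuntime().addShutdownHook(new Thread(blocker::requestShutdown))`
and then call `blocker.awaitShutdown()` inside the try-with-resources block, so
the server is closed once the wait ends.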
##########
File path:
runners/portability/java/src/main/java/org/apache/beam/runners/portability/ExternalWorkerService.java
##########
@@ -90,10 +97,55 @@ public void stopWorker(
public void close() {}
public GrpcFnServer<ExternalWorkerService> start() throws Exception {
- GrpcFnServer<ExternalWorkerService> server =
- GrpcFnServer.allocatePortAndCreateFor(this, serverFactory);
+ final String externalServiceAddress =
+ Environments.getExternalServiceAddress(options.as(PortablePipelineOptions.class));
+ GrpcFnServer<ExternalWorkerService> server;
+ if (externalServiceAddress.isEmpty()) {
+ server = GrpcFnServer.allocatePortAndCreateFor(this, serverFactory);
+ } else {
+ server =
+ GrpcFnServer.create(
+ this,
+ Endpoints.ApiServiceDescriptor.newBuilder().setUrl(externalServiceAddress).build(),
+ serverFactory);
+ }
LOG.debug(
"Listening for worker start requests at {}.",
server.getApiServiceDescriptor().getUrl());
return server;
}
+
+ /**
+ * Worker pool entry point.
+ *
+ * <p>The worker pool exposes an RPC service that is used with EXTERNAL environment to start and
+ * stop the SDK workers.
+ *
+ * <p>The worker pool uses threads for parallelism;
+ *
+ * <p>This entry point is used by the Java SDK container in worker pool mode.
+ */
+ public static void main(String[] args) throws Exception {
+ main(System::getenv);
+ }
+
+ public static void main(Function<String, String> environmentVarGetter) throws Exception {
+ System.out.format("Starting external worker service%n");
+ System.out.format("Pipeline options %s%n", environmentVarGetter.apply(PIPELINE_OPTIONS));
Review comment:
This code assumes pipeline options are known ahead of time. Ideally, a worker
pool should be logically decoupled from any single pipeline and able to serve
multiple pipelines with varying options. That may not be possible, however,
without modifying StartWorkerRequest to include pipeline options.

This wasn't a problem in Python. The difference is that in Java, FnHarness
takes pipeline options as an argument, whereas sdk_worker.py, as far as I can
tell, does not use pipeline options.

It may be acceptable to take pipeline options up front, but the documentation
should make clear that `PIPELINE_OPTIONS` must match the pipeline options used
by the job; otherwise there could be confusion about which set of options is
actually read in various places.
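As a rough sketch of how the up-front requirement could at least be made
explicit at startup, the entry point might fail fast with a descriptive message
when the variable is missing. The class `PipelineOptionsEnv` and the method
`requirePipelineOptionsJson` are hypothetical; only the environment variable
name and the `Function<String, String>` getter come from the diff.

```java
import java.util.function.Function;

public class PipelineOptionsEnv {
  // Constant name follows the rename suggested earlier in this review.
  static final String PIPELINE_OPTIONS_ENV_VAR = "PIPELINE_OPTIONS";

  // Returns the raw value of the variable, or throws if it is absent or blank.
  public static String requirePipelineOptionsJson(Function<String, String> environmentVarGetter) {
    String json = environmentVarGetter.apply(PIPELINE_OPTIONS_ENV_VAR);
    if (json == null || json.trim().isEmpty()) {
      throw new IllegalStateException(
          PIPELINE_OPTIONS_ENV_VAR
              + " is not set; it should hold the same pipeline options as the job this pool serves");
    }
    return json;
  }

  public static void main(String[] args) {
    // "{}" is a placeholder value, not a real serialized options payload.
    Function<String, String> fakeEnv = name -> PIPELINE_OPTIONS_ENV_VAR.equals(name) ? "{}" : null;
    System.out.println(requirePipelineOptionsJson(fakeEnv));
  }
}
```

This only checks that a value is present. Verifying that it actually matches the
job's options would still require the job to send them, for example through
StartWorkerRequest as noted above.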