This is an automated email from the ASF dual-hosted git repository.
ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f6b00a2 [BEAM-8137] Add Main method to ExternalWorkerService (#14942)
f6b00a2 is described below
commit f6b00a2fa1fef920ff1e034b85efd65314c0e8b2
Author: Ke Wu <[email protected]>
AuthorDate: Wed Jun 9 15:30:51 2021 -0700
[BEAM-8137] Add Main method to ExternalWorkerService (#14942)
* [BEAM-8137] Add Main method to ExternalWorkerService
1. Add main method to ExternalWorkerService to support launching worker
pool from Java SDK Container.
* Add more javadoc/comments
* Add more javadoc/comments
* Update to sleep for Long.MAX_VALUE
* Update format
* Update LOG.error
* Fix checker error
* Update error message
Co-authored-by: Ke Wu <[email protected]>
---
.../runners/core/construction/Environments.java | 20 ++++----
.../runners/portability/ExternalWorkerService.java | 60 +++++++++++++++++++++-
2 files changed, 68 insertions(+), 12 deletions(-)
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
index d7ac42e..473ae4e 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
@@ -404,6 +404,16 @@ public class Environments {
return String.format("%s-%s%s", fileName, encodedHash, suffix);
}
+  /**
+   * Resolves the external worker service address configured in {@code options}.
+   *
+   * <p>A non-empty {@link PortablePipelineOptions#getDefaultEnvironmentConfig()} takes precedence.
+   * Otherwise the value registered under {@code externalServiceAddressOption} in the environment
+   * options is returned, as looked up by {@link PortablePipelineOptions#getEnvironmentOption}.
+   *
+   * @param options the pipeline options to read the address from
+   * @return the configured address; when neither source sets one, whatever {@code
+   *     getEnvironmentOption} returns for a missing entry (presumably an empty string — confirm)
+   */
+  public static String getExternalServiceAddress(PortablePipelineOptions options) {
+    String environmentConfig = options.getDefaultEnvironmentConfig();
+    if (environmentConfig != null && !environmentConfig.isEmpty()) {
+      // An explicit environment config takes precedence over the environment options.
+      return environmentConfig;
+    }
+    return PortablePipelineOptions.getEnvironmentOption(options, externalServiceAddressOption);
+  }
+
private static File zipDirectory(File directory) throws IOException {
File zipFile = File.createTempFile(directory.getName(), ".zip");
try (FileOutputStream fos = new FileOutputStream(zipFile)) {
@@ -454,16 +464,6 @@ public class Environments {
return environmentOption;
}
- private static String getExternalServiceAddress(PortablePipelineOptions
options) {
- String environmentConfig = options.getDefaultEnvironmentConfig();
- String environmentOption =
- PortablePipelineOptions.getEnvironmentOption(options,
externalServiceAddressOption);
- if (environmentConfig != null && !environmentConfig.isEmpty()) {
- return environmentConfig;
- }
- return environmentOption;
- }
-
private static Map<String, String>
getProcessVariables(PortablePipelineOptions options) {
ImmutableMap.Builder<String, String> variables = ImmutableMap.builder();
String assignments =
diff --git a/runners/portability/java/src/main/java/org/apache/beam/runners/portability/ExternalWorkerService.java b/runners/portability/java/src/main/java/org/apache/beam/runners/portability/ExternalWorkerService.java
index 9fbfaac..000ff64 100644
--- a/runners/portability/java/src/main/java/org/apache/beam/runners/portability/ExternalWorkerService.java
+++ b/runners/portability/java/src/main/java/org/apache/beam/runners/portability/ExternalWorkerService.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.portability;
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+
import java.util.Collections;
import org.apache.beam.fn.harness.FnHarness;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StartWorkerRequest;
@@ -24,10 +26,15 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.StartWorkerResponse;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StopWorkerRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StopWorkerResponse;
import
org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc.BeamFnExternalWorkerPoolImplBase;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.runners.core.construction.Environments;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.fnexecution.FnService;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,6 +45,7 @@ import org.slf4j.LoggerFactory;
public class ExternalWorkerService extends BeamFnExternalWorkerPoolImplBase
implements FnService {
private static final Logger LOG =
LoggerFactory.getLogger(ExternalWorkerService.class);
+ /** Environment variable from which {@link #main} reads the JSON-serialized pipeline options. */
+ private static final String PIPELINE_OPTIONS_ENV_VAR = "PIPELINE_OPTIONS";
private final PipelineOptions options;
private final ServerFactory serverFactory = ServerFactory.createDefault();
@@ -90,10 +98,58 @@ public class ExternalWorkerService extends BeamFnExternalWorkerPoolImplBase impl
+ /** Intentionally empty: closing this service releases nothing. */
public void close() {}
+  /**
+   * Starts a gRPC server that exposes this worker pool service.
+   *
+   * <p>If {@link Environments#getExternalServiceAddress} yields a non-empty address for the
+   * configured options, the server is created at that address; otherwise a port is allocated by
+   * the server factory.
+   *
+   * @return the running server; it is not closed here, so the caller is responsible for closing it
+   * @throws Exception if the server cannot be created
+   */
   public GrpcFnServer<ExternalWorkerService> start() throws Exception {
-    GrpcFnServer<ExternalWorkerService> server =
-        GrpcFnServer.allocatePortAndCreateFor(this, serverFactory);
+    final String externalServiceAddress =
+        Environments.getExternalServiceAddress(options.as(PortablePipelineOptions.class));
+    GrpcFnServer<ExternalWorkerService> server;
+    // Null-tolerant check: the helper's contract for a missing option is not visible here.
+    if (externalServiceAddress == null || externalServiceAddress.isEmpty()) {
+      // No address configured: let the server factory allocate a port.
+      server = GrpcFnServer.allocatePortAndCreateFor(this, serverFactory);
+    } else {
+      // Serve at the configured address instead of an arbitrary port.
+      server =
+          GrpcFnServer.create(
+              this,
+              Endpoints.ApiServiceDescriptor.newBuilder().setUrl(externalServiceAddress).build(),
+              serverFactory);
+    }
     LOG.debug(
         "Listening for worker start requests at {}.",
         server.getApiServiceDescriptor().getUrl());
     return server;
   }
+
+  /**
+   * Worker pool entry point.
+   *
+   * <p>The worker pool exposes an RPC service that is used with the EXTERNAL environment to start
+   * and stop SDK workers.
+   *
+   * <p>The worker pool uses threads for parallelism.
+   *
+   * <p>This entry point is used by the Java SDK container in worker pool mode and expects the
+   * following environment variable:
+   *
+   * <ul>
+   *   <li>{@code PIPELINE_OPTIONS}: a JSON-serialized form of {@link PipelineOptions}, as parsed by
+   *       {@link PipelineOptionsTranslation#fromJson}. It must be known up front and must match the
+   *       running job. See {@link PipelineOptions} for further details.
+   * </ul>
+   *
+   * <p>After starting, the main thread blocks until it is interrupted, which is treated as a
+   * shutdown request. Failures while starting or running the service are logged and rethrown, so
+   * they propagate out of {@code main} instead of being reported as a normal return.
+   *
+   * @param args ignored
+   * @throws IllegalArgumentException if {@code PIPELINE_OPTIONS} is not set
+   * @throws RuntimeException if the service fails to start or run
+   */
+  public static void main(String[] args) {
+    LOG.info("Starting external worker service");
+    final String optionsEnv =
+        checkArgumentNotNull(
+            System.getenv(PIPELINE_OPTIONS_ENV_VAR),
+            "No pipeline options provided in environment variables " + PIPELINE_OPTIONS_ENV_VAR);
+    LOG.info("Pipeline options {}", optionsEnv);
+    PipelineOptions options = PipelineOptionsTranslation.fromJson(optionsEnv);
+
+    try (GrpcFnServer<ExternalWorkerService> server = new ExternalWorkerService(options).start()) {
+      LOG.info(
+          "External worker service started at address: {}",
+          server.getApiServiceDescriptor().getUrl());
+      // Block the main thread indefinitely; the server is closed when this try block exits.
+      Sleeper.DEFAULT.sleep(Long.MAX_VALUE);
+    } catch (InterruptedException e) {
+      // Interruption is the only normal way out of the sleep above: treat it as a shutdown
+      // request and restore the interrupt flag rather than logging it as an error.
+      Thread.currentThread().interrupt();
+      LOG.info("External worker service interrupted, shutting down.");
+    } catch (Exception e) {
+      LOG.error("Error running worker service", e);
+      // Propagate so a failed start is not reported as a normal return from main.
+      throw new RuntimeException("Error running worker service", e);
+    } finally {
+      LOG.info("External worker service stopped.");
+    }
+  }
}