This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7d11bca [BEAM-7668] Add ability to query a pipeline definition from a
gRPC JobService
new 05dba6a Merge PR #8977
7d11bca is described below
commit 7d11bca6aa5039bcd8ea7badb1319b6dcca9fb8e
Author: Chad Dombrova <[email protected]>
AuthorDate: Mon Jul 1 16:16:59 2019 -0400
[BEAM-7668] Add ability to query a pipeline definition from a gRPC
JobService
---
.../src/main/proto/beam_job_api.proto | 16 +++++
.../jobsubmission/InMemoryJobService.java | 23 +++++++
.../fnexecution/jobsubmission/JobInvocation.java | 5 ++
.../jobsubmission/InMemoryJobServiceTest.java | 70 ++++++++++++++++++----
.../runners/portability/local_job_service.py | 4 ++
5 files changed, 108 insertions(+), 10 deletions(-)
diff --git a/model/job-management/src/main/proto/beam_job_api.proto
b/model/job-management/src/main/proto/beam_job_api.proto
index c7b972e..1226581 100644
--- a/model/job-management/src/main/proto/beam_job_api.proto
+++ b/model/job-management/src/main/proto/beam_job_api.proto
@@ -46,6 +46,9 @@ service JobService {
// Get the current state of the job
rpc GetState (GetJobStateRequest) returns (GetJobStateResponse);
+ // Get the job's pipeline
+ rpc GetPipeline (GetJobPipelineRequest) returns (GetJobPipelineResponse);
+
// Cancel the job
rpc Cancel (CancelJobRequest) returns (CancelJobResponse);
@@ -134,6 +137,19 @@ message GetJobStateResponse {
}
+// GetPipeline is a synchronus request that returns a pipeline back
+// Throws error GRPC_STATUS_UNAVAILABLE if server is down
+// Throws error NOT_FOUND if the jobId is not found
+message GetJobPipelineRequest {
+ string job_id = 1; // (required)
+
+}
+
+message GetJobPipelineResponse {
+ org.apache.beam.model.pipeline.v1.Pipeline pipeline = 1; // (required)
+}
+
+
// GetJobMessages is a streaming api for streaming job messages from the
service
// One request will connect you to the job and you'll get a stream of job state
// and job messages back; one is used for logging and the other for detecting
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
index 642d3bd..5a3f228 100644
---
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
@@ -26,6 +26,8 @@ import
org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
import
org.apache.beam.model.jobmanagement.v1.JobApi.DescribePipelineOptionsRequest;
import
org.apache.beam.model.jobmanagement.v1.JobApi.DescribePipelineOptionsResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobPipelineRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobPipelineResponse;
import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
@@ -38,6 +40,7 @@ import
org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.graph.PipelineValidator;
import org.apache.beam.runners.fnexecution.FnService;
import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver;
@@ -235,6 +238,26 @@ public class InMemoryJobService extends
JobServiceGrpc.JobServiceImplBase implem
}
@Override
+ public void getPipeline(
+ GetJobPipelineRequest request, StreamObserver<GetJobPipelineResponse>
responseObserver) {
+ LOG.trace("{} {}", GetJobPipelineRequest.class.getSimpleName(), request);
+ String invocationId = request.getJobId();
+ try {
+ JobInvocation invocation = getInvocation(invocationId);
+ RunnerApi.Pipeline pipeline = invocation.getPipeline();
+ GetJobPipelineResponse response =
+ GetJobPipelineResponse.newBuilder().setPipeline(pipeline).build();
+ responseObserver.onNext(response);
+ responseObserver.onCompleted();
+ } catch (Exception e) {
+ String errMessage =
+ String.format("Encountered Unexpected Exception for Invocation %s",
invocationId);
+ LOG.error(errMessage, e);
+ responseObserver.onError(Status.INTERNAL.withCause(e).asException());
+ }
+ }
+
+ @Override
public void cancel(CancelJobRequest request,
StreamObserver<CancelJobResponse> responseObserver) {
LOG.trace("{} {}", CancelJobRequest.class.getSimpleName(), request);
String invocationId = request.getJobId();
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
index ab4fda7..819a007 100644
---
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
@@ -175,6 +175,11 @@ public class JobInvocation {
return this.jobState;
}
+ /** Retrieve the job's pipeline. */
+ public RunnerApi.Pipeline getPipeline() {
+ return this.pipeline;
+ }
+
/** Listen for job state changes with a {@link Consumer}. */
public synchronized void addStateListener(Consumer<JobState.Enum>
stateStreamObserver) {
stateStreamObserver.accept(getState());
diff --git
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobServiceTest.java
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobServiceTest.java
index 961016a..01346af 100644
---
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobServiceTest.java
+++
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobServiceTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.fnexecution.jobsubmission;
import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.Is.isA;
import static org.hamcrest.core.IsNull.notNullValue;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
@@ -31,6 +32,7 @@ import org.apache.beam.model.jobmanagement.v1.JobApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Struct;
+import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusException;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver;
import org.junit.Before;
import org.junit.Test;
@@ -62,6 +64,35 @@ public class InMemoryJobServiceTest {
InMemoryJobService.create(stagingServiceDescriptor, session ->
"token", null, invoker);
when(invoker.invoke(TEST_PIPELINE, TEST_OPTIONS,
TEST_RETRIEVAL_TOKEN)).thenReturn(invocation);
when(invocation.getId()).thenReturn(TEST_JOB_ID);
+ when(invocation.getPipeline()).thenReturn(TEST_PIPELINE);
+ }
+
+ private JobApi.PrepareJobResponse prepareJob() {
+ JobApi.PrepareJobRequest request =
+ JobApi.PrepareJobRequest.newBuilder()
+ .setJobName(TEST_JOB_NAME)
+ .setPipeline(RunnerApi.Pipeline.getDefaultInstance())
+ .setPipelineOptions(Struct.getDefaultInstance())
+ .build();
+ RecordingObserver<JobApi.PrepareJobResponse> recorder = new
RecordingObserver<>();
+ service.prepare(request, recorder);
+ return recorder.values.get(0);
+ }
+
+ private JobApi.RunJobResponse runJob(String preparationId) {
+ JobApi.RunJobRequest runRequest =
+ JobApi.RunJobRequest.newBuilder()
+ .setPreparationId(preparationId)
+ .setRetrievalToken(TEST_RETRIEVAL_TOKEN)
+ .build();
+ RecordingObserver<JobApi.RunJobResponse> recorder = new
RecordingObserver<>();
+ service.run(runRequest, recorder);
+ return recorder.values.get(0);
+ }
+
+ private JobApi.RunJobResponse prepareAndRunJob() {
+ JobApi.PrepareJobResponse prepareResponse = prepareJob();
+ return runJob(prepareResponse.getPreparationId());
}
@Test
@@ -82,17 +113,36 @@ public class InMemoryJobServiceTest {
}
@Test
+ public void testGetPipelineFailure() {
+ prepareJob();
+
+ JobApi.GetJobPipelineRequest request =
+
JobApi.GetJobPipelineRequest.newBuilder().setJobId(TEST_JOB_ID).build();
+ RecordingObserver<JobApi.GetJobPipelineResponse> recorder = new
RecordingObserver<>();
+ service.getPipeline(request, recorder);
+ // job has not been run yet
+ assertThat(recorder.isSuccessful(), is(false));
+ assertThat(recorder.error, isA(StatusException.class));
+ }
+
+ @Test
+ public void testGetPipelineIsSuccessful() throws Exception {
+ prepareAndRunJob();
+
+ JobApi.GetJobPipelineRequest request =
+
JobApi.GetJobPipelineRequest.newBuilder().setJobId(TEST_JOB_ID).build();
+ RecordingObserver<JobApi.GetJobPipelineResponse> recorder = new
RecordingObserver<>();
+ service.getPipeline(request, recorder);
+ assertThat(recorder.isSuccessful(), is(true));
+ assertThat(recorder.values, hasSize(1));
+ JobApi.GetJobPipelineResponse response = recorder.values.get(0);
+ assertThat(response.getPipeline(), is(TEST_PIPELINE));
+ }
+
+ @Test
public void testJobSubmissionUsesJobInvokerAndIsSuccess() throws Exception {
- // prepare job
- JobApi.PrepareJobRequest prepareRequest =
- JobApi.PrepareJobRequest.newBuilder()
- .setJobName(TEST_JOB_NAME)
- .setPipeline(RunnerApi.Pipeline.getDefaultInstance())
- .setPipelineOptions(Struct.getDefaultInstance())
- .build();
- RecordingObserver<JobApi.PrepareJobResponse> prepareRecorder = new
RecordingObserver<>();
- service.prepare(prepareRequest, prepareRecorder);
- JobApi.PrepareJobResponse prepareResponse = prepareRecorder.values.get(0);
+ JobApi.PrepareJobResponse prepareResponse = prepareJob();
+
// run job
JobApi.RunJobRequest runRequest =
JobApi.RunJobRequest.newBuilder()
diff --git a/sdks/python/apache_beam/runners/portability/local_job_service.py
b/sdks/python/apache_beam/runners/portability/local_job_service.py
index 91ceff7..48193ee 100644
--- a/sdks/python/apache_beam/runners/portability/local_job_service.py
+++ b/sdks/python/apache_beam/runners/portability/local_job_service.py
@@ -131,6 +131,10 @@ class
LocalJobServicer(beam_job_api_pb2_grpc.JobServiceServicer):
return beam_job_api_pb2.GetJobStateResponse(
state=self._jobs[request.job_id].state)
+ def GetPipeline(self, request, context=None):
+ return beam_job_api_pb2.GetJobPipelineResponse(
+ pipeline=self._jobs[request.job_id]._pipeline_proto)
+
def Cancel(self, request, context=None):
self._jobs[request.job_id].cancel()
return beam_job_api_pb2.CancelJobRequest(