[ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=103167&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103167
 ]

ASF GitHub Bot logged work on BEAM-2588:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 17/May/18 23:23
            Start Date: 17/May/18 23:23
    Worklog Time Spent: 10m 
      Work Description: jkff closed pull request #5262: [BEAM-2588] Portability 
Runner Job Service
URL: https://github.com/apache/beam/pull/5262
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
new file mode 100644
index 00000000000..fde34826e14
--- /dev/null
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+import com.google.protobuf.Struct;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A InMemoryJobService that prepares and runs jobs on behalf of a client 
using a
+ * {@link JobInvoker}.
+ *
+ * <p>Job management is handled in-memory rather than any persistent storage, 
running the risk of
+ * leaking jobs if the InMemoryJobService crashes.
+ *
+ * <p>TODO: replace in-memory job management state with persistent solution.
+ */
+public class InMemoryJobService extends JobServiceGrpc.JobServiceImplBase 
implements FnService {
+  private static final Logger LOG = 
LoggerFactory.getLogger(InMemoryJobService.class);
+
+  public static InMemoryJobService create(
+      Endpoints.ApiServiceDescriptor stagingServiceDescriptor, JobInvoker 
invoker) {
+    return new InMemoryJobService(stagingServiceDescriptor, invoker);
+  }
+
+  private final ConcurrentMap<String, JobPreparation> preparations;
+  private final ConcurrentMap<String, JobInvocation> invocations;
+  private final Endpoints.ApiServiceDescriptor stagingServiceDescriptor;
+  private final JobInvoker invoker;
+
+  private InMemoryJobService(
+      Endpoints.ApiServiceDescriptor stagingServiceDescriptor, JobInvoker 
invoker) {
+    this.stagingServiceDescriptor = stagingServiceDescriptor;
+    this.invoker = invoker;
+
+    this.preparations = new ConcurrentHashMap<>();
+    this.invocations = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void prepare(
+      PrepareJobRequest request,
+      StreamObserver<PrepareJobResponse> responseObserver) {
+    try {
+      LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request);
+      // insert preparation
+      String preparationId =
+          String.format("%s_%s", request.getJobName(), 
UUID.randomUUID().toString());
+      Struct pipelineOptions = request.getPipelineOptions();
+      if (pipelineOptions == null) {
+        throw new NullPointerException("Encountered null pipeline options.");
+      }
+      LOG.trace("PIPELINE OPTIONS {} {}", pipelineOptions.getClass(), 
pipelineOptions);
+      JobPreparation preparation =
+          JobPreparation
+              .builder()
+              .setId(preparationId)
+              .setPipeline(request.getPipeline())
+              .setOptions(pipelineOptions)
+              .build();
+      JobPreparation previous = preparations.putIfAbsent(preparationId, 
preparation);
+      if (previous != null) {
+        // this should never happen with a UUID
+        String errMessage =
+            String.format("A job with the preparation ID \"%s\" already 
exists.", preparationId);
+        StatusException exception = 
Status.NOT_FOUND.withDescription(errMessage).asException();
+        responseObserver.onError(exception);
+        return;
+      }
+
+      // send response
+      PrepareJobResponse response =
+          PrepareJobResponse
+              .newBuilder()
+              .setPreparationId(preparationId)
+              .setArtifactStagingEndpoint(stagingServiceDescriptor)
+              .build();
+      responseObserver.onNext(response);
+      responseObserver.onCompleted();
+    } catch (Exception e) {
+      LOG.error("Could not prepare job with name {}", request.getJobName(), e);
+      responseObserver.onError(Status.INTERNAL.withCause(e).asException());
+    }
+  }
+
+  @Override
+  public void run(
+      RunJobRequest request, StreamObserver<RunJobResponse> responseObserver) {
+    LOG.trace("{} {}", RunJobRequest.class.getSimpleName(), request);
+
+    String preparationId = request.getPreparationId();
+    try {
+      // retrieve job preparation
+      JobPreparation preparation = preparations.get(preparationId);
+      if (preparation == null) {
+        String errMessage = String.format("Unknown Preparation Id \"%s\".", 
preparationId);
+        StatusException exception = 
Status.NOT_FOUND.withDescription(errMessage).asException();
+        responseObserver.onError(exception);
+        return;
+      }
+
+      // create new invocation
+      JobInvocation invocation =
+          invoker.invoke(preparation.pipeline(), preparation.options(), 
request.getStagingToken());
+      String invocationId = invocation.getId();
+      invocation.start();
+      invocations.put(invocationId, invocation);
+      RunJobResponse response =
+          RunJobResponse.newBuilder().setJobId(invocationId).build();
+      responseObserver.onNext(response);
+      responseObserver.onCompleted();
+    } catch (StatusRuntimeException e) {
+      LOG.warn("Encountered Status Exception", e);
+      responseObserver.onError(e);
+    } catch (Exception e) {
+      String errMessage =
+          String.format("Encountered Unexpected Exception for Preparation %s", 
preparationId);
+      LOG.error(errMessage, e);
+      responseObserver.onError(Status.INTERNAL.withCause(e).asException());
+    }
+  }
+
+  @Override
+  public void getState(
+      GetJobStateRequest request, StreamObserver<GetJobStateResponse> 
responseObserver) {
+    LOG.trace("{} {}", GetJobStateRequest.class.getSimpleName(), request);
+    String invocationId = request.getJobId();
+    try {
+      JobInvocation invocation = getInvocation(invocationId);
+      JobState.Enum state = invocation.getState();
+      GetJobStateResponse response = 
GetJobStateResponse.newBuilder().setState(state).build();
+      responseObserver.onNext(response);
+      responseObserver.onCompleted();
+    } catch (Exception e) {
+      String errMessage =
+          String.format("Encountered Unexpected Exception for Invocation %s", 
invocationId);
+      LOG.error(errMessage, e);
+      responseObserver.onError(Status.INTERNAL.withCause(e).asException());
+    }
+  }
+
+  @Override
+  public void cancel(CancelJobRequest request, 
StreamObserver<CancelJobResponse> responseObserver) {
+    LOG.trace("{} {}", CancelJobRequest.class.getSimpleName(), request);
+    String invocationId = request.getJobId();
+    try {
+      JobInvocation invocation = getInvocation(invocationId);
+      invocation.cancel();
+      JobState.Enum state = invocation.getState();
+      CancelJobResponse response = 
CancelJobResponse.newBuilder().setState(state).build();
+      responseObserver.onNext(response);
+      responseObserver.onCompleted();
+    } catch (Exception e) {
+      String errMessage =
+          String.format("Encountered Unexpected Exception for Invocation %s", 
invocationId);
+      LOG.error(errMessage, e);
+      responseObserver.onError(Status.INTERNAL.withCause(e).asException());
+    }
+  }
+
+  @Override
+  public void getStateStream(
+      GetJobStateRequest request,
+      StreamObserver<GetJobStateResponse> responseObserver) {
+    LOG.trace("{} {}", GetJobStateRequest.class.getSimpleName(), request);
+    String invocationId = request.getJobId();
+    try {
+      JobInvocation invocation = getInvocation(invocationId);
+      Function<JobState.Enum, GetJobStateResponse> responseFunction =
+          state -> GetJobStateResponse.newBuilder().setState(state).build();
+      Consumer<JobState.Enum> stateListener =
+          state -> responseObserver.onNext(responseFunction.apply(state));
+      invocation.addStateListener(stateListener);
+    } catch (Exception e) {
+      String errMessage =
+          String.format("Encountered Unexpected Exception for Invocation %s", 
invocationId);
+      LOG.error(errMessage, e);
+      responseObserver.onError(Status.INTERNAL.withCause(e).asException());
+    }
+  }
+
+  @Override
+  public void getMessageStream(
+      JobMessagesRequest request,
+      StreamObserver<JobMessagesResponse> responseObserver) {
+    String invocationId = request.getJobId();
+    try {
+      JobInvocation invocation = getInvocation(invocationId);
+      // synchronization is necessary since we use this stream observer in 
both the state listener
+      // and message listener.
+      StreamObserver<JobMessagesResponse> syncResponseObserver =
+          SynchronizedStreamObserver.wrapping(responseObserver);
+      Consumer<JobState.Enum> stateListener =
+          state -> syncResponseObserver.onNext(
+              JobMessagesResponse.newBuilder().setStateResponse(
+                  GetJobStateResponse.newBuilder().setState(state).build()
+              ).build());
+      Consumer<JobMessage> messageListener =
+          message -> syncResponseObserver.onNext(
+              
JobMessagesResponse.newBuilder().setMessageResponse(message).build());
+
+      invocation.addStateListener(stateListener);
+      invocation.addMessageListener(messageListener);
+    } catch (Exception e) {
+      String errMessage =
+          String.format("Encountered Unexpected Exception for Invocation %s", 
invocationId);
+      LOG.error(errMessage, e);
+      responseObserver.onError(Status.INTERNAL.withCause(e).asException());
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    // TODO: throw error if jobs are running
+  }
+
+  private JobInvocation getInvocation(String invocationId) throws 
StatusException {
+    JobInvocation invocation = invocations.get(invocationId);
+    if (invocation == null) {
+      throw Status.NOT_FOUND.asException();
+    }
+    return invocation;
+  }
+}
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
new file mode 100644
index 00000000000..5c1549bd7b2
--- /dev/null
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+import java.util.function.Consumer;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+
+/**
+ * Internal representation of a Job which has been invoked (prepared and run) 
by a client.
+ */
+public interface JobInvocation {
+
+  /**
+   * Start the job.
+   */
+  void start();
+
+  /**
+   * @return Unique identifier for the job invocation.
+   */
+  String getId();
+
+  /**
+   * Cancel the job.
+   */
+  void cancel();
+
+  /**
+   * Retrieve the job's current state.
+   */
+  JobState.Enum getState();
+
+  /**
+   * Listen for job state changes with a {@link Consumer}.
+   */
+  void addStateListener(Consumer<Enum> stateStreamObserver);
+
+  /**
+   * Listen for job messages with a {@link Consumer}.
+   */
+  void addMessageListener(Consumer<JobMessage> messageStreamObserver);
+}
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvoker.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvoker.java
new file mode 100644
index 00000000000..b20d7c7aba3
--- /dev/null
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvoker.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+import com.google.protobuf.Struct;
+import java.io.IOException;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+
+/**
+ * Factory to create a {@link JobInvocation} instances.
+ */
+public interface JobInvoker {
+  /**
+   * Start running a job, abstracting its state as a {@link JobInvocation} 
instance.
+   */
+  JobInvocation invoke(RunnerApi.Pipeline pipeline, Struct options, @Nullable 
String artifactToken)
+      throws IOException;
+}
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobPreparation.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobPreparation.java
new file mode 100644
index 00000000000..e987fdea17d
--- /dev/null
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobPreparation.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+import com.google.auto.value.AutoValue;
+import com.google.protobuf.Struct;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
+
+/** A job that has been prepared, but not invoked. */
+@AutoValue
+public abstract class JobPreparation {
+  public static Builder builder() {
+    return new AutoValue_JobPreparation.Builder();
+  }
+
+  public abstract String id();
+  public abstract Pipeline pipeline();
+  public abstract Struct options();
+
+  @AutoValue.Builder
+  abstract static class Builder {
+    abstract Builder setId(String id);
+    abstract Builder setPipeline(Pipeline pipeline);
+    abstract Builder setOptions(Struct options);
+    abstract JobPreparation build();
+  }
+}
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/package-info.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/package-info.java
new file mode 100644
index 00000000000..fcb6b5b23bc
--- /dev/null
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Job management services for use in beam runners.
+ */
+package org.apache.beam.runners.fnexecution.jobsubmission;
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobServiceTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobServiceTest.java
new file mode 100644
index 00000000000..fc01c8f7623
--- /dev/null
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobServiceTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsNull.notNullValue;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.protobuf.Struct;
+import io.grpc.stub.StreamObserver;
+import java.util.ArrayList;
+import org.apache.beam.model.jobmanagement.v1.JobApi;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link InMemoryJobService}. */
+@RunWith(JUnit4.class)
+public class InMemoryJobServiceTest {
+  private static final String TEST_JOB_NAME = "test-job";
+  private static final String TEST_JOB_ID = "test-job-id";
+  private static final String TEST_STAGING_TOKEN = "test-staging-token";
+  private static final RunnerApi.Pipeline TEST_PIPELINE = 
RunnerApi.Pipeline.getDefaultInstance();
+  private static final Struct TEST_OPTIONS = Struct.getDefaultInstance();
+
+
+  Endpoints.ApiServiceDescriptor stagingServiceDescriptor;
+  @Mock
+  JobInvoker invoker;
+  @Mock
+  JobInvocation invocation;
+
+  InMemoryJobService service;
+
+  @Before
+  public void setUp() throws Exception {
+    MockitoAnnotations.initMocks(this);
+    stagingServiceDescriptor = 
Endpoints.ApiServiceDescriptor.getDefaultInstance();
+    service = InMemoryJobService.create(stagingServiceDescriptor, invoker);
+    when(invoker.invoke(TEST_PIPELINE, TEST_OPTIONS, 
TEST_STAGING_TOKEN)).thenReturn(invocation);
+    when(invocation.getId()).thenReturn(TEST_JOB_ID);
+  }
+
+  @Test
+  public void testPrepareIsSuccessful() {
+    JobApi.PrepareJobRequest request =
+        JobApi.PrepareJobRequest
+            .newBuilder()
+            .setJobName(TEST_JOB_NAME)
+            .setPipeline(RunnerApi.Pipeline.getDefaultInstance())
+            .setPipelineOptions(Struct.getDefaultInstance())
+            .build();
+    RecordingObserver<JobApi.PrepareJobResponse> recorder = new 
RecordingObserver<>();
+    service.prepare(request, recorder);
+    assertThat(recorder.isSuccessful(), is(true));
+    assertThat(recorder.values, hasSize(1));
+    JobApi.PrepareJobResponse response = recorder.values.get(0);
+    assertThat(response.getArtifactStagingEndpoint(), notNullValue());
+    assertThat(response.getPreparationId(), notNullValue());
+  }
+
+  @Test
+  public void testJobSubmissionUsesJobInvokerAndIsSuccess() throws Exception {
+    // prepare job
+    JobApi.PrepareJobRequest prepareRequest =
+        JobApi.PrepareJobRequest.newBuilder()
+            .setJobName(TEST_JOB_NAME)
+            .setPipeline(RunnerApi.Pipeline.getDefaultInstance())
+            .setPipelineOptions(Struct.getDefaultInstance())
+            .build();
+    RecordingObserver<JobApi.PrepareJobResponse> prepareRecorder = new 
RecordingObserver<>();
+    service.prepare(prepareRequest, prepareRecorder);
+    JobApi.PrepareJobResponse prepareResponse = prepareRecorder.values.get(0);
+    // run job
+    JobApi.RunJobRequest runRequest =
+        JobApi.RunJobRequest.newBuilder()
+            .setPreparationId(prepareResponse.getPreparationId())
+            .setStagingToken(TEST_STAGING_TOKEN)
+            .build();
+    RecordingObserver<JobApi.RunJobResponse> runRecorder = new 
RecordingObserver<>();
+    service.run(runRequest, runRecorder);
+    verify(invoker, times(1)).invoke(TEST_PIPELINE, TEST_OPTIONS, 
TEST_STAGING_TOKEN);
+    assertThat(runRecorder.isSuccessful(), is(true));
+    assertThat(runRecorder.values, hasSize(1));
+    JobApi.RunJobResponse runResponse = runRecorder.values.get(0);
+    assertThat(runResponse.getJobId(), is(TEST_JOB_ID));
+  }
+
+  private static class RecordingObserver<T> implements StreamObserver<T> {
+    ArrayList<T> values = new ArrayList<>();
+    Throwable error = null;
+    boolean isCompleted = false;
+
+    @Override
+    public void onNext(T t) {
+      values.add(t);
+    }
+
+    @Override
+    public void onError(Throwable throwable) {
+      error = throwable;
+    }
+
+    @Override
+    public void onCompleted() {
+      isCompleted = true;
+    }
+
+    boolean isSuccessful() {
+      return isCompleted && error == null;
+    }
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 103167)
    Time Spent: 6h  (was: 5h 50m)

> Portable Flink Runner Job API
> -----------------------------
>
>                 Key: BEAM-2588
>                 URL: https://issues.apache.org/jira/browse/BEAM-2588
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-flink
>            Reporter: Kenneth Knowles
>            Assignee: Axel Magnuson
>            Priority: Major
>              Labels: portability
>          Time Spent: 6h
>  Remaining Estimate: 0h
>
> The portable Flink runner needs to be wired into a job server so that it can 
> accept jobs the job api (https://s.apache.org/beam-job-api).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to