[
https://issues.apache.org/jira/browse/BEAM-4176?focusedWorklogId=151120&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-151120
]
ASF GitHub Bot logged work on BEAM-4176:
----------------------------------------
Author: ASF GitHub Bot
Created on: 04/Oct/18 10:39
Start Date: 04/Oct/18 10:39
Worklog Time Spent: 10m
Work Description: mxm closed pull request #6537: [BEAM-4176] Store and
serve termination state after portable job termination
URL: https://github.com/apache/beam/pull/6537
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/JobServicePipelineResult.java b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/JobServicePipelineResult.java
index 0318600a4d5..155977ca525 100644
--- a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/JobServicePipelineResult.java
+++ b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/JobServicePipelineResult.java
@@ -35,7 +35,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-class JobServicePipelineResult implements PipelineResult {
+class JobServicePipelineResult implements PipelineResult, AutoCloseable {
private static final long POLL_INTERVAL_MS = 10 * 1000;
@@ -43,14 +43,19 @@
private final ByteString jobId;
private final CloseableResource<JobServiceBlockingStub> jobService;
+ @Nullable private State terminationState;
JobServicePipelineResult(ByteString jobId,
CloseableResource<JobServiceBlockingStub> jobService) {
this.jobId = jobId;
this.jobService = jobService;
+ this.terminationState = null;
}
@Override
public State getState() {
+ if (terminationState != null) {
+ return terminationState;
+ }
JobServiceBlockingStub stub = jobService.get();
GetJobStateResponse response =
stub.getState(GetJobStateRequest.newBuilder().setJobIdBytes(jobId).build());
@@ -89,6 +94,9 @@ public State waitUntilFinish(Duration duration) {
@Override
public State waitUntilFinish() {
+ if (terminationState != null) {
+ return terminationState;
+ }
JobServiceBlockingStub stub = jobService.get();
GetJobStateRequest request =
GetJobStateRequest.newBuilder().setJobIdBytes(jobId).build();
GetJobStateResponse response = stub.getState(request);
@@ -103,11 +111,8 @@ public State waitUntilFinish() {
response = stub.getState(request);
lastState = getJavaState(response.getState());
}
- try {
- jobService.close();
- } catch (Exception e) {
- LOG.warn("Error cleaning up job service", e);
- }
+ close();
+ terminationState = lastState;
return lastState;
}
@@ -116,6 +121,14 @@ public MetricResults metrics() {
throw new UnsupportedOperationException("Not yet implemented.");
}
+ @Override
+ public void close() {
+ try (CloseableResource<JobServiceBlockingStub> jobService = this.jobService) {
+ } catch (Exception e) {
+ LOG.warn("Error cleaning up job service", e);
+ }
+ }
+
private static State getJavaState(JobApi.JobState.Enum protoState) {
switch (protoState) {
case UNSPECIFIED:
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 151120)
Time Spent: 28.5h (was: 28h 20m)
> Java: Portable batch runner passes all ValidatesRunner tests that
> non-portable runner passes
> --------------------------------------------------------------------------------------------
>
> Key: BEAM-4176
> URL: https://issues.apache.org/jira/browse/BEAM-4176
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Ben Sidhom
> Assignee: Ankur Goenka
> Priority: Major
> Attachments: 81VxNWtFtke.png, Screen Shot 2018-08-14 at 4.18.31
> PM.png, Screen Shot 2018-09-03 at 11.07.38 AM.png
>
> Time Spent: 28.5h
> Remaining Estimate: 0h
>
> We need this as a sanity check that runner execution is correct.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)