[ https://issues.apache.org/jira/browse/BEAM-3798?focusedWorklogId=81021&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81021 ]
ASF GitHub Bot logged work on BEAM-3798:
----------------------------------------
Author: ASF GitHub Bot
Created on: 15/Mar/18 23:26
Start Date: 15/Mar/18 23:26
Worklog Time Spent: 10m
Work Description: lukecwik closed pull request #4871: [BEAM-3798] Remove
error check on dataflow when getting batch job state
URL: https://github.com/apache/beam/pull/4871
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
index e163fe8d674..8679a952284 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
@@ -181,7 +181,7 @@ private boolean waitForStreamingJobTermination(
   }
 
   /**
-   * Return {@code true} if the job succeeded or {@code false} if it terminated in any other manner.
+   * Return {@code true} if job state is {@code State.DONE}. {@code false} otherwise.
    */
   private boolean waitForBatchJobTermination(
       DataflowPipelineJob job, ErrorMonitorMessagesHandler messageHandler) {
@@ -195,7 +195,7 @@ private boolean waitForBatchJobTermination(
       return false;
     }
 
-    return job.getState() == State.DONE && !messageHandler.hasSeenError();
+    return job.getState() == State.DONE;
   }
 }
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java
index f382e4b6ed2..cf54556a093 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java
@@ -121,6 +121,40 @@ public void testRunBatchJobThatSucceeds() throws Exception {
     assertEquals(mockJob, runner.run(p, mockRunner));
   }
 
+  /**
+   * Job success on Dataflow means that it handled transient errors (if any) successfully
+   * by retrying failed bundles.
+   */
+  @Test
+  public void testRunBatchJobThatSucceedsDespiteTransientErrors() throws Exception {
+    Pipeline p = Pipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.DONE);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
+        .thenAnswer(
+            invocation -> {
+              JobMessage message = new JobMessage();
+              message.setMessageText("TransientError");
+              message.setTime(TimeUtil.toCloudTime(Instant.now()));
+              message.setMessageImportance("JOB_MESSAGE_ERROR");
+              ((JobMessagesHandler) invocation.getArguments()[1]).process(Arrays.asList(message));
+              return State.DONE;
+            });
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
+    assertEquals(mockJob, runner.run(p, mockRunner));
+  }
+
   /**
    * Tests that when a batch job terminates in a failure state even if all assertions
    * passed, it throws an error to that effect.
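The behavior change in the two file diffs above can be modeled without Beam or Mockito. The following is a hypothetical, self-contained Java sketch: `BatchTerminationSketch`, its simplified `State` enum, `Message`, `ErrorRecordingHandler` and `fakeWaitUntilFinish` are illustrative stand-ins, not Beam or Dataflow API types. It mirrors the new test's scenario (one `JOB_MESSAGE_ERROR` message delivered through the handler callback, then a `DONE` state) and evaluates the old and new return expressions of `waitForBatchJobTermination`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical model of the batch success check in TestDataflowRunner before and after
 * PR #4871. None of these types are Beam or Dataflow APIs.
 */
final class BatchTerminationSketch {

  /** Simplified subset of terminal job states (assumption, not Beam's PipelineResult.State). */
  enum State { DONE, FAILED, CANCELLED }

  /** Stand-in for a job message: text plus an importance label. */
  static final class Message {
    final String text;
    final String importance;

    Message(String text, String importance) {
      this.text = text;
      this.importance = importance;
    }
  }

  /** Stand-in handler that remembers every error-importance message it receives. */
  static final class ErrorRecordingHandler {
    private final List<Message> errors = new ArrayList<>();

    void process(List<Message> messages) {
      for (Message message : messages) {
        if ("JOB_MESSAGE_ERROR".equals(message.importance)) {
          errors.add(message);
        }
      }
    }

    boolean hasSeenError() {
      return !errors.isEmpty();
    }
  }

  /** Like the mocked waitUntilFinish in the test: report one transient error, then finish DONE. */
  static State fakeWaitUntilFinish(ErrorRecordingHandler handler) {
    handler.process(
        Collections.singletonList(new Message("TransientError", "JOB_MESSAGE_ERROR")));
    return State.DONE;
  }

  /** Old rule: DONE and no error message was ever seen. */
  static boolean succeededBefore(State finalState, ErrorRecordingHandler handler) {
    return finalState == State.DONE && !handler.hasSeenError();
  }

  /** New rule: only the final job state decides. */
  static boolean succeededAfter(State finalState) {
    return finalState == State.DONE;
  }

  public static void main(String[] args) {
    ErrorRecordingHandler handler = new ErrorRecordingHandler();
    State finalState = fakeWaitUntilFinish(handler);
    System.out.println("sawError=" + handler.hasSeenError()); // sawError=true
    System.out.println("before=" + succeededBefore(finalState, handler)); // before=false
    System.out.println("after=" + succeededAfter(finalState)); // after=true
  }
}
```

Under the old rule, a transient error that Dataflow retried successfully still turns a `DONE` job into a test failure; under the new rule the retries are trusted, which is the reasoning stated in the new test's Javadoc.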
Issue Time Tracking
-------------------
Worklog Id: (was: 81021)
Time Spent: 40m (was: 0.5h)
> Performance tests flaky due to Dataflow transient errors
> --------------------------------------------------------
>
> Key: BEAM-3798
> URL: https://issues.apache.org/jira/browse/BEAM-3798
> Project: Beam
> Issue Type: Bug
> Components: runner-dataflow
> Reporter: Łukasz Gajowy
> Assignee: Łukasz Gajowy
> Priority: Major
> Time Spent: 40m
> Remaining Estimate: 0h
>
> Performance tests are flaky due to transient errors that happen during data
> processing (e.g. SocketTimeoutException while connecting to a DB). Currently,
> exceptions that occur on the Dataflow runner but are successfully retried still
> fail the test regardless of the final job state (giving a false-negative result).
> Possible solution for batch scenarios:
> We could "rethrow" exceptions caused by transient errors *only* if the final
> job state is other than DONE.
> Possible solution for streaming scenarios:
> (don't know yet)
> [Link to discussion on dev list|https://lists.apache.org/thread.html/e480f8181913dc81d2d4cd1430557a646537473ccf29fe6390229098@%3Cdev.beam.apache.org%3E]
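The batch proposal in the quoted issue description (surface transient errors only when the job does not end DONE) could look roughly like the hypothetical Java sketch below. `RethrowIfNotDone`, `checkBatchResult` and the simplified `State` enum are illustrative names, not Beam API; attaching the collected errors as suppressed exceptions is one possible design choice, not something the issue specifies. Note that the merged PR took the simpler route of dropping the error check entirely.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of the BEAM-3798 batch proposal: errors reported while the job runs
 * are collected, but only surfaced if the job did not finish DONE. Not a Beam API.
 */
final class RethrowIfNotDone {

  /** Simplified subset of terminal job states (assumption). */
  enum State { DONE, FAILED, CANCELLED }

  /** Returns normally when the job ended DONE; otherwise throws with the collected errors attached. */
  static void checkBatchResult(State finalState, List<String> collectedErrors) {
    if (finalState == State.DONE) {
      // Any transient errors were retried successfully, so they do not fail the test.
      return;
    }
    AssertionError failure = new AssertionError("Job finished in state " + finalState);
    for (String error : collectedErrors) {
      // Keep every reported error visible to help diagnose the real failure.
      failure.addSuppressed(new RuntimeException(error));
    }
    throw failure;
  }

  public static void main(String[] args) {
    List<String> errors = new ArrayList<>();
    errors.add("SocketTimeoutException while connecting to DB");
    checkBatchResult(State.DONE, errors); // returns normally: retries succeeded
    try {
      checkBatchResult(State.FAILED, errors);
    } catch (AssertionError e) {
      // prints "Job finished in state FAILED (1 error(s) attached)"
      System.out.println(e.getMessage() + " (" + e.getSuppressed().length + " error(s) attached)");
    }
  }
}
```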
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)