[
https://issues.apache.org/jira/browse/BEAM-6284?focusedWorklogId=246447&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-246447
]
ASF GitHub Bot logged work on BEAM-6284:
----------------------------------------
Author: ASF GitHub Bot
Created on: 21/May/19 23:11
Start Date: 21/May/19 23:11
Worklog Time Spent: 10m
Work Description: akedin commented on pull request #8629: [BEAM-6284]
Improve error message on waitUntilFinish.
URL: https://github.com/apache/beam/pull/8629#discussion_r286259125
##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
##########
@@ -261,96 +275,123 @@ State waitUntilFinish(
       MonitoringUtil monitor)
       throws IOException, InterruptedException {
-    BackOff backoff;
-    if (!duration.isLongerThan(Duration.ZERO)) {
-      backoff = BackOffAdapter.toGcpBackOff(MESSAGES_BACKOFF_FACTORY.backoff());
-    } else {
-      backoff =
-          BackOffAdapter.toGcpBackOff(
-              MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(duration).backoff());
-    }
+    BackOff backoff = getBackoff(duration, MESSAGES_BACKOFF_FACTORY);
     // This function tracks the cumulative time from the *first request* to enforce the wall-clock
     // limit. Any backoff instance could, at best, track the time since the first attempt at a
     // given request. Thus, we need to track the cumulative time ourselves.
     long startNanos = nanoClock.nanoTime();
-    State state;
+    State state = State.UNKNOWN;
+    Exception exception;
     do {
-      // Get the state of the job before listing messages. This ensures we always fetch job
-      // messages after the job finishes to ensure we have all of them.
-      state =
-          getStateWithRetries(
-              BackOffAdapter.toGcpBackOff(STATUS_BACKOFF_FACTORY.withMaxRetries(0).backoff()),
-              sleeper);
-      boolean hasError = state == State.UNKNOWN;
-
-      if (messageHandler != null && !hasError) {
-        // Process all the job messages that have accumulated so far.
-        try {
-          List<JobMessage> allMessages = monitor.getJobMessages(getJobId(), lastTimestamp);
-
-          if (!allMessages.isEmpty()) {
-            lastTimestamp =
-                fromCloudTime(allMessages.get(allMessages.size() - 1).getTime()).getMillis();
-            messageHandler.process(allMessages);
-          }
-        } catch (GoogleJsonResponseException | SocketTimeoutException e) {
-          hasError = true;
-          LOG.warn("There were problems getting current job messages: {}.", e.getMessage());
-          LOG.debug("Exception information:", e);
-        }
+      exception = null;
+      try {
+        // Get the state of the job before listing messages. This ensures we always fetch job
+        // messages after the job finishes to ensure we have all of them.
+        state =
+            getStateWithRetries(
+                BackOffAdapter.toGcpBackOff(STATUS_BACKOFF_FACTORY.withMaxRetries(0).backoff()),
+                sleeper);
+      } catch (IOException e) {
+        exception = e;
+        LOG.warn("Failed to get job state: {}", e.getMessage());
+        LOG.debug("Failed to get job state: {}", e);
+        continue;
       }
-      if (!hasError) {
-        // We can stop if the job is done.
-        if (state.isTerminal()) {
-          switch (state) {
-            case DONE:
-            case CANCELLED:
-              LOG.info("Job {} finished with status {}.", getJobId(), state);
-              break;
-            case UPDATED:
-              LOG.info(
-                  "Job {} has been updated and is running as the new job with id {}. "
-                      + "To access the updated job on the Dataflow monitoring console, "
-                      + "please navigate to {}",
-                  getJobId(),
-                  getReplacedByJob().getJobId(),
-                  MonitoringUtil.getJobMonitoringPageURL(
-                      getReplacedByJob().getProjectId(),
-                      getRegion(),
-                      getReplacedByJob().getJobId()));
-              break;
-            default:
-              LOG.info("Job {} failed with status {}.", getJobId(), state);
-          }
-          return state;
-        }
+      exception = processJobMessages(messageHandler, monitor);
+
+      if (exception != null) {
Review comment:
In this case the logic seems right. I would probably try to organize the body of the loop to emphasize the flow though, something along the lines of:
```
Optional<State> state = tryGetState();
if (!state.isPresent() || !tryProcessJobMessages()) {
continue;
}
if (state.get().isTerminal()) {
return state.get();
}
resetAttemptsCount();
```
Hope this makes sense
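
To make the suggested control flow concrete, here is a minimal, self-contained sketch of that loop shape. Everything in it is hypothetical stand-in code, not the actual `DataflowPipelineJob` API: the `State` enum, the stubbed poll sequence, and the helper names `tryGetState`, `tryProcessJobMessages`, and `resetAttemptsCount` are invented for illustration, and the real method would also apply backoff and a wall-clock limit, which are omitted here.

```java
import java.util.Optional;

/**
 * Hypothetical sketch of the loop structure suggested in the review
 * comment above. All names are stand-ins; backoff and time limits
 * from the real waitUntilFinish are intentionally omitted.
 */
public class WaitLoopSketch {

  enum State {
    RUNNING, DONE, FAILED;

    boolean isTerminal() {
      return this != RUNNING;
    }
  }

  // Stubbed poll results standing in for the remote service:
  // one transient failure (null), one running poll, then done.
  private final State[] polls = {null, State.RUNNING, State.DONE};
  private int pollIndex = 0;
  private int attempts = 0;

  /** Empty Optional models a transient failure to fetch the state. */
  Optional<State> tryGetState() {
    State s = polls[Math.min(pollIndex++, polls.length - 1)];
    return Optional.ofNullable(s);
  }

  /** Returns false on a transient message-listing failure. */
  boolean tryProcessJobMessages() {
    return true;
  }

  /** A fully successful iteration resets the retry budget. */
  void resetAttemptsCount() {
    attempts = 0;
  }

  State waitUntilFinish() {
    while (true) {
      attempts++;
      Optional<State> state = tryGetState();
      if (!state.isPresent() || !tryProcessJobMessages()) {
        continue; // transient error: retry (real code would back off here)
      }
      if (state.get().isTerminal()) {
        return state.get();
      }
      resetAttemptsCount();
    }
  }

  public static void main(String[] args) {
    System.out.println(new WaitLoopSketch().waitUntilFinish()); // prints DONE
  }
}
```

The point of the shape is that each iteration reads top to bottom as "fetch, process, check, reset", with the transient-failure retry expressed once via `continue` instead of nested `hasError` flags.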
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 246447)
Time Spent: 0.5h (was: 20m)
> [FLAKE][beam_PostCommit_Java_ValidatesRunner_Dataflow] TestRunner fails with
> result UNKNOWN on succeeded job and checks passed
> ------------------------------------------------------------------------------------------------------------------------------
>
> Key: BEAM-6284
> URL: https://issues.apache.org/jira/browse/BEAM-6284
> Project: Beam
> Issue Type: Bug
> Components: test-failures, testing
> Reporter: Mikhail Gryzykhin
> Assignee: Mikhail Gryzykhin
> Priority: Major
> Labels: currently-failing
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> _Use this form to file an issue for test failure:_
> *
> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/testReport/junit/org.apache.beam.sdk.transforms/ViewTest/testWindowedSideInputFixedToGlobal/
> Initial investigation:
> According to the logs, all test-relevant checks passed, so this appears to
> be a testing-framework failure.
> ----
> _After you've filled out the above details, please [assign the issue to an
> individual|https://beam.apache.org/contribute/postcommits-guides/index.html#find_specialist].
> Assignee should [treat test failures as
> high-priority|https://beam.apache.org/contribute/postcommits-policies/#assigned-failing-test],
> helping to fix the issue or find a more appropriate owner. See [Apache Beam
> Post-Commit
> Policies|https://beam.apache.org/contribute/postcommits-policies]._
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)