[
https://issues.apache.org/jira/browse/BEAM-6284?focusedWorklogId=246444&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-246444
]
ASF GitHub Bot logged work on BEAM-6284:
----------------------------------------
Author: ASF GitHub Bot
Created on: 21/May/19 23:04
Start Date: 21/May/19 23:04
Worklog Time Spent: 10m
Work Description: akedin commented on pull request #8629: [BEAM-6284]
Improve error message on waitUntilFinish.
URL: https://github.com/apache/beam/pull/8629#discussion_r286257535
##########
File path:
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
##########
@@ -261,96 +275,123 @@ State waitUntilFinish(
MonitoringUtil monitor)
throws IOException, InterruptedException {
- BackOff backoff;
- if (!duration.isLongerThan(Duration.ZERO)) {
- backoff = BackOffAdapter.toGcpBackOff(MESSAGES_BACKOFF_FACTORY.backoff());
- } else {
- backoff =
- BackOffAdapter.toGcpBackOff(
- MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(duration).backoff());
- }
+ BackOff backoff = getBackoff(duration, MESSAGES_BACKOFF_FACTORY);
// This function tracks the cumulative time from the *first request* to enforce the wall-clock
// limit. Any backoff instance could, at best, track the the time since the first attempt at a
// given request. Thus, we need to track the cumulative time ourselves.
long startNanos = nanoClock.nanoTime();
- State state;
+ State state = State.UNKNOWN;
+ Exception exception;
do {
- // Get the state of the job before listing messages. This ensures we always fetch job
- // messages after the job finishes to ensure we have all them.
- state =
- getStateWithRetries(
- BackOffAdapter.toGcpBackOff(STATUS_BACKOFF_FACTORY.withMaxRetries(0).backoff()),
- sleeper);
- boolean hasError = state == State.UNKNOWN;
-
- if (messageHandler != null && !hasError) {
- // Process all the job messages that have accumulated so far.
- try {
- List<JobMessage> allMessages = monitor.getJobMessages(getJobId(), lastTimestamp);
-
- if (!allMessages.isEmpty()) {
- lastTimestamp =
- fromCloudTime(allMessages.get(allMessages.size() - 1).getTime()).getMillis();
- messageHandler.process(allMessages);
- }
- } catch (GoogleJsonResponseException | SocketTimeoutException e) {
- hasError = true;
- LOG.warn("There were problems getting current job messages: {}.", e.getMessage());
- LOG.debug("Exception information:", e);
- }
+ exception = null;
+ try {
+ // Get the state of the job before listing messages. This ensures we always fetch job
+ // messages after the job finishes to ensure we have all them.
+ state =
+ getStateWithRetries(
+ BackOffAdapter.toGcpBackOff(STATUS_BACKOFF_FACTORY.withMaxRetries(0).backoff()),
+ sleeper);
+ } catch (IOException e) {
+ exception = e;
+ LOG.warn("Failed to get job state: {}", e.getMessage());
+ LOG.debug("Failed to get job state: {}", e);
+ continue;
}
- if (!hasError) {
- // We can stop if the job is done.
- if (state.isTerminal()) {
- switch (state) {
- case DONE:
- case CANCELLED:
- LOG.info("Job {} finished with status {}.", getJobId(), state);
- break;
- case UPDATED:
- LOG.info(
- "Job {} has been updated and is running as the new job with id {}. "
- + "To access the updated job on the Dataflow monitoring console, "
- + "please navigate to {}",
- getJobId(),
- getReplacedByJob().getJobId(),
- MonitoringUtil.getJobMonitoringPageURL(
- getReplacedByJob().getProjectId(),
- getRegion(),
- getReplacedByJob().getJobId()));
- break;
- default:
- LOG.info("Job {} failed with status {}.", getJobId(), state);
- }
- return state;
- }
+ exception = processJobMessages(messageHandler, monitor);
+
+ if (exception != null) {
Review comment:
Let me try to summarize the main flow to see if I understand it correctly:
**Previous Flow**
* get job state:
  * get non-`UNKNOWN` state -> **reset backoff** -> continue loop if not terminal;
    * will time out at max duration or get a terminal state; correct behavior;
  * get `IOException`, same as:
  * get `UNKNOWN` state -> continue loop unconditionally;
    * does **not** reset backoff;
    * can quickly exceed the number of allowed attempts, without waiting for the max allowed duration;
**New Flow**
* get job state:
  * get non-`UNKNOWN` or `UNKNOWN` state -> **reset backoff** -> continue loop if not terminal;
    * can only receive `UNKNOWN` explicitly;
    * will time out at max duration or get a terminal state; correct behavior;
  * get `IOException` -> continue loop unconditionally:
    * can exceed the number of allowed attempts instead of waiting for the max allowed time;
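
To make the concern in the new flow concrete, here is a minimal, self-contained sketch. It does not use Beam's `BackOff` or `DataflowPipelineJob`; the names `BackoffFlowSketch`, `AttemptBudget`, `Poll`, `Outcome` and `run` are hypothetical, and the limit of 3 attempts is an arbitrary assumption. It only models the behavior summarized above: a successfully fetched state resets the budget, while an `IOException` consumes an attempt without resetting it.

```java
import java.util.Arrays;
import java.util.List;

final class BackoffFlowSketch {
  // Simulated result of one polling iteration (hypothetical, not a Beam type).
  enum Poll { RUNNING, IO_ERROR, DONE }

  // How the simulated wait loop ended.
  enum Outcome { TERMINAL_STATE, ATTEMPTS_EXHAUSTED, POLLS_EXHAUSTED }

  // Hypothetical stand-in for a backoff with a maximum number of attempts.
  static final class AttemptBudget {
    private final int maxAttempts;
    private int used;

    AttemptBudget(int maxAttempts) {
      this.maxAttempts = maxAttempts;
    }

    // Returns false once the budget is spent, like a backoff signalling "stop".
    boolean tryAcquire() {
      if (used >= maxAttempts) {
        return false;
      }
      used++;
      return true;
    }

    void reset() {
      used = 0;
    }
  }

  // Models the summarized loop: a fetched state resets the budget,
  // an IO error skips the reset, and every iteration consumes one attempt.
  static Outcome run(List<Poll> polls, int maxAttempts) {
    AttemptBudget budget = new AttemptBudget(maxAttempts);
    for (Poll poll : polls) {
      if (poll == Poll.DONE) {
        return Outcome.TERMINAL_STATE;
      }
      if (poll == Poll.RUNNING) {
        budget.reset();
      }
      if (!budget.tryAcquire()) {
        return Outcome.ATTEMPTS_EXHAUSTED;
      }
    }
    return Outcome.POLLS_EXHAUSTED;
  }

  public static void main(String[] args) {
    // Errors interleaved with successful polls: the budget keeps being reset.
    System.out.println(
        run(Arrays.asList(Poll.RUNNING, Poll.IO_ERROR, Poll.RUNNING, Poll.IO_ERROR, Poll.DONE), 3));
    // Consecutive errors: the budget runs out before the job is seen as DONE.
    System.out.println(
        run(Arrays.asList(Poll.RUNNING, Poll.IO_ERROR, Poll.IO_ERROR, Poll.IO_ERROR, Poll.DONE), 3));
  }
}
```

With these inputs the first call ends in `TERMINAL_STATE` and the second in `ATTEMPTS_EXHAUSTED`, which is the case where the loop gives up on attempt count rather than on elapsed time.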
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 246444)
Time Spent: 20m (was: 10m)
> [FLAKE][beam_PostCommit_Java_ValidatesRunner_Dataflow] TestRunner fails with
> result UNKNOWN on succeeded job and checks passed
> ------------------------------------------------------------------------------------------------------------------------------
>
> Key: BEAM-6284
> URL: https://issues.apache.org/jira/browse/BEAM-6284
> Project: Beam
> Issue Type: Bug
> Components: test-failures, testing
> Reporter: Mikhail Gryzykhin
> Assignee: Mikhail Gryzykhin
> Priority: Major
> Labels: currently-failing
> Time Spent: 20m
> Remaining Estimate: 0h
>
> _Use this form to file an issue for test failure:_
> *
> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/testReport/junit/org.apache.beam.sdk.transforms/ViewTest/testWindowedSideInputFixedToGlobal/
> Initial investigation:
> According to the logs, all test-relevant checks have passed, and it seems to be
> a testing framework failure.
> ----
> _After you've filled out the above details, please [assign the issue to an
> individual|https://beam.apache.org/contribute/postcommits-guides/index.html#find_specialist].
> Assignee should [treat test failures as
> high-priority|https://beam.apache.org/contribute/postcommits-policies/#assigned-failing-test],
> helping to fix the issue or find a more appropriate owner. See [Apache Beam
> Post-Commit
> Policies|https://beam.apache.org/contribute/postcommits-policies]._
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)