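DataflowPipelineJob: catch an underflow in backoff code

When a total wait duration is set and polling has already consumed all of it, the remaining duration (duration minus time consumed) can be zero or negative. Stop backing off in that case (BackOff.STOP_BACKOFF) instead of building a backoff with a non-positive maximum cumulative duration. Also return as soon as the job reaches a terminal state, before resetting the backoff.

Forward port of https://github.com/GoogleCloudPlatform/DataflowJavaSDK/pull/422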
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ed3b12ab Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ed3b12ab Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ed3b12ab Branch: refs/heads/master Commit: ed3b12ab764d7867813957f22b67a518d5140ecd Parents: 7fcc944 Author: Daniel Halperin <dhalp...@users.noreply.github.com> Authored: Wed Sep 7 16:57:26 2016 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Thu Sep 8 17:56:43 2016 -0700 ---------------------------------------------------------------------- .../runners/dataflow/DataflowPipelineJob.java | 26 +++++++---- .../dataflow/DataflowPipelineJobTest.java | 46 ++++++++++++++++++++ 2 files changed, 63 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ed3b12ab/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java index dad59f2..1af8c98 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java @@ -263,19 +263,27 @@ public class DataflowPipelineJob implements PipelineResult { } if (!hasError) { - // Reset the backoff. + // We can stop if the job is done. + if (state.isTerminal()) { + return state; + } + + // The job is not done, so we must keep polling. 
backoff.reset(); - // If duration is set, update the new cumulative sleep time to be the remaining - // part of the total input sleep duration. + + // If a total duration for all backoff has been set, update the new cumulative sleep time to + // be the remaining total backoff duration, stopping if we have already exceeded the + // allotted time. if (duration.isLongerThan(Duration.ZERO)) { long nanosConsumed = nanoClock.nanoTime() - startNanos; Duration consumed = Duration.millis((nanosConsumed + 999999) / 1000000); - backoff = - MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(duration.minus(consumed)).backoff(); - } - // Check if the job is done. - if (state.isTerminal()) { - return state; + Duration remaining = duration.minus(consumed); + if (remaining.isLongerThan(Duration.ZERO)) { + backoff = MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(remaining).backoff(); + } else { + // If there is no time remaining, don't bother backing off. + backoff = BackOff.STOP_BACKOFF; + } } } } while(BackOffUtils.next(sleeper, backoff)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ed3b12ab/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java index 226140a..4c70d12 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java @@ -32,6 +32,8 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.google.api.client.util.NanoClock; +import 
com.google.api.client.util.Sleeper; import com.google.api.services.dataflow.Dataflow; import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Get; import com.google.api.services.dataflow.Dataflow.Projects.Jobs.GetMetrics; @@ -46,6 +48,7 @@ import com.google.common.collect.ImmutableSetMultimap; import java.io.IOException; import java.math.BigDecimal; import java.net.SocketTimeoutException; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms; import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions; @@ -249,6 +252,30 @@ public class DataflowPipelineJobTest { } @Test + public void testCumulativeTimeOverflow() throws Exception { + Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class); + + Job statusResponse = new Job(); + statusResponse.setCurrentState("JOB_STATE_RUNNING"); + when(mockJobs.get(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(statusRequest); + when(statusRequest.execute()).thenReturn(statusResponse); + + DataflowAggregatorTransforms dataflowAggregatorTransforms = + mock(DataflowAggregatorTransforms.class); + + FastNanoClockAndFuzzySleeper clock = new FastNanoClockAndFuzzySleeper(); + + DataflowPipelineJob job = new DataflowPipelineJob( + PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms); + long startTime = clock.nanoTime(); + State state = job.waitUntilFinish(Duration.millis(4), null, clock, clock); + assertEquals(null, state); + long timeDiff = TimeUnit.NANOSECONDS.toMillis(clock.nanoTime() - startTime); + // Should only have slept for the 4 ms allowed. 
+ assertThat(timeDiff, lessThanOrEqualTo(4L)); + } + + @Test public void testGetStateReturnsServiceState() throws Exception { Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class); @@ -609,4 +636,23 @@ public class DataflowPipelineJobTest { String fullName, PTransform<PInput, POutput> transform) { return AppliedPTransform.of(fullName, mock(PInput.class), mock(POutput.class), transform); } + + + private static class FastNanoClockAndFuzzySleeper implements NanoClock, Sleeper { + private long fastNanoTime; + + public FastNanoClockAndFuzzySleeper() { + fastNanoTime = NanoClock.SYSTEM.nanoTime(); + } + + @Override + public long nanoTime() { + return fastNanoTime; + } + + @Override + public void sleep(long millis) throws InterruptedException { + fastNanoTime += millis * 1000000L + ThreadLocalRandom.current().nextInt(500000); + } + } }