Repository: beam Updated Branches: refs/heads/master 87236ce2f -> 3fff52d43
Do not repeat log messages in DataflowPipelineJob Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/401eef09 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/401eef09 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/401eef09 Branch: refs/heads/master Commit: 401eef09a0dc94dc2ed1e4afcaa30259d922ba3f Parents: 9f2733a Author: Kenneth Knowles <[email protected]> Authored: Sun Apr 30 21:24:23 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Sun Apr 30 21:24:23 2017 -0700 ---------------------------------------------------------------------- .../runners/dataflow/DataflowPipelineJob.java | 37 +++++- .../beam/runners/dataflow/DataflowRunner.java | 6 +- .../dataflow/util/DataflowTemplateJob.java | 2 +- .../runners/dataflow/util/MonitoringUtil.java | 4 +- .../dataflow/DataflowPipelineJobTest.java | 127 +++++++++++++++++-- .../testing/TestDataflowRunnerTest.java | 14 +- 6 files changed, 162 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/401eef09/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java index d464206..aef3155 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java @@ -100,6 +100,11 @@ public class DataflowPipelineJob implements PipelineResult { private List<MetricUpdate> terminalMetricUpdates; /** + * The latest timestamp up to which job messages have been retrieved. + */ + private long lastTimestamp = Long.MIN_VALUE; + + /** * The polling interval for job status and messages information. */ static final Duration MESSAGES_POLLING_INTERVAL = Duration.standardSeconds(2); @@ -132,12 +137,13 @@ public class DataflowPipelineJob implements PipelineResult { * @param transformStepNames a mapping from AppliedPTransforms to Step Names */ public DataflowPipelineJob( + DataflowClient dataflowClient, String jobId, DataflowPipelineOptions dataflowOptions, Map<AppliedPTransform<?, ?, ?>, String> transformStepNames) { + this.dataflowClient = dataflowClient; this.jobId = jobId; this.dataflowOptions = dataflowOptions; - this.dataflowClient = (dataflowOptions == null ? null : DataflowClient.create(dataflowOptions)); this.transformStepNames = HashBiMap.create( firstNonNull(transformStepNames, ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of())); this.dataflowMetrics = new DataflowMetrics(this, this.dataflowClient); @@ -183,7 +189,8 @@ public class DataflowPipelineJob implements PipelineResult { @Nullable public State waitUntilFinish(Duration duration) { try { - return waitUntilFinish(duration, new MonitoringUtil.LoggingHandler()); + return waitUntilFinish( + duration, new MonitoringUtil.LoggingHandler()); } catch (Exception e) { if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); @@ -229,12 +236,29 @@ public class DataflowPipelineJob implements PipelineResult { try { Runtime.getRuntime().addShutdownHook(shutdownHook); - return waitUntilFinish(duration, messageHandler, Sleeper.DEFAULT, NanoClock.SYSTEM); + return waitUntilFinish( + duration, + messageHandler, + Sleeper.DEFAULT, + NanoClock.SYSTEM, + new MonitoringUtil(dataflowClient)); } finally { Runtime.getRuntime().removeShutdownHook(shutdownHook); } } + @Nullable + @VisibleForTesting + State waitUntilFinish( + Duration duration, + @Nullable MonitoringUtil.JobMessagesHandler messageHandler, + Sleeper sleeper, + NanoClock nanoClock) + throws IOException, InterruptedException { + return waitUntilFinish( + duration, messageHandler, sleeper, nanoClock, new MonitoringUtil(dataflowClient)); + } + /** * Waits until the pipeline finishes and returns the final status. * @@ -256,10 +280,9 @@ public class DataflowPipelineJob implements PipelineResult { Duration duration, @Nullable MonitoringUtil.JobMessagesHandler messageHandler, Sleeper sleeper, - NanoClock nanoClock) throws IOException, InterruptedException { - MonitoringUtil monitor = new MonitoringUtil(dataflowClient); + NanoClock nanoClock, + MonitoringUtil monitor) throws IOException, InterruptedException { - long lastTimestamp = 0; BackOff backoff; if (!duration.isLongerThan(Duration.ZERO)) { backoff = MESSAGES_BACKOFF_FACTORY.backoff(); @@ -460,7 +483,7 @@ public class DataflowPipelineJob implements PipelineResult { if (currentState.isTerminal()) { terminalState = currentState; replacedByJob = new DataflowPipelineJob( - job.getReplacedByJobId(), dataflowOptions, transformStepNames); + dataflowClient, job.getReplacedByJobId(), dataflowOptions, transformStepNames); } return job; } catch (IOException exn) { http://git-wip-us.apache.org/repos/asf/beam/blob/401eef09/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index a61fe49..12d10de 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -621,7 +621,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { // Use a raw client for post-launch monitoring, as status calls may fail // regularly and need not be retried automatically. DataflowPipelineJob dataflowPipelineJob = - new DataflowPipelineJob(jobResult.getId(), options, jobSpecification.getStepNames()); + new DataflowPipelineJob( + DataflowClient.create(options), + jobResult.getId(), + options, + jobSpecification.getStepNames()); // If the service returned client request id, the SDK needs to compare it // with the original id generated in the request, if they are not the same http://git-wip-us.apache.org/repos/asf/beam/blob/401eef09/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java index 1a44963..2937184 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java @@ -30,7 +30,7 @@ public class DataflowTemplateJob extends DataflowPipelineJob { "The result of template creation should not be used."; public DataflowTemplateJob() { - super(null, null, null); + super(null, null, null, null); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/401eef09/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java index c410afb..759387c 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java @@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory; /** * A helper class for monitoring jobs submitted to the service. */ -public final class MonitoringUtil { +public class MonitoringUtil { private static final String GCLOUD_DATAFLOW_PREFIX = "gcloud beta dataflow"; private static final String ENDPOINT_OVERRIDE_ENV_VAR = @@ -147,7 +147,7 @@ public final class MonitoringUtil { * timestamp greater than this value. * @return collection of messages */ - public ArrayList<JobMessage> getJobMessages( + public List<JobMessage> getJobMessages( String jobId, long startTimestampMs) throws IOException { // TODO: Allow filtering messages by importance Instant startTimestamp = new Instant(startTimestampMs); http://git-wip-us.apache.org/repos/asf/beam/blob/401eef09/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java index e1235b9..237493a 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java @@ -23,6 +23,8 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; @@ -35,14 +37,20 @@ import com.google.api.client.util.Sleeper; import com.google.api.services.dataflow.Dataflow; import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs.Messages; import com.google.api.services.dataflow.model.Job; +import com.google.api.services.dataflow.model.JobMessage; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import java.io.IOException; import java.net.SocketTimeoutException; import java.util.Collections; +import java.util.List; +import java.util.NavigableMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions; import org.apache.beam.runners.dataflow.util.MonitoringUtil; +import org.apache.beam.runners.dataflow.util.TimeUtil; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; @@ -57,6 +65,7 @@ import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Duration; +import org.joda.time.Instant; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -77,6 +86,8 @@ public class DataflowPipelineJobTest { private static final String REPLACEMENT_JOB_ID = "4321"; @Mock + private DataflowClient mockDataflowClient; + @Mock private Dataflow mockWorkflowClient; @Mock private Dataflow.Projects mockProjects; @@ -84,6 +95,8 @@ public class DataflowPipelineJobTest { private Dataflow.Projects.Locations mockLocations; @Mock private Dataflow.Projects.Locations.Jobs mockJobs; + @Mock + private MonitoringUtil.JobMessagesHandler mockHandler; @Rule public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper(); @@ -157,7 +170,10 @@ public class DataflowPipelineJobTest { when(listRequest.execute()).thenThrow(SocketTimeoutException.class); DataflowPipelineJob job = - new DataflowPipelineJob(JOB_ID, options, + new DataflowPipelineJob( + DataflowClient.create(options), + JOB_ID, + options, ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of()); State state = job.waitUntilFinish( @@ -179,7 +195,10 @@ public class DataflowPipelineJobTest { when(statusRequest.execute()).thenReturn(statusResponse); DataflowPipelineJob job = - new DataflowPipelineJob(JOB_ID, options, + new DataflowPipelineJob( + DataflowClient.create(options), + JOB_ID, + options, ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of()); return job.waitUntilFinish(Duration.standardMinutes(1), null, fastClock, fastClock); @@ -246,7 +265,10 @@ public class DataflowPipelineJobTest { when(statusRequest.execute()).thenThrow(IOException.class); DataflowPipelineJob job = - new DataflowPipelineJob(JOB_ID, options, + new DataflowPipelineJob( + DataflowClient.create(options), + JOB_ID, + options, ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of()); long startTime = fastClock.nanoTime(); @@ -266,7 +288,10 @@ public class DataflowPipelineJobTest { when(statusRequest.execute()).thenThrow(IOException.class); DataflowPipelineJob job = - new DataflowPipelineJob(JOB_ID, options, + new DataflowPipelineJob( + DataflowClient.create(options), + JOB_ID, + options, ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of()); long startTime = fastClock.nanoTime(); State state = job.waitUntilFinish(Duration.millis(4), null, fastClock, fastClock); @@ -289,7 +314,10 @@ public class DataflowPipelineJobTest { FastNanoClockAndFuzzySleeper clock = new FastNanoClockAndFuzzySleeper(); DataflowPipelineJob job = - new DataflowPipelineJob(JOB_ID, options, + new DataflowPipelineJob( + DataflowClient.create(options), + JOB_ID, + options, ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of()); long startTime = clock.nanoTime(); State state = job.waitUntilFinish(Duration.millis(4), null, clock, clock); @@ -311,7 +339,10 @@ public class DataflowPipelineJobTest { when(statusRequest.execute()).thenReturn(statusResponse); DataflowPipelineJob job = - new DataflowPipelineJob(JOB_ID, options, + new DataflowPipelineJob( + DataflowClient.create(options), + JOB_ID, + options, ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of()); assertEquals( @@ -328,7 +359,10 @@ public class DataflowPipelineJobTest { when(statusRequest.execute()).thenThrow(IOException.class); DataflowPipelineJob job = - new DataflowPipelineJob(JOB_ID, options, + new DataflowPipelineJob( + DataflowClient.create(options), + JOB_ID, + options, ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of()); long startTime = fastClock.nanoTime(); @@ -379,7 +413,8 @@ public class DataflowPipelineJobTest { .thenReturn(update); when(update.execute()).thenReturn(new Job().setCurrentState("JOB_STATE_CANCELLED")); - DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null); + DataflowPipelineJob job = + new DataflowPipelineJob(DataflowClient.create(options), JOB_ID, options, null); assertEquals(State.CANCELLED, job.cancel()); Job content = new Job(); @@ -406,7 +441,8 @@ public class DataflowPipelineJobTest { .thenReturn(update); when(update.execute()).thenThrow(new IOException("Some random IOException")); - DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null); + DataflowPipelineJob job = + new DataflowPipelineJob(DataflowClient.create(options), JOB_ID, options, null); thrown.expect(IOException.class); thrown.expectMessage("Failed to cancel job in state RUNNING, " @@ -436,7 +472,8 @@ public class DataflowPipelineJobTest { .thenReturn(update); when(update.execute()).thenThrow(new IOException("Job has terminated in state SUCCESS")); - DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null); + DataflowPipelineJob job = + new DataflowPipelineJob(DataflowClient.create(options), JOB_ID, options, null); State returned = job.cancel(); assertThat(returned, equalTo(State.RUNNING)); expectedLogs.verifyWarn("Cancel failed because job is already terminated."); @@ -458,7 +495,8 @@ public class DataflowPipelineJobTest { .thenReturn(update); when(update.execute()).thenThrow(new IOException()); - DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null); + DataflowPipelineJob job = + new DataflowPipelineJob(DataflowClient.create(options), JOB_ID, options, null); assertEquals(State.FAILED, job.cancel()); Job content = new Job(); @@ -469,4 +507,71 @@ public class DataflowPipelineJobTest { verify(mockJobs).get(PROJECT_ID, REGION_ID, JOB_ID); verifyNoMoreInteractions(mockJobs); } + + /** + * Tests that a {@link DataflowPipelineJob} does not duplicate messages. + */ + @Test + public void testWaitUntilFinishNoRepeatedLogs() throws Exception { + DataflowPipelineJob job = new DataflowPipelineJob(mockDataflowClient, JOB_ID, options, null); + Sleeper sleeper = new ZeroSleeper(); + NanoClock nanoClock = mock(NanoClock.class); + + Instant separatingTimestamp = new Instant(42L); + JobMessage theMessage = infoMessage(separatingTimestamp, "nothing"); + + MonitoringUtil mockMonitor = mock(MonitoringUtil.class); + when(mockMonitor.getJobMessages(anyString(), anyLong())) + .thenReturn(ImmutableList.of(theMessage)); + + // The Job just always reports "running" across all calls + Job fakeJob = new Job(); + fakeJob.setCurrentState("JOB_STATE_RUNNING"); + when(mockDataflowClient.getJob(anyString())).thenReturn(fakeJob); + + // After waitUntilFinish the DataflowPipelineJob should record the latest message timestamp + when(nanoClock.nanoTime()).thenReturn(0L).thenReturn(2000000000L); + job.waitUntilFinish(Duration.standardSeconds(1), mockHandler, sleeper, nanoClock, mockMonitor); + verify(mockHandler).process(ImmutableList.of(theMessage)); + + // Second waitUntilFinish should request jobs with `separatingTimestamp` so the monitor + // will only return new messages + when(nanoClock.nanoTime()).thenReturn(3000000000L).thenReturn(6000000000L); + job.waitUntilFinish(Duration.standardSeconds(1), mockHandler, sleeper, nanoClock, mockMonitor); + verify(mockMonitor).getJobMessages(anyString(), eq(separatingTimestamp.getMillis())); + } + + private static JobMessage infoMessage(Instant timestamp, String text) { + JobMessage message = new JobMessage(); + message.setTime(TimeUtil.toCloudTime(timestamp)); + message.setMessageText(text); + return message; + } + + private class FakeMonitor extends MonitoringUtil { + // Messages in timestamp order + private final NavigableMap<Long, JobMessage> timestampedMessages; + + public FakeMonitor(JobMessage... messages) { + // The client should never be used; this Fake is intended to intercept relevant methods + super(mockDataflowClient); + + NavigableMap<Long, JobMessage> timestampedMessages = Maps.newTreeMap(); + for (JobMessage message : messages) { + timestampedMessages.put(Long.parseLong(message.getTime()), message); + } + + this.timestampedMessages = timestampedMessages; + } + + @Override + public List<JobMessage> getJobMessages(String jobId, long startTimestampMs) { + return ImmutableList.copyOf(timestampedMessages.headMap(startTimestampMs).values()); + } + } + + private static class ZeroSleeper implements Sleeper { + @Override + public void sleep(long l) throws InterruptedException {} + } } http://git-wip-us.apache.org/repos/asf/beam/blob/401eef09/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java index 80fbfe5..54eb88d 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java @@ -307,7 +307,8 @@ public class TestDataflowRunnerTest { */ @Test public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception { - DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); + DataflowPipelineJob job = + spy(new DataflowPipelineJob(mockClient, "test-job", options, null)); Pipeline p = TestPipeline.create(options); PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); PAssert.that(pc).containsInAnyOrder(1, 2, 3); @@ -326,7 +327,8 @@ public class TestDataflowRunnerTest { */ @Test public void testCheckingForSuccessWhenPAssertFails() throws Exception { - DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); + DataflowPipelineJob job = + spy(new DataflowPipelineJob(mockClient, "test-job", options, null)); Pipeline p = TestPipeline.create(options); PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); PAssert.that(pc).containsInAnyOrder(1, 2, 3); @@ -342,7 +344,7 @@ public class TestDataflowRunnerTest { @Test public void testCheckingForSuccessSkipsNonTentativeMetrics() throws Exception { - DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); + DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null)); Pipeline p = TestPipeline.create(options); PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); PAssert.that(pc).containsInAnyOrder(1, 2, 3); @@ -363,7 +365,7 @@ public class TestDataflowRunnerTest { */ @Test public void testStreamingPipelineFailsIfServiceFails() throws Exception { - DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); + DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null)); Pipeline p = TestPipeline.create(options); PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); PAssert.that(pc).containsInAnyOrder(1, 2, 3); @@ -422,7 +424,7 @@ public class TestDataflowRunnerTest { @Test public void testGetJobMetricsThatSucceeds() throws Exception { - DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); + DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null)); Pipeline p = TestPipeline.create(options); p.apply(Create.of(1, 2, 3)); @@ -438,7 +440,7 @@ public class TestDataflowRunnerTest { @Test public void testGetJobMetricsThatFailsForException() throws Exception { - DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); + DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null)); Pipeline p = TestPipeline.create(options); p.apply(Create.of(1, 2, 3));
