[Code Health] Remove redundant projectId from DataflowPipelineJob.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ded58832 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ded58832 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ded58832 Branch: refs/heads/master Commit: ded58832ceaef487f4590d9396f09744288c955d Parents: afedd68 Author: Pei He <pe...@google.com> Authored: Wed Nov 23 16:14:27 2016 -0800 Committer: bchambers <bchamb...@google.com> Committed: Tue Dec 6 17:08:12 2016 -0800 ---------------------------------------------------------------------- .../runners/dataflow/DataflowPipelineJob.java | 22 +++------ .../beam/runners/dataflow/DataflowRunner.java | 4 +- .../dataflow/util/DataflowTemplateJob.java | 2 +- .../dataflow/DataflowPipelineJobTest.java | 48 ++++++++++---------- .../testing/TestDataflowRunnerTest.java | 36 +++++---------- 5 files changed, 45 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ded58832/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java index a2b632f..58e85e0 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java @@ -62,11 +62,6 @@ public class DataflowPipelineJob implements PipelineResult { private String jobId; /** - * Google cloud project to associate this pipeline with. - */ - private String projectId; - - /** * Client for the Dataflow service. This can be used to query the service * for information about the job. */ @@ -119,17 +114,14 @@ public class DataflowPipelineJob implements PipelineResult { /** * Constructs the job. * - * @param projectId the project id * @param jobId the job id * @param dataflowOptions used to configure the client for the Dataflow Service * @param aggregatorTransforms a mapping from aggregators to PTransforms */ public DataflowPipelineJob( - String projectId, String jobId, DataflowPipelineOptions dataflowOptions, DataflowAggregatorTransforms aggregatorTransforms) { - this.projectId = projectId; this.jobId = jobId; this.dataflowOptions = dataflowOptions; this.aggregatorTransforms = aggregatorTransforms; @@ -146,7 +138,7 @@ public class DataflowPipelineJob implements PipelineResult { * Get the project this job exists in. */ public String getProjectId() { - return projectId; + return dataflowOptions.getProject(); } /** @@ -249,7 +241,7 @@ public class DataflowPipelineJob implements PipelineResult { MonitoringUtil.JobMessagesHandler messageHandler, Sleeper sleeper, NanoClock nanoClock) throws IOException, InterruptedException { - MonitoringUtil monitor = new MonitoringUtil(projectId, dataflowOptions.getDataflowClient()); + MonitoringUtil monitor = new MonitoringUtil(getProjectId(), dataflowOptions.getDataflowClient()); long lastTimestamp = 0; BackOff backoff; @@ -338,12 +330,12 @@ public class DataflowPipelineJob implements PipelineResult { @Override public State cancel() throws IOException { Job content = new Job(); - content.setProjectId(projectId); + content.setProjectId(getProjectId()); content.setId(jobId); content.setRequestedState("JOB_STATE_CANCELLED"); try { dataflowOptions.getDataflowClient().projects().jobs() - .update(projectId, jobId, content) + .update(getProjectId(), jobId, content) .execute(); return State.CANCELLED; } catch (IOException e) { @@ -412,13 +404,13 @@ public class DataflowPipelineJob implements PipelineResult { Job job = dataflowOptions.getDataflowClient() .projects() .jobs() - .get(projectId, jobId) + .get(getProjectId(), jobId) .execute(); State currentState = MonitoringUtil.toState(job.getCurrentState()); if (currentState.isTerminal()) { terminalState = currentState; replacedByJob = new DataflowPipelineJob( - getProjectId(), job.getReplacedByJobId(), dataflowOptions, aggregatorTransforms); + job.getReplacedByJobId(), dataflowOptions, aggregatorTransforms); } return job; } catch (IOException exn) { @@ -485,7 +477,7 @@ public class DataflowPipelineJob implements PipelineResult { } else { boolean terminal = getState().isTerminal(); JobMetrics jobMetrics = dataflowOptions.getDataflowClient() - .projects().jobs().getMetrics(projectId, jobId).execute(); + .projects().jobs().getMetrics(getProjectId(), jobId).execute(); metricUpdates = jobMetrics.getMetrics(); if (terminal && jobMetrics.getMetrics() != null) { terminalMetricUpdates = metricUpdates; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ded58832/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 339771b..e781b4e 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -629,8 +629,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { // Use a raw client for post-launch monitoring, as status calls may fail // regularly and need not be retried automatically. - DataflowPipelineJob dataflowPipelineJob = new DataflowPipelineJob( - options.getProject(), jobResult.getId(), options, aggregatorTransforms); + DataflowPipelineJob dataflowPipelineJob = + new DataflowPipelineJob(jobResult.getId(), options, aggregatorTransforms); // If the service returned client request id, the SDK needs to compare it // with the original id generated in the request, if they are not the same http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ded58832/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java index 2937184..1a44963 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java @@ -30,7 +30,7 @@ public class DataflowTemplateJob extends DataflowPipelineJob { "The result of template creation should not be used."; public DataflowTemplateJob() { - super(null, null, null, null); + super(null, null, null); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ded58832/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java index 0527b7c..323f762 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java @@ -115,6 +115,7 @@ public class DataflowPipelineJobTest { options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class); options.setDataflowClient(mockWorkflowClient); + options.setProject(PROJECT_ID); } /** @@ -160,8 +161,8 @@ public class DataflowPipelineJobTest { DataflowAggregatorTransforms dataflowAggregatorTransforms = mock(DataflowAggregatorTransforms.class); - DataflowPipelineJob job = new DataflowPipelineJob( - PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms); + DataflowPipelineJob job = + new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms); State state = job.waitUntilFinish( Duration.standardMinutes(5), jobHandler, fastClock, fastClock); @@ -182,8 +183,8 @@ public class DataflowPipelineJobTest { DataflowAggregatorTransforms dataflowAggregatorTransforms = mock(DataflowAggregatorTransforms.class); - DataflowPipelineJob job = new DataflowPipelineJob( - PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms); + DataflowPipelineJob job = + new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms); return job.waitUntilFinish(Duration.standardMinutes(1), null, fastClock, fastClock); } @@ -249,8 +250,8 @@ public class DataflowPipelineJobTest { DataflowAggregatorTransforms dataflowAggregatorTransforms = mock(DataflowAggregatorTransforms.class); - DataflowPipelineJob job = new DataflowPipelineJob( - PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms); + DataflowPipelineJob job = + new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms); long startTime = fastClock.nanoTime(); State state = job.waitUntilFinish(Duration.standardMinutes(5), null, fastClock, fastClock); @@ -269,8 +270,8 @@ public class DataflowPipelineJobTest { DataflowAggregatorTransforms dataflowAggregatorTransforms = mock(DataflowAggregatorTransforms.class); - DataflowPipelineJob job = new DataflowPipelineJob( - PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms); + DataflowPipelineJob job = + new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms); long startTime = fastClock.nanoTime(); State state = job.waitUntilFinish(Duration.millis(4), null, fastClock, fastClock); assertEquals(null, state); @@ -294,7 +295,7 @@ public class DataflowPipelineJobTest { FastNanoClockAndFuzzySleeper clock = new FastNanoClockAndFuzzySleeper(); DataflowPipelineJob job = new DataflowPipelineJob( - PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms); + JOB_ID, options, dataflowAggregatorTransforms); long startTime = clock.nanoTime(); State state = job.waitUntilFinish(Duration.millis(4), null, clock, clock); assertEquals(null, state); @@ -317,7 +318,7 @@ public class DataflowPipelineJobTest { mock(DataflowAggregatorTransforms.class); DataflowPipelineJob job = new DataflowPipelineJob( - PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms); + JOB_ID, options, dataflowAggregatorTransforms); assertEquals( State.RUNNING, @@ -333,8 +334,8 @@ public class DataflowPipelineJobTest { DataflowAggregatorTransforms dataflowAggregatorTransforms = mock(DataflowAggregatorTransforms.class); - DataflowPipelineJob job = new DataflowPipelineJob( - PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms); + DataflowPipelineJob job = + new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms); long startTime = fastClock.nanoTime(); assertEquals( @@ -373,7 +374,7 @@ public class DataflowPipelineJobTest { modelJob.setCurrentState(State.RUNNING.toString()); DataflowPipelineJob job = - new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms); + new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms); AggregatorValues<?> values = job.getAggregatorValues(aggregator); @@ -408,7 +409,7 @@ public class DataflowPipelineJobTest { modelJob.setCurrentState(State.RUNNING.toString()); DataflowPipelineJob job = - new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms); + new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms); AggregatorValues<?> values = job.getAggregatorValues(aggregator); @@ -453,8 +454,7 @@ public class DataflowPipelineJobTest { when(getState.execute()).thenReturn(modelJob); modelJob.setCurrentState(State.RUNNING.toString()); - DataflowPipelineJob job = - new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms); + DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms); AggregatorValues<Long> values = job.getAggregatorValues(aggregator); @@ -521,8 +521,7 @@ public class DataflowPipelineJobTest { when(getState.execute()).thenReturn(modelJob); modelJob.setCurrentState(State.RUNNING.toString()); - DataflowPipelineJob job = - new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms); + DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms); AggregatorValues<Long> values = job.getAggregatorValues(aggregator); @@ -571,7 +570,7 @@ public class DataflowPipelineJobTest { modelJob.setCurrentState(State.RUNNING.toString()); DataflowPipelineJob job = - new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms); + new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms); AggregatorValues<Long> values = job.getAggregatorValues(aggregator); @@ -589,7 +588,7 @@ public class DataflowPipelineJobTest { ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of()); DataflowPipelineJob job = - new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms); + new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms); thrown.expect(IllegalArgumentException.class); thrown.expectMessage("not used in this pipeline"); @@ -624,8 +623,7 @@ public class DataflowPipelineJobTest { when(getState.execute()).thenReturn(modelJob); modelJob.setCurrentState(State.RUNNING.toString()); - DataflowPipelineJob job = - new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms); + DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms); thrown.expect(AggregatorRetrievalException.class); thrown.expectCause(is(cause)); @@ -690,7 +688,7 @@ public class DataflowPipelineJobTest { when(mockJobs.update(anyString(), anyString(), any(Job.class))).thenReturn(update); when(update.execute()).thenReturn(new Job()); - DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, null); + DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null); assertEquals(State.CANCELLED, job.cancel()); Job content = new Job(); @@ -714,7 +712,7 @@ public class DataflowPipelineJobTest { when(mockJobs.update(anyString(), anyString(), any(Job.class))).thenReturn(update); when(update.execute()).thenThrow(new IOException()); - DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, null); + DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null); thrown.expect(IOException.class); thrown.expectMessage("Failed to cancel the job, " @@ -742,7 +740,7 @@ public class DataflowPipelineJobTest { when(mockJobs.update(anyString(), anyString(), any(Job.class))).thenReturn(update); when(update.execute()).thenThrow(new IOException()); - DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, null); + DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null); assertEquals(State.FAILED, job.cancel()); Job content = new Job(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ded58832/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java index e6b513a..366c6a1 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java @@ -344,8 +344,7 @@ public class TestDataflowRunnerTest { @Test public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception { - DataflowPipelineJob job = - spy(new DataflowPipelineJob("test-project", "test-job", options, null)); + DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); Pipeline p = TestPipeline.create(options); PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); PAssert.that(pc).containsInAnyOrder(1, 2, 3); @@ -359,8 +358,7 @@ public class TestDataflowRunnerTest { @Test public void testCheckingForSuccessWhenPAssertFails() throws Exception { - DataflowPipelineJob job = - spy(new DataflowPipelineJob("test-project", "test-job", options, null)); + DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); Pipeline p = TestPipeline.create(options); PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); PAssert.that(pc).containsInAnyOrder(1, 2, 3); @@ -374,8 +372,7 @@ public class TestDataflowRunnerTest { @Test public void testCheckingForSuccessSkipsNonTentativeMetrics() throws Exception { - DataflowPipelineJob job = - spy(new DataflowPipelineJob("test-project", "test-job", options, null)); + DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); Pipeline p = TestPipeline.create(options); PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); PAssert.that(pc).containsInAnyOrder(1, 2, 3); @@ -389,8 +386,7 @@ public class TestDataflowRunnerTest { @Test public void testCheckMaxWatermarkWithNoWatermarkMetric() throws IOException { - DataflowPipelineJob job = - spy(new DataflowPipelineJob("test-project", "test-job", options, null)); + DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); Pipeline p = TestPipeline.create(options); p.apply(Create.of(1, 2, 3)); @@ -403,8 +399,7 @@ public class TestDataflowRunnerTest { @Test public void testCheckMaxWatermarkWithSingleWatermarkAtMax() throws IOException { - DataflowPipelineJob job = - spy(new DataflowPipelineJob("test-project", "test-job", options, null)); + DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); Pipeline p = TestPipeline.create(options); p.apply(Create.of(1, 2, 3)); @@ -417,8 +412,7 @@ public class TestDataflowRunnerTest { @Test public void testCheckMaxWatermarkWithSingleWatermarkNotAtMax() throws IOException { - DataflowPipelineJob job = - spy(new DataflowPipelineJob("test-project", "test-job", options, null)); + DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); Pipeline p = TestPipeline.create(options); p.apply(Create.of(1, 2, 3)); @@ -431,8 +425,7 @@ public class TestDataflowRunnerTest { @Test public void testCheckMaxWatermarkWithMultipleWatermarksAtMax() throws IOException { - DataflowPipelineJob job = - spy(new DataflowPipelineJob("test-project", "test-job", options, null)); + DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); Pipeline p = TestPipeline.create(options); p.apply(Create.of(1, 2, 3)); @@ -446,8 +439,7 @@ public class TestDataflowRunnerTest { @Test public void testCheckMaxWatermarkWithMultipleMaxAndNotMaxWatermarks() throws IOException { - DataflowPipelineJob job = - spy(new DataflowPipelineJob("test-project", "test-job", options, null)); + DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); Pipeline p = TestPipeline.create(options); p.apply(Create.of(1, 2, 3)); @@ -461,8 +453,7 @@ public class TestDataflowRunnerTest { @Test public void testCheckMaxWatermarkIgnoresUnrelatedMatrics() throws IOException { - DataflowPipelineJob job = - spy(new DataflowPipelineJob("test-project", "test-job", options, null)); + DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); Pipeline p = TestPipeline.create(options); p.apply(Create.of(1, 2, 3)); @@ -476,8 +467,7 @@ public class TestDataflowRunnerTest { @Test public void testStreamingPipelineFailsIfServiceFails() throws Exception { - DataflowPipelineJob job = - spy(new DataflowPipelineJob("test-project", "test-job", options, null)); + DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); Pipeline p = TestPipeline.create(options); PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); PAssert.that(pc).containsInAnyOrder(1, 2, 3); @@ -532,8 +522,7 @@ public class TestDataflowRunnerTest { @Test public void testGetJobMetricsThatSucceeds() throws Exception { - DataflowPipelineJob job = - spy(new DataflowPipelineJob("test-project", "test-job", options, null)); + DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); Pipeline p = TestPipeline.create(options); p.apply(Create.of(1, 2, 3)); @@ -549,8 +538,7 @@ public class TestDataflowRunnerTest { @Test public void testGetJobMetricsThatFailsForException() throws Exception { - DataflowPipelineJob job = - spy(new DataflowPipelineJob("test-project", "test-job", options, null)); + DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); Pipeline p = TestPipeline.create(options); p.apply(Create.of(1, 2, 3));