http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java deleted file mode 100644 index 38d4c96..0000000 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java +++ /dev/null @@ -1,1417 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow; - -import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasItem; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.startsWith; -import static org.hamcrest.collection.IsIterableContainingInOrder.contains; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import org.apache.beam.runners.dataflow.DataflowPipelineRunner.BatchViewAsList; -import org.apache.beam.runners.dataflow.DataflowPipelineRunner.BatchViewAsMap; -import org.apache.beam.runners.dataflow.DataflowPipelineRunner.BatchViewAsMultimap; -import org.apache.beam.runners.dataflow.DataflowPipelineRunner.BatchViewAsSingleton; -import org.apache.beam.runners.dataflow.DataflowPipelineRunner.TransformedMap; -import org.apache.beam.runners.dataflow.internal.IsmFormat; -import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord; -import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder; -import org.apache.beam.runners.dataflow.internal.IsmFormat.MetadataKeyCoder; -import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; -import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.Pipeline.PipelineVisitor; -import org.apache.beam.sdk.coders.BigEndianIntegerCoder; -import org.apache.beam.sdk.coders.BigEndianLongCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.VarLongCoder; -import org.apache.beam.sdk.io.AvroIO; -import org.apache.beam.sdk.io.AvroSource; -import org.apache.beam.sdk.io.BigQueryIO; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.runners.TransformTreeNode; -import org.apache.beam.sdk.runners.dataflow.TestCountingSource; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFnTester; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.GcsUtil; -import org.apache.beam.sdk.util.NoopPathValidator; -import org.apache.beam.sdk.util.ReleaseInfo; -import org.apache.beam.sdk.util.TestCredential; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.gcsfs.GcsPath; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.TimestampedValue; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.TupleTagList; - -import com.google.api.services.dataflow.Dataflow; -import com.google.api.services.dataflow.model.DataflowPackage; -import com.google.api.services.dataflow.model.Job; -import com.google.api.services.dataflow.model.ListJobsResponse; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; - -import org.hamcrest.Description; -import org.hamcrest.Matchers; -import org.hamcrest.TypeSafeMatcher; -import org.joda.time.Instant; -import org.junit.Rule; -import org.junit.Test; -import org.junit.internal.matchers.ThrowableMessageMatcher; -import org.junit.rules.ExpectedException; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.ArgumentCaptor; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import java.io.File; -import java.io.IOException; -import java.net.URL; -import java.net.URLClassLoader; -import java.nio.channels.FileChannel; -import java.nio.channels.SeekableByteChannel; -import java.nio.file.Files; -import java.nio.file.StandardOpenOption; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.regex.Pattern; - -/** - * Tests for the {@link DataflowPipelineRunner}. - */ -@RunWith(JUnit4.class) -public class DataflowPipelineRunnerTest { - - private static final String PROJECT_ID = "some-project"; - - @Rule - public TemporaryFolder tmpFolder = new TemporaryFolder(); - @Rule - public ExpectedException thrown = ExpectedException.none(); - - // Asserts that the given Job has all expected fields set. - private static void assertValidJob(Job job) { - assertNull(job.getId()); - assertNull(job.getCurrentState()); - assertTrue(Pattern.matches("[a-z]([-a-z0-9]*[a-z0-9])?", job.getName())); - } - - private Pipeline buildDataflowPipeline(DataflowPipelineOptions options) { - options.setStableUniqueNames(CheckEnabled.ERROR); - options.setRunner(DataflowPipelineRunner.class); - Pipeline p = Pipeline.create(options); - - p.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/object")) - .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/object")); - - return p; - } - - private static Dataflow buildMockDataflow( - final ArgumentCaptor<Job> jobCaptor) throws IOException { - Dataflow mockDataflowClient = mock(Dataflow.class); - Dataflow.Projects mockProjects = mock(Dataflow.Projects.class); - Dataflow.Projects.Jobs mockJobs = mock(Dataflow.Projects.Jobs.class); - Dataflow.Projects.Jobs.Create mockRequest = - mock(Dataflow.Projects.Jobs.Create.class); - Dataflow.Projects.Jobs.List mockList = mock(Dataflow.Projects.Jobs.List.class); - - when(mockDataflowClient.projects()).thenReturn(mockProjects); - when(mockProjects.jobs()).thenReturn(mockJobs); - when(mockJobs.create(eq(PROJECT_ID), jobCaptor.capture())) - .thenReturn(mockRequest); - when(mockJobs.list(eq(PROJECT_ID))).thenReturn(mockList); - when(mockList.setPageToken(anyString())).thenReturn(mockList); - when(mockList.execute()) - .thenReturn( - new ListJobsResponse() - .setJobs( - Arrays.asList( - new Job() - .setName("oldjobname") - .setId("oldJobId") - .setCurrentState("JOB_STATE_RUNNING")))); - - Job resultJob = new Job(); - resultJob.setId("newid"); - when(mockRequest.execute()).thenReturn(resultJob); - return mockDataflowClient; - } - - private GcsUtil buildMockGcsUtil(boolean bucketExists) throws IOException { - GcsUtil mockGcsUtil = mock(GcsUtil.class); - when(mockGcsUtil.create(any(GcsPath.class), anyString())) - .then(new Answer<SeekableByteChannel>() { - @Override - public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable { - return FileChannel.open( - Files.createTempFile("channel-", ".tmp"), - StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE); - } - }); - - when(mockGcsUtil.isGcsPatternSupported(anyString())).thenReturn(true); - when(mockGcsUtil.expand(any(GcsPath.class))).then(new Answer<List<GcsPath>>() { - @Override - public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable { - return ImmutableList.of((GcsPath) invocation.getArguments()[0]); - } - }); - when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(bucketExists); - return mockGcsUtil; - } - - private DataflowPipelineOptions buildPipelineOptions() throws IOException { - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - return buildPipelineOptions(jobCaptor); - } - - private DataflowPipelineOptions buildPipelineOptions( - ArgumentCaptor<Job> jobCaptor) throws IOException { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - options.setRunner(DataflowPipelineRunner.class); - options.setProject(PROJECT_ID); - options.setTempLocation("gs://somebucket/some/path"); - // Set FILES_PROPERTY to empty to prevent a default value calculated from classpath. - options.setFilesToStage(new LinkedList<String>()); - options.setDataflowClient(buildMockDataflow(jobCaptor)); - options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */)); - options.setGcpCredential(new TestCredential()); - return options; - } - - @Test - public void testFromOptionsWithUppercaseConvertsToLowercase() throws Exception { - String mixedCase = "ThisJobNameHasMixedCase"; - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - DataflowPipelineOptions options = buildPipelineOptions(jobCaptor); - options.setJobName(mixedCase); - - DataflowPipelineRunner runner = DataflowPipelineRunner.fromOptions(options); - assertThat(options.getJobName(), equalTo(mixedCase.toLowerCase())); - } - - @Test - public void testRun() throws IOException { - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - - DataflowPipelineOptions options = buildPipelineOptions(jobCaptor); - Pipeline p = buildDataflowPipeline(options); - DataflowPipelineJob job = (DataflowPipelineJob) p.run(); - assertEquals("newid", job.getJobId()); - assertValidJob(jobCaptor.getValue()); - } - - @Test - public void testRunReturnDifferentRequestId() throws IOException { - DataflowPipelineOptions options = buildPipelineOptions(); - Dataflow mockDataflowClient = options.getDataflowClient(); - Dataflow.Projects.Jobs.Create mockRequest = mock(Dataflow.Projects.Jobs.Create.class); - when(mockDataflowClient.projects().jobs().create(eq(PROJECT_ID), any(Job.class))) - .thenReturn(mockRequest); - Job resultJob = new Job(); - resultJob.setId("newid"); - // Return a different request id. - resultJob.setClientRequestId("different_request_id"); - when(mockRequest.execute()).thenReturn(resultJob); - - Pipeline p = buildDataflowPipeline(options); - try { - p.run(); - fail("Expected DataflowJobAlreadyExistsException"); - } catch (DataflowJobAlreadyExistsException expected) { - assertThat(expected.getMessage(), - containsString("If you want to submit a second job, try again by setting a " - + "different name using --jobName.")); - assertEquals(expected.getJob().getJobId(), resultJob.getId()); - } - } - - @Test - public void testUpdate() throws IOException { - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - - DataflowPipelineOptions options = buildPipelineOptions(jobCaptor); - options.setUpdate(true); - options.setJobName("oldJobName"); - Pipeline p = buildDataflowPipeline(options); - DataflowPipelineJob job = (DataflowPipelineJob) p.run(); - assertEquals("newid", job.getJobId()); - assertValidJob(jobCaptor.getValue()); - } - - @Test - public void testUpdateNonExistentPipeline() throws IOException { - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Could not find running job named badjobname"); - - DataflowPipelineOptions options = buildPipelineOptions(); - options.setUpdate(true); - options.setJobName("badJobName"); - Pipeline p = buildDataflowPipeline(options); - p.run(); - } - - @Test - public void testUpdateAlreadyUpdatedPipeline() throws IOException { - DataflowPipelineOptions options = buildPipelineOptions(); - options.setUpdate(true); - options.setJobName("oldJobName"); - Dataflow mockDataflowClient = options.getDataflowClient(); - Dataflow.Projects.Jobs.Create mockRequest = mock(Dataflow.Projects.Jobs.Create.class); - when(mockDataflowClient.projects().jobs().create(eq(PROJECT_ID), any(Job.class))) - .thenReturn(mockRequest); - final Job resultJob = new Job(); - resultJob.setId("newid"); - // Return a different request id. - resultJob.setClientRequestId("different_request_id"); - when(mockRequest.execute()).thenReturn(resultJob); - - Pipeline p = buildDataflowPipeline(options); - - thrown.expect(DataflowJobAlreadyUpdatedException.class); - thrown.expect(new TypeSafeMatcher<DataflowJobAlreadyUpdatedException>() { - @Override - public void describeTo(Description description) { - description.appendText("Expected job ID: " + resultJob.getId()); - } - - @Override - protected boolean matchesSafely(DataflowJobAlreadyUpdatedException item) { - return resultJob.getId().equals(item.getJob().getJobId()); - } - }); - thrown.expectMessage("The job named oldjobname with id: oldJobId has already been updated " - + "into job id: newid and cannot be updated again."); - p.run(); - } - - @Test - public void testRunWithFiles() throws IOException { - // Test that the function DataflowPipelineRunner.stageFiles works as - // expected. - GcsUtil mockGcsUtil = buildMockGcsUtil(true /* bucket exists */); - final String gcsStaging = "gs://somebucket/some/path"; - final String gcsTemp = "gs://somebucket/some/temp/path"; - final String cloudDataflowDataset = "somedataset"; - - // Create some temporary files. - File temp1 = File.createTempFile("DataflowPipelineRunnerTest", "txt"); - temp1.deleteOnExit(); - File temp2 = File.createTempFile("DataflowPipelineRunnerTest2", "txt"); - temp2.deleteOnExit(); - - String overridePackageName = "alias.txt"; - - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - options.setFilesToStage(ImmutableList.of( - temp1.getAbsolutePath(), - overridePackageName + "=" + temp2.getAbsolutePath())); - options.setStagingLocation(gcsStaging); - options.setTempLocation(gcsTemp); - options.setTempDatasetId(cloudDataflowDataset); - options.setProject(PROJECT_ID); - options.setJobName("job"); - options.setDataflowClient(buildMockDataflow(jobCaptor)); - options.setGcsUtil(mockGcsUtil); - options.setGcpCredential(new TestCredential()); - - Pipeline p = buildDataflowPipeline(options); - - DataflowPipelineJob job = (DataflowPipelineJob) p.run(); - assertEquals("newid", job.getJobId()); - - Job workflowJob = jobCaptor.getValue(); - assertValidJob(workflowJob); - - assertEquals( - 2, - workflowJob.getEnvironment().getWorkerPools().get(0).getPackages().size()); - DataflowPackage workflowPackage1 = - workflowJob.getEnvironment().getWorkerPools().get(0).getPackages().get(0); - assertThat(workflowPackage1.getName(), startsWith(temp1.getName())); - DataflowPackage workflowPackage2 = - workflowJob.getEnvironment().getWorkerPools().get(0).getPackages().get(1); - assertEquals(overridePackageName, workflowPackage2.getName()); - - assertEquals( - "storage.googleapis.com/somebucket/some/temp/path", - workflowJob.getEnvironment().getTempStoragePrefix()); - assertEquals( - cloudDataflowDataset, - workflowJob.getEnvironment().getDataset()); - assertEquals( - ReleaseInfo.getReleaseInfo().getName(), - workflowJob.getEnvironment().getUserAgent().get("name")); - assertEquals( - ReleaseInfo.getReleaseInfo().getVersion(), - workflowJob.getEnvironment().getUserAgent().get("version")); - } - - @Test - public void runWithDefaultFilesToStage() throws Exception { - DataflowPipelineOptions options = buildPipelineOptions(); - options.setFilesToStage(null); - DataflowPipelineRunner.fromOptions(options); - assertTrue(!options.getFilesToStage().isEmpty()); - } - - @Test - public void detectClassPathResourceWithFileResources() throws Exception { - File file = tmpFolder.newFile("file"); - File file2 = tmpFolder.newFile("file2"); - URLClassLoader classLoader = new URLClassLoader(new URL[]{ - file.toURI().toURL(), - file2.toURI().toURL() - }); - - assertEquals(ImmutableList.of(file.getAbsolutePath(), file2.getAbsolutePath()), - DataflowPipelineRunner.detectClassPathResourcesToStage(classLoader)); - } - - @Test - public void detectClassPathResourcesWithUnsupportedClassLoader() { - ClassLoader mockClassLoader = Mockito.mock(ClassLoader.class); - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Unable to use ClassLoader to detect classpath elements."); - - DataflowPipelineRunner.detectClassPathResourcesToStage(mockClassLoader); - } - - @Test - public void detectClassPathResourceWithNonFileResources() throws Exception { - String url = "http://www.google.com/all-the-secrets.jar"; - URLClassLoader classLoader = new URLClassLoader(new URL[]{ - new URL(url) - }); - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Unable to convert url (" + url + ") to file."); - - DataflowPipelineRunner.detectClassPathResourcesToStage(classLoader); - } - - @Test - public void testGcsStagingLocationInitialization() throws Exception { - // Test that the staging location is initialized correctly. - String gcsTemp = "gs://somebucket/some/temp/path"; - - // Set temp location (required), and check that staging location is set. - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - options.setTempLocation(gcsTemp); - options.setProject(PROJECT_ID); - options.setGcpCredential(new TestCredential()); - options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */)); - options.setRunner(DataflowPipelineRunner.class); - - DataflowPipelineRunner.fromOptions(options); - - assertNotNull(options.getStagingLocation()); - } - - @Test - public void testNonGcsFilePathInReadFailure() throws IOException { - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - - Pipeline p = buildDataflowPipeline(buildPipelineOptions(jobCaptor)); - p.apply(TextIO.Read.named("ReadMyNonGcsFile").from(tmpFolder.newFile().getPath())); - - thrown.expectCause(Matchers.allOf( - instanceOf(IllegalArgumentException.class), - ThrowableMessageMatcher.hasMessage( - containsString("expected a valid 'gs://' path but was given")))); - p.run(); - assertValidJob(jobCaptor.getValue()); - } - - @Test - public void testNonGcsFilePathInWriteFailure() throws IOException { - Pipeline p = buildDataflowPipeline(buildPipelineOptions()); - PCollection<String> pc = p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object")); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage(containsString("expected a valid 'gs://' path but was given")); - pc.apply(TextIO.Write.named("WriteMyNonGcsFile").to("/tmp/file")); - } - - @Test - public void testMultiSlashGcsFileReadPath() throws IOException { - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - - Pipeline p = buildDataflowPipeline(buildPipelineOptions(jobCaptor)); - p.apply(TextIO.Read.named("ReadInvalidGcsFile") - .from("gs://bucket/tmp//file")); - - thrown.expectCause(Matchers.allOf( - instanceOf(IllegalArgumentException.class), - ThrowableMessageMatcher.hasMessage(containsString("consecutive slashes")))); - p.run(); - assertValidJob(jobCaptor.getValue()); - } - - @Test - public void testMultiSlashGcsFileWritePath() throws IOException { - Pipeline p = buildDataflowPipeline(buildPipelineOptions()); - PCollection<String> pc = p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object")); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("consecutive slashes"); - pc.apply(TextIO.Write.named("WriteInvalidGcsFile").to("gs://bucket/tmp//file")); - } - - @Test - public void testInvalidTempLocation() throws IOException { - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - - DataflowPipelineOptions options = buildPipelineOptions(jobCaptor); - options.setTempLocation("file://temp/location"); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage(containsString("expected a valid 'gs://' path but was given")); - DataflowPipelineRunner.fromOptions(options); - assertValidJob(jobCaptor.getValue()); - } - - @Test - public void testInvalidStagingLocation() throws IOException { - DataflowPipelineOptions options = buildPipelineOptions(); - options.setStagingLocation("file://my/staging/location"); - try { - DataflowPipelineRunner.fromOptions(options); - fail("fromOptions should have failed"); - } catch (IllegalArgumentException e) { - assertThat(e.getMessage(), containsString("expected a valid 'gs://' path but was given")); - } - options.setStagingLocation("my/staging/location"); - try { - DataflowPipelineRunner.fromOptions(options); - fail("fromOptions should have failed"); - } catch (IllegalArgumentException e) { - assertThat(e.getMessage(), containsString("expected a valid 'gs://' path but was given")); - } - } - - @Test - public void testNonExistentTempLocation() throws IOException { - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - - GcsUtil mockGcsUtil = buildMockGcsUtil(false /* bucket exists */); - DataflowPipelineOptions options = buildPipelineOptions(jobCaptor); - options.setGcsUtil(mockGcsUtil); - options.setTempLocation("gs://non-existent-bucket/location"); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage(containsString( - "Output path does not exist or is not writeable: gs://non-existent-bucket/location")); - DataflowPipelineRunner.fromOptions(options); - assertValidJob(jobCaptor.getValue()); - } - - @Test - public void testNonExistentStagingLocation() throws IOException { - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - - GcsUtil mockGcsUtil = buildMockGcsUtil(false /* bucket exists */); - DataflowPipelineOptions options = buildPipelineOptions(jobCaptor); - options.setGcsUtil(mockGcsUtil); - options.setStagingLocation("gs://non-existent-bucket/location"); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage(containsString( - "Output path does not exist or is not writeable: gs://non-existent-bucket/location")); - DataflowPipelineRunner.fromOptions(options); - assertValidJob(jobCaptor.getValue()); - } - - @Test - public void testNoProjectFails() { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - - options.setRunner(DataflowPipelineRunner.class); - // Explicitly set to null to prevent the default instance factory from reading credentials - // from a user's environment, causing this test to fail. - options.setProject(null); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Project id"); - thrown.expectMessage("when running a Dataflow in the cloud"); - - DataflowPipelineRunner.fromOptions(options); - } - - @Test - public void testProjectId() throws IOException { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - options.setRunner(DataflowPipelineRunner.class); - options.setProject("foo-12345"); - - options.setStagingLocation("gs://spam/ham/eggs"); - options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */)); - options.setGcpCredential(new TestCredential()); - - DataflowPipelineRunner.fromOptions(options); - } - - @Test - public void testProjectPrefix() throws IOException { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - options.setRunner(DataflowPipelineRunner.class); - options.setProject("google.com:some-project-12345"); - - options.setStagingLocation("gs://spam/ham/eggs"); - options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */)); - options.setGcpCredential(new TestCredential()); - - DataflowPipelineRunner.fromOptions(options); - } - - @Test - public void testProjectNumber() throws IOException { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - options.setRunner(DataflowPipelineRunner.class); - options.setProject("12345"); - - options.setStagingLocation("gs://spam/ham/eggs"); - options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */)); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Project ID"); - thrown.expectMessage("project number"); - - DataflowPipelineRunner.fromOptions(options); - } - - @Test - public void testProjectDescription() throws IOException { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - options.setRunner(DataflowPipelineRunner.class); - options.setProject("some project"); - - options.setStagingLocation("gs://spam/ham/eggs"); - options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */)); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Project ID"); - thrown.expectMessage("project description"); - - DataflowPipelineRunner.fromOptions(options); - } - - @Test - public void testInvalidNumberOfWorkerHarnessThreads() throws IOException { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - options.setRunner(DataflowPipelineRunner.class); - options.setProject("foo-12345"); - - options.setStagingLocation("gs://spam/ham/eggs"); - options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */)); - - options.as(DataflowPipelineDebugOptions.class).setNumberOfWorkerHarnessThreads(-1); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Number of worker harness threads"); - thrown.expectMessage("Please make sure the value is non-negative."); - - DataflowPipelineRunner.fromOptions(options); - } - - @Test - public void testNoStagingLocationAndNoTempLocationFails() { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - options.setRunner(DataflowPipelineRunner.class); - options.setProject("foo-project"); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage( - "Missing required value: at least one of tempLocation or stagingLocation must be set."); - - DataflowPipelineRunner.fromOptions(options); - } - - @Test - public void testStagingLocationAndNoTempLocationSucceeds() throws Exception { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - options.setRunner(DataflowPipelineRunner.class); - options.setGcpCredential(new TestCredential()); - options.setProject("foo-project"); - options.setStagingLocation("gs://spam/ham/eggs"); - options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */)); - - DataflowPipelineRunner.fromOptions(options); - } - - @Test - public void testTempLocationAndNoStagingLocationSucceeds() throws Exception { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - options.setRunner(DataflowPipelineRunner.class); - options.setGcpCredential(new TestCredential()); - options.setProject("foo-project"); - options.setTempLocation("gs://spam/ham/eggs"); - options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */)); - - DataflowPipelineRunner.fromOptions(options); - } - - @Test - public void testInvalidJobName() throws IOException { - List<String> invalidNames = Arrays.asList( - "invalid_name", - "0invalid", - "invalid-"); - List<String> expectedReason = Arrays.asList( - "JobName invalid", - "JobName invalid", - "JobName invalid"); - - for (int i = 0; i < invalidNames.size(); ++i) { - DataflowPipelineOptions options = buildPipelineOptions(); - options.setJobName(invalidNames.get(i)); - - try { - DataflowPipelineRunner.fromOptions(options); - fail("Expected IllegalArgumentException for jobName " - + options.getJobName()); - } catch (IllegalArgumentException e) { - assertThat(e.getMessage(), - containsString(expectedReason.get(i))); - } - } - } - - @Test - public void testValidJobName() throws IOException { - List<String> names = Arrays.asList("ok", "Ok", "A-Ok", "ok-123", - "this-one-is-fairly-long-01234567890123456789"); - - for (String name : names) { - DataflowPipelineOptions options = buildPipelineOptions(); - options.setJobName(name); - - DataflowPipelineRunner runner = DataflowPipelineRunner - .fromOptions(options); - assertNotNull(runner); - } - } - - /** - * A fake PTransform for testing. - */ - public static class TestTransform - extends PTransform<PCollection<Integer>, PCollection<Integer>> { - public boolean translated = false; - - @Override - public PCollection<Integer> apply(PCollection<Integer> input) { - return PCollection.<Integer>createPrimitiveOutputInternal( - input.getPipeline(), - WindowingStrategy.globalDefault(), - input.isBounded()); - } - - @Override - protected Coder<?> getDefaultOutputCoder(PCollection<Integer> input) { - return input.getCoder(); - } - } - - @Test - public void testTransformTranslatorMissing() throws IOException { - // Test that we throw if we don't provide a translation. - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - - DataflowPipelineOptions options = buildPipelineOptions(jobCaptor); - Pipeline p = Pipeline.create(options); - - p.apply(Create.of(Arrays.asList(1, 2, 3))) - .apply(new TestTransform()); - - thrown.expect(IllegalStateException.class); - thrown.expectMessage(Matchers.containsString("no translator registered")); - DataflowPipelineTranslator.fromOptions(options) - .translate( - p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()); - assertValidJob(jobCaptor.getValue()); - } - - @Test - public void testTransformTranslator() throws IOException { - // Test that we can provide a custom translation - DataflowPipelineOptions options = buildPipelineOptions(); - Pipeline p = Pipeline.create(options); - TestTransform transform = new TestTransform(); - - p.apply(Create.of(Arrays.asList(1, 2, 3)).withCoder(BigEndianIntegerCoder.of())) - .apply(transform); - - DataflowPipelineTranslator translator = DataflowPipelineRunner - .fromOptions(options).getTranslator(); - - DataflowPipelineTranslator.registerTransformTranslator( - TestTransform.class, - new DataflowPipelineTranslator.TransformTranslator<TestTransform>() { - @SuppressWarnings("unchecked") - @Override - public void translate( - TestTransform transform, - DataflowPipelineTranslator.TranslationContext context) { - transform.translated = true; - - // Note: This is about the minimum needed to fake out a - // translation. This obviously isn't a real translation. - context.addStep(transform, "TestTranslate"); - context.addOutput("output", context.getOutput(transform)); - } - }); - - translator.translate( - p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()); - assertTrue(transform.translated); - } - - /** Records all the composite transforms visited within the Pipeline. */ - private static class CompositeTransformRecorder extends PipelineVisitor.Defaults { - private List<PTransform<?, ?>> transforms = new ArrayList<>(); - - @Override - public CompositeBehavior enterCompositeTransform(TransformTreeNode node) { - if (node.getTransform() != null) { - transforms.add(node.getTransform()); - } - return CompositeBehavior.ENTER_TRANSFORM; - } - - public List<PTransform<?, ?>> getCompositeTransforms() { - return transforms; - } - } - - @Test - public void testApplyIsScopedToExactClass() throws IOException { - DataflowPipelineOptions options = buildPipelineOptions(); - Pipeline p = Pipeline.create(options); - - Create.TimestampedValues<String> transform = - Create.timestamped(Arrays.asList(TimestampedValue.of("TestString", Instant.now()))); - p.apply(transform); - - CompositeTransformRecorder recorder = new CompositeTransformRecorder(); - p.traverseTopologically(recorder); - - // The recorder will also have seen a Create.Values composite as well, but we can't obtain that - // transform. - assertThat( - "Expected to have seen CreateTimestamped composite transform.", - recorder.getCompositeTransforms(), - hasItem(transform)); - assertThat( - "Expected to have two composites, CreateTimestamped and Create.Values", - recorder.getCompositeTransforms(), - hasItem(Matchers.<PTransform<?, ?>>isA((Class) Create.Values.class))); - } - - @Test - public void testToString() { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - options.setJobName("TestJobName"); - options.setProject("test-project"); - options.setTempLocation("gs://test/temp/location"); - options.setGcpCredential(new TestCredential()); - options.setPathValidatorClass(NoopPathValidator.class); - options.setRunner(DataflowPipelineRunner.class); - assertEquals( - "DataflowPipelineRunner#testjobname", - DataflowPipelineRunner.fromOptions(options).toString()); - } - - private static PipelineOptions makeOptions(boolean streaming) { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - options.setRunner(DataflowPipelineRunner.class); - options.setStreaming(streaming); - options.setJobName("TestJobName"); - options.setProject("test-project"); - options.setTempLocation("gs://test/temp/location"); - options.setGcpCredential(new TestCredential()); - options.setPathValidatorClass(NoopPathValidator.class); - return options; - } - - private void testUnsupportedSource(PTransform<PInput, ?> source, String name, boolean streaming) - throws Exception { - String mode = streaming ? "streaming" : "batch"; - thrown.expect(UnsupportedOperationException.class); - thrown.expectMessage( - "The DataflowPipelineRunner in " + mode + " mode does not support " + name); - - Pipeline p = Pipeline.create(makeOptions(streaming)); - p.apply(source); - p.run(); - } - - @Test - public void testBoundedSourceUnsupportedInStreaming() throws Exception { - testUnsupportedSource( - AvroSource.readFromFileWithClass("foo", String.class), "Read.Bounded", true); - } - - @Test - public void testBigQueryIOSourceUnsupportedInStreaming() throws Exception { - testUnsupportedSource( - BigQueryIO.Read.from("project:bar.baz").withoutValidation(), "BigQueryIO.Read", true); - } - - @Test - public void testAvroIOSourceUnsupportedInStreaming() throws Exception { - testUnsupportedSource( - AvroIO.Read.from("foo"), "AvroIO.Read", true); - } - - @Test - public void testTextIOSourceUnsupportedInStreaming() throws Exception { - testUnsupportedSource(TextIO.Read.from("foo"), "TextIO.Read", true); - } - - @Test - public void testReadBoundedSourceUnsupportedInStreaming() throws Exception { - testUnsupportedSource(Read.from(AvroSource.from("/tmp/test")), "Read.Bounded", true); - } - - @Test - public void testReadUnboundedUnsupportedInBatch() throws Exception { - testUnsupportedSource(Read.from(new TestCountingSource(1)), "Read.Unbounded", false); - } - - private void testUnsupportedSink( - PTransform<PCollection<String>, PDone> sink, String name, boolean streaming) - throws Exception { - thrown.expect(UnsupportedOperationException.class); - thrown.expectMessage( - "The DataflowPipelineRunner in streaming mode does not support " + name); - - Pipeline p = Pipeline.create(makeOptions(streaming)); - p.apply(Create.of("foo")).apply(sink); - p.run(); - } - - @Test - public void testAvroIOSinkUnsupportedInStreaming() throws Exception { - testUnsupportedSink(AvroIO.Write.to("foo").withSchema(String.class), "AvroIO.Write", true); - } - - @Test - public void testTextIOSinkUnsupportedInStreaming() throws Exception { - testUnsupportedSink(TextIO.Write.to("foo"), "TextIO.Write", true); - } - - @Test - public void testBatchViewAsSingletonToIsmRecord() throws Exception { - DoFnTester<KV<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>, - IsmRecord<WindowedValue<String>>> doFnTester = - DoFnTester.of( - new BatchViewAsSingleton.IsmRecordForSingularValuePerWindowDoFn - <String, GlobalWindow>(GlobalWindow.Coder.INSTANCE)); - - assertThat( - doFnTester.processBundle( - ImmutableList.of(KV.<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>of( - 0, ImmutableList.of(KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("a")))))), - contains(IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE), valueInGlobalWindow("a")))); - } - - @Test - public void testBatchViewAsSingletonToIsmRecordWithMultipleValuesThrowsException() - throws Exception { - DoFnTester<KV<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>, - IsmRecord<WindowedValue<String>>> doFnTester = - DoFnTester.of( - new BatchViewAsSingleton.IsmRecordForSingularValuePerWindowDoFn - <String, GlobalWindow>(GlobalWindow.Coder.INSTANCE)); - - thrown.expect(IllegalStateException.class); - thrown.expectMessage("found for singleton within window"); - doFnTester.processBundle(ImmutableList.of( - KV.<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>of(0, - ImmutableList.of(KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("a")), - KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("b")))))); - } - - @Test - public void testBatchViewAsListToIsmRecordForGlobalWindow() throws Exception { - DoFnTester<String, IsmRecord<WindowedValue<String>>> doFnTester = - DoFnTester.of(new BatchViewAsList.ToIsmRecordForGlobalWindowDoFn<String>()); - - // The order of the output elements is important relative to processing order - assertThat(doFnTester.processBundle(ImmutableList.of("a", "b", "c")), contains( - IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE, 0L), valueInGlobalWindow("a")), - IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE, 1L), valueInGlobalWindow("b")), - IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE, 2L), valueInGlobalWindow("c")))); - } - - @Test - public void testBatchViewAsListToIsmRecordForNonGlobalWindow() throws Exception { - DoFnTester<KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<Long>>>>, - IsmRecord<WindowedValue<Long>>> doFnTester = - DoFnTester.of( - new BatchViewAsList.ToIsmRecordForNonGlobalWindowDoFn<Long, IntervalWindow>( - IntervalWindow.getCoder())); - - IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10)); - IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20)); - IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30)); - - Iterable<KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<Long>>>>> inputElements = - ImmutableList.of( - KV.of(1, (Iterable<KV<IntervalWindow, WindowedValue<Long>>>) ImmutableList.of( - KV.of( - windowA, WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)), - KV.of( - windowA, WindowedValue.of(111L, new Instant(3), windowA, PaneInfo.NO_FIRING)), - KV.of( - windowA, WindowedValue.of(112L, new Instant(4), windowA, PaneInfo.NO_FIRING)), - KV.of( - windowB, WindowedValue.of(120L, new Instant(12), windowB, PaneInfo.NO_FIRING)), - KV.of( - windowB, WindowedValue.of(121L, new Instant(14), windowB, PaneInfo.NO_FIRING)) - )), - KV.of(2, (Iterable<KV<IntervalWindow, WindowedValue<Long>>>) ImmutableList.of( - KV.of( - windowC, WindowedValue.of(210L, new Instant(25), windowC, PaneInfo.NO_FIRING)) - ))); - - // The order of the output elements is important relative to processing order - assertThat(doFnTester.processBundle(inputElements), contains( - IsmRecord.of(ImmutableList.of(windowA, 0L), - WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)), - IsmRecord.of(ImmutableList.of(windowA, 1L), - WindowedValue.of(111L, new Instant(3), windowA, PaneInfo.NO_FIRING)), - IsmRecord.of(ImmutableList.of(windowA, 2L), - WindowedValue.of(112L, new Instant(4), windowA, PaneInfo.NO_FIRING)), - IsmRecord.of(ImmutableList.of(windowB, 0L), - WindowedValue.of(120L, new Instant(12), windowB, PaneInfo.NO_FIRING)), - IsmRecord.of(ImmutableList.of(windowB, 1L), - WindowedValue.of(121L, new Instant(14), windowB, PaneInfo.NO_FIRING)), - IsmRecord.of(ImmutableList.of(windowC, 0L), - WindowedValue.of(210L, new Instant(25), windowC, PaneInfo.NO_FIRING)))); - } - - @Test - public void testToIsmRecordForMapLikeDoFn() throws Exception { - TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForSizeTag = new TupleTag<>(); - TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForEntrySetTag = new TupleTag<>(); - - Coder<Long> keyCoder = VarLongCoder.of(); - Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder(); - - IsmRecordCoder<WindowedValue<Long>> ismCoder = IsmRecordCoder.of( - 1, - 2, - ImmutableList.<Coder<?>>of( - MetadataKeyCoder.of(keyCoder), - IntervalWindow.getCoder(), - BigEndianLongCoder.of()), - FullWindowedValueCoder.of(VarLongCoder.of(), windowCoder)); - - DoFnTester<KV<Integer, Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>, - IsmRecord<WindowedValue<Long>>> doFnTester = - DoFnTester.of(new BatchViewAsMultimap.ToIsmRecordForMapLikeDoFn<Long, Long, IntervalWindow>( - outputForSizeTag, - outputForEntrySetTag, - windowCoder, - keyCoder, - ismCoder, - false /* unique keys */)); - doFnTester.setSideOutputTags(TupleTagList.of( - ImmutableList.<TupleTag<?>>of(outputForSizeTag, outputForEntrySetTag))); - - IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10)); - IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20)); - IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30)); - - Iterable<KV<Integer, - Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>> inputElements = - ImmutableList.of( - KV.of(1, (Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>) ImmutableList.of( - KV.of(KV.of(1L, windowA), - WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)), - // same window same key as to previous - KV.of(KV.of(1L, windowA), - WindowedValue.of(111L, new Instant(2), windowA, PaneInfo.NO_FIRING)), - // same window different key as to previous - KV.of(KV.of(2L, windowA), - WindowedValue.of(120L, new Instant(3), windowA, PaneInfo.NO_FIRING)), - // different window same key as to previous - KV.of(KV.of(2L, windowB), - WindowedValue.of(210L, new Instant(11), windowB, PaneInfo.NO_FIRING)), - // different window and different key as to previous - KV.of(KV.of(3L, windowB), - WindowedValue.of(220L, new Instant(12), windowB, PaneInfo.NO_FIRING)))), - KV.of(2, (Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>) ImmutableList.of( - // different shard - KV.of(KV.of(4L, windowC), - WindowedValue.of(330L, new Instant(21), windowC, PaneInfo.NO_FIRING))))); - - // The order of the output elements is important relative to processing order - assertThat(doFnTester.processBundle(inputElements), contains( - IsmRecord.of( - ImmutableList.of(1L, windowA, 0L), - WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)), - IsmRecord.of( - ImmutableList.of(1L, windowA, 1L), - WindowedValue.of(111L, new Instant(2), windowA, PaneInfo.NO_FIRING)), - IsmRecord.of( - ImmutableList.of(2L, windowA, 0L), - WindowedValue.of(120L, new Instant(3), windowA, PaneInfo.NO_FIRING)), - IsmRecord.of( - ImmutableList.of(2L, windowB, 0L), - WindowedValue.of(210L, new Instant(11), windowB, PaneInfo.NO_FIRING)), - IsmRecord.of( - ImmutableList.of(3L, windowB, 0L), - WindowedValue.of(220L, new Instant(12), windowB, PaneInfo.NO_FIRING)), - IsmRecord.of( - ImmutableList.of(4L, windowC, 0L), - WindowedValue.of(330L, new Instant(21), windowC, PaneInfo.NO_FIRING)))); - - // Verify the number of unique keys per window. - assertThat(doFnTester.takeSideOutputElements(outputForSizeTag), contains( - KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowA)), - KV.of(windowA, 2L)), - KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)), - KV.of(windowB, 2L)), - KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowC)), - KV.of(windowC, 1L)) - )); - - // Verify the output for the unique keys. - assertThat(doFnTester.takeSideOutputElements(outputForEntrySetTag), contains( - KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowA)), - KV.of(windowA, 1L)), - KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowA)), - KV.of(windowA, 2L)), - KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)), - KV.of(windowB, 2L)), - KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)), - KV.of(windowB, 3L)), - KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowC)), - KV.of(windowC, 4L)) - )); - } - - @Test - public void testToIsmRecordForMapLikeDoFnWithoutUniqueKeysThrowsException() throws Exception { - TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForSizeTag = new TupleTag<>(); - TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForEntrySetTag = new TupleTag<>(); - - Coder<Long> keyCoder = VarLongCoder.of(); - Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder(); - - IsmRecordCoder<WindowedValue<Long>> ismCoder = IsmRecordCoder.of( - 1, - 2, - ImmutableList.<Coder<?>>of( - MetadataKeyCoder.of(keyCoder), - IntervalWindow.getCoder(), - BigEndianLongCoder.of()), - FullWindowedValueCoder.of(VarLongCoder.of(), windowCoder)); - - DoFnTester<KV<Integer, Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>, - IsmRecord<WindowedValue<Long>>> doFnTester = - DoFnTester.of(new BatchViewAsMultimap.ToIsmRecordForMapLikeDoFn<Long, Long, IntervalWindow>( - outputForSizeTag, - outputForEntrySetTag, - windowCoder, - keyCoder, - ismCoder, - true /* unique keys */)); - doFnTester.setSideOutputTags(TupleTagList.of( - ImmutableList.<TupleTag<?>>of(outputForSizeTag, outputForEntrySetTag))); - - IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10)); - - Iterable<KV<Integer, - Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>> inputElements = - ImmutableList.of( - KV.of(1, (Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>) ImmutableList.of( - KV.of(KV.of(1L, windowA), - WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)), - // same window same key as to previous - KV.of(KV.of(1L, windowA), - WindowedValue.of(111L, new Instant(2), windowA, PaneInfo.NO_FIRING))))); - - thrown.expect(IllegalStateException.class); - thrown.expectMessage("Unique keys are expected but found key"); - doFnTester.processBundle(inputElements); - } - - @Test - public void testToIsmMetadataRecordForSizeDoFn() throws Exception { - TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForSizeTag = new TupleTag<>(); - TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForEntrySetTag = new TupleTag<>(); - - Coder<Long> keyCoder = VarLongCoder.of(); - Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder(); - - IsmRecordCoder<WindowedValue<Long>> ismCoder = IsmRecordCoder.of( - 1, - 2, - ImmutableList.<Coder<?>>of( - MetadataKeyCoder.of(keyCoder), - IntervalWindow.getCoder(), - BigEndianLongCoder.of()), - FullWindowedValueCoder.of(VarLongCoder.of(), windowCoder)); - - DoFnTester<KV<Integer, Iterable<KV<IntervalWindow, Long>>>, - IsmRecord<WindowedValue<Long>>> doFnTester = DoFnTester.of( - new BatchViewAsMultimap.ToIsmMetadataRecordForSizeDoFn<Long, Long, IntervalWindow>( - windowCoder)); - doFnTester.setSideOutputTags(TupleTagList.of( - ImmutableList.<TupleTag<?>>of(outputForSizeTag, outputForEntrySetTag))); - - IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10)); - IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20)); - IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30)); - - Iterable<KV<Integer, Iterable<KV<IntervalWindow, Long>>>> inputElements = - ImmutableList.of( - KV.of(1, - (Iterable<KV<IntervalWindow, Long>>) ImmutableList.of( - KV.of(windowA, 2L), - KV.of(windowA, 3L), - KV.of(windowB, 7L))), - KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)), - (Iterable<KV<IntervalWindow, Long>>) ImmutableList.of( - KV.of(windowC, 9L)))); - - // The order of the output elements is important relative to processing order - assertThat(doFnTester.processBundle(inputElements), contains( - IsmRecord.<WindowedValue<Long>>meta( - ImmutableList.of(IsmFormat.getMetadataKey(), windowA, 0L), - CoderUtils.encodeToByteArray(VarLongCoder.of(), 5L)), - IsmRecord.<WindowedValue<Long>>meta( - ImmutableList.of(IsmFormat.getMetadataKey(), windowB, 0L), - CoderUtils.encodeToByteArray(VarLongCoder.of(), 7L)), - IsmRecord.<WindowedValue<Long>>meta( - ImmutableList.of(IsmFormat.getMetadataKey(), windowC, 0L), - CoderUtils.encodeToByteArray(VarLongCoder.of(), 9L)) - )); - } - - @Test - public void testToIsmMetadataRecordForKeyDoFn() throws Exception { - TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForSizeTag = new TupleTag<>(); - TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForEntrySetTag = new TupleTag<>(); - - Coder<Long> keyCoder = VarLongCoder.of(); - Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder(); - - IsmRecordCoder<WindowedValue<Long>> ismCoder = IsmRecordCoder.of( - 1, - 2, - ImmutableList.<Coder<?>>of( - MetadataKeyCoder.of(keyCoder), - IntervalWindow.getCoder(), - BigEndianLongCoder.of()), - FullWindowedValueCoder.of(VarLongCoder.of(), windowCoder)); - - DoFnTester<KV<Integer, Iterable<KV<IntervalWindow, Long>>>, - IsmRecord<WindowedValue<Long>>> doFnTester = DoFnTester.of( - new BatchViewAsMultimap.ToIsmMetadataRecordForKeyDoFn<Long, Long, IntervalWindow>( - keyCoder, windowCoder)); - doFnTester.setSideOutputTags(TupleTagList.of( - ImmutableList.<TupleTag<?>>of(outputForSizeTag, outputForEntrySetTag))); - - IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10)); - IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20)); - IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30)); - - Iterable<KV<Integer, Iterable<KV<IntervalWindow, Long>>>> inputElements = - ImmutableList.of( - KV.of(1, - (Iterable<KV<IntervalWindow, Long>>) ImmutableList.of( - KV.of(windowA, 2L), - // same window as previous - KV.of(windowA, 3L), - // different window as previous - KV.of(windowB, 3L))), - KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)), - (Iterable<KV<IntervalWindow, Long>>) ImmutableList.of( - KV.of(windowC, 3L)))); - - // The order of the output elements is important relative to processing order - assertThat(doFnTester.processBundle(inputElements), contains( - IsmRecord.<WindowedValue<Long>>meta( - ImmutableList.of(IsmFormat.getMetadataKey(), windowA, 1L), - CoderUtils.encodeToByteArray(VarLongCoder.of(), 2L)), - IsmRecord.<WindowedValue<Long>>meta( - ImmutableList.of(IsmFormat.getMetadataKey(), windowA, 2L), - CoderUtils.encodeToByteArray(VarLongCoder.of(), 3L)), - IsmRecord.<WindowedValue<Long>>meta( - ImmutableList.of(IsmFormat.getMetadataKey(), windowB, 1L), - CoderUtils.encodeToByteArray(VarLongCoder.of(), 3L)), - IsmRecord.<WindowedValue<Long>>meta( - ImmutableList.of(IsmFormat.getMetadataKey(), windowC, 1L), - CoderUtils.encodeToByteArray(VarLongCoder.of(), 3L)) - )); - } - - @Test - public void testToMapDoFn() throws Exception { - Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder(); - - DoFnTester<KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>>, - IsmRecord<WindowedValue<TransformedMap<Long, - WindowedValue<Long>, - Long>>>> doFnTester = - DoFnTester.of(new BatchViewAsMap.ToMapDoFn<Long, Long, IntervalWindow>(windowCoder)); - - - IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10)); - IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20)); - IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30)); - - Iterable<KV<Integer, - Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>>> inputElements = - ImmutableList.of( - KV.of(1, - (Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>) ImmutableList.of( - KV.of(windowA, WindowedValue.of( - KV.of(1L, 11L), new Instant(3), windowA, PaneInfo.NO_FIRING)), - KV.of(windowA, WindowedValue.of( - KV.of(2L, 21L), new Instant(7), windowA, PaneInfo.NO_FIRING)), - KV.of(windowB, WindowedValue.of( - KV.of(2L, 21L), new Instant(13), windowB, PaneInfo.NO_FIRING)), - KV.of(windowB, WindowedValue.of( - KV.of(3L, 31L), new Instant(15), windowB, PaneInfo.NO_FIRING)))), - KV.of(2, - (Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>) ImmutableList.of( - KV.of(windowC, WindowedValue.of( - KV.of(4L, 41L), new Instant(25), windowC, PaneInfo.NO_FIRING))))); - - // The order of the output elements is important relative to processing order - List<IsmRecord<WindowedValue<TransformedMap<Long, - WindowedValue<Long>, - Long>>>> output = - doFnTester.processBundle(inputElements); - assertEquals(3, output.size()); - Map<Long, Long> outputMap; - - outputMap = output.get(0).getValue().getValue(); - assertEquals(2, outputMap.size()); - assertEquals(ImmutableMap.of(1L, 11L, 2L, 21L), outputMap); - - outputMap = output.get(1).getValue().getValue(); - assertEquals(2, outputMap.size()); - assertEquals(ImmutableMap.of(2L, 21L, 3L, 31L), outputMap); - - outputMap = output.get(2).getValue().getValue(); - assertEquals(1, outputMap.size()); - assertEquals(ImmutableMap.of(4L, 41L), outputMap); - } - - @Test - public void testToMultimapDoFn() throws Exception { - Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder(); - - DoFnTester<KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>>, - IsmRecord<WindowedValue<TransformedMap<Long, - Iterable<WindowedValue<Long>>, - Iterable<Long>>>>> doFnTester = - DoFnTester.of( - new BatchViewAsMultimap.ToMultimapDoFn<Long, Long, IntervalWindow>(windowCoder)); - - - IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10)); - IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20)); - IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30)); - - Iterable<KV<Integer, - Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>>> inputElements = - ImmutableList.of( - KV.of(1, - (Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>) ImmutableList.of( - KV.of(windowA, WindowedValue.of( - KV.of(1L, 11L), new Instant(3), windowA, PaneInfo.NO_FIRING)), - KV.of(windowA, WindowedValue.of( - KV.of(1L, 12L), new Instant(5), windowA, PaneInfo.NO_FIRING)), - KV.of(windowA, WindowedValue.of( - KV.of(2L, 21L), new Instant(7), windowA, PaneInfo.NO_FIRING)), - KV.of(windowB, WindowedValue.of( - KV.of(2L, 21L), new Instant(13), windowB, PaneInfo.NO_FIRING)), - KV.of(windowB, WindowedValue.of( - KV.of(3L, 31L), new Instant(15), windowB, PaneInfo.NO_FIRING)))), - KV.of(2, - (Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>) ImmutableList.of( - KV.of(windowC, WindowedValue.of( - KV.of(4L, 41L), new Instant(25), windowC, PaneInfo.NO_FIRING))))); - - // The order of the output elements is important relative to processing order - List<IsmRecord<WindowedValue<TransformedMap<Long, - Iterable<WindowedValue<Long>>, - Iterable<Long>>>>> output = - doFnTester.processBundle(inputElements); - assertEquals(3, output.size()); - Map<Long, Iterable<Long>> outputMap; - - outputMap = output.get(0).getValue().getValue(); - assertEquals(2, outputMap.size()); - assertThat(outputMap.get(1L), containsInAnyOrder(11L, 12L)); - assertThat(outputMap.get(2L), containsInAnyOrder(21L)); - - outputMap = output.get(1).getValue().getValue(); - assertEquals(2, outputMap.size()); - assertThat(outputMap.get(2L), containsInAnyOrder(21L)); - assertThat(outputMap.get(3L), containsInAnyOrder(31L)); - - outputMap = output.get(2).getValue().getValue(); - assertEquals(1, outputMap.size()); - assertThat(outputMap.get(4L), containsInAnyOrder(41L)); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 165d2b5..261ba99 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -124,7 +124,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { } private Pipeline buildPipeline(DataflowPipelineOptions options) { - options.setRunner(DataflowPipelineRunner.class); + options.setRunner(DataflowRunner.class); Pipeline p = Pipeline.create(options); p.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/object")) @@ -164,7 +164,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod(); DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - options.setRunner(DataflowPipelineRunner.class); + options.setRunner(DataflowRunner.class); options.setGcpCredential(new TestCredential()); options.setJobName("some-job-name"); options.setProject("some-project"); @@ -178,14 +178,14 @@ public class DataflowPipelineTranslatorTest implements Serializable { @Test public void testSettingOfSdkPipelineOptions() throws IOException { DataflowPipelineOptions options = buildPipelineOptions(); - options.setRunner(DataflowPipelineRunner.class); + options.setRunner(DataflowRunner.class); Pipeline p = buildPipeline(options); p.traverseTopologically(new RecordingPipelineVisitor()); Job job = DataflowPipelineTranslator.fromOptions(options) .translate( - p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()) + p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); // Note that the contents of this materialized map may be changed by the act of reading an @@ -196,7 +196,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { settings.put("project", "some-project"); settings.put("pathValidatorClass", "org.apache.beam.runners.dataflow.util.DataflowPathValidator"); - settings.put("runner", "org.apache.beam.runners.dataflow.DataflowPipelineRunner"); + settings.put("runner", "org.apache.beam.runners.dataflow.DataflowRunner"); settings.put("jobName", "some-job-name"); settings.put("tempLocation", "gs://somebucket/some/path"); settings.put("stagingLocation", "gs://somebucket/some/path/staging"); @@ -222,7 +222,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { Job job = DataflowPipelineTranslator.fromOptions(options) .translate( - p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()) + p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); assertEquals(1, job.getEnvironment().getWorkerPools().size()); @@ -239,7 +239,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { Job job = DataflowPipelineTranslator.fromOptions(options) .translate( - p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()) + p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); assertEquals(1, job.getEnvironment().getWorkerPools().size()); @@ -258,7 +258,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { Job job = DataflowPipelineTranslator.fromOptions(options) .translate( - p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()) + p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); assertEquals(1, job.getEnvironment().getWorkerPools().size()); @@ -275,7 +275,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { Job job = DataflowPipelineTranslator.fromOptions(options) .translate( - p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()) + p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); assertEquals(1, job.getEnvironment().getWorkerPools().size()); @@ -291,7 +291,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { Job job = DataflowPipelineTranslator.fromOptions(options) .translate( - p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()) + p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); assertEquals(1, job.getEnvironment().getWorkerPools().size()); @@ -327,7 +327,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { Job job = DataflowPipelineTranslator.fromOptions(options) .translate( - p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()) + p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); assertEquals(1, job.getEnvironment().getWorkerPools().size()); @@ -362,7 +362,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { Job job = DataflowPipelineTranslator.fromOptions(options) .translate( - p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()) + p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); assertEquals(1, job.getEnvironment().getWorkerPools().size()); @@ -396,7 +396,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { Job job = DataflowPipelineTranslator.fromOptions(options) .translate( - p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()) + p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); assertEquals(1, job.getEnvironment().getWorkerPools().size()); @@ -416,7 +416,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { Job job = DataflowPipelineTranslator.fromOptions(options) .translate( - p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()) + p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); assertEquals(1, job.getEnvironment().getWorkerPools().size()); @@ -437,7 +437,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { Job job = DataflowPipelineTranslator.fromOptions(options) .translate( - p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()) + p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); assertEquals(1, job.getEnvironment().getWorkerPools().size()); @@ -466,7 +466,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { translator .translate( pipeline, - (DataflowPipelineRunner) pipeline.getRunner(), + (DataflowRunner) pipeline.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); @@ -520,7 +520,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { translator .translate( pipeline, - (DataflowPipelineRunner) pipeline.getRunner(), + (DataflowRunner) pipeline.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); @@ -655,7 +655,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { // Check that translation doesn't fail. t.translate( - p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()); + p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()); } @Test @@ -698,7 +698,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { // Check that translation doesn't fail. t.translate( pipeline, - (DataflowPipelineRunner) pipeline.getRunner(), + (DataflowRunner) pipeline.getRunner(), Collections.<DataflowPackage>emptyList()); } @@ -724,7 +724,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { ThrowableMessageMatcher.hasMessage(containsString("Unsupported wildcard usage")))); t.translate( pipeline, - (DataflowPipelineRunner) pipeline.getRunner(), + (DataflowRunner) pipeline.getRunner(), Collections.<DataflowPackage>emptyList()); } @@ -745,7 +745,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { translator .translate( pipeline, - (DataflowPipelineRunner) pipeline.getRunner(), + (DataflowRunner) pipeline.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); @@ -777,7 +777,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { translator .translate( pipeline, - (DataflowPipelineRunner) pipeline.getRunner(), + (DataflowRunner) pipeline.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); @@ -807,7 +807,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { translator .translate( pipeline, - (DataflowPipelineRunner) pipeline.getRunner(), + (DataflowRunner) pipeline.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); @@ -840,7 +840,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { translator .translate( pipeline, - (DataflowPipelineRunner) pipeline.getRunner(), + (DataflowRunner) pipeline.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); @@ -903,7 +903,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { translator .translate( pipeline, - (DataflowPipelineRunner) pipeline.getRunner(), + (DataflowRunner) pipeline.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob();
