http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/util/PackageUtilTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/util/PackageUtilTest.java deleted file mode 100644 index d2c08c8..0000000 --- a/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/util/PackageUtilTest.java +++ /dev/null @@ -1,483 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.google.cloud.dataflow.sdk.util; - -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; - -import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo; -import com.google.api.client.googleapis.json.GoogleJsonResponseException; -import com.google.api.client.http.HttpRequest; -import com.google.api.client.http.HttpResponse; -import com.google.api.client.http.HttpStatusCodes; -import com.google.api.client.http.HttpTransport; -import com.google.api.client.http.LowLevelHttpRequest; -import com.google.api.client.json.GenericJson; -import com.google.api.client.json.Json; -import com.google.api.client.json.JsonFactory; -import com.google.api.client.json.jackson2.JacksonFactory; -import com.google.api.client.testing.http.HttpTesting; -import com.google.api.client.testing.http.MockHttpTransport; -import com.google.api.client.testing.http.MockLowLevelHttpRequest; -import com.google.api.client.testing.http.MockLowLevelHttpResponse; -import com.google.api.services.dataflow.model.DataflowPackage; -import com.google.cloud.dataflow.sdk.options.GcsOptions; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.testing.ExpectedLogs; -import com.google.cloud.dataflow.sdk.testing.FastNanoClockAndSleeper; -import com.google.cloud.dataflow.sdk.util.PackageUtil.PackageAttributes; -import 
com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.io.Files; -import com.google.common.io.LineReader; - -import org.hamcrest.BaseMatcher; -import org.hamcrest.Description; -import org.hamcrest.Matchers; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.nio.channels.Channels; -import java.nio.channels.Pipe; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.regex.Pattern; -import java.util.zip.ZipEntry; -import java.util.zip.ZipInputStream; - -/** Tests for PackageUtil. 
*/ -@RunWith(JUnit4.class) -public class PackageUtilTest { - @Rule public ExpectedLogs logged = ExpectedLogs.none(PackageUtil.class); - @Rule - public TemporaryFolder tmpFolder = new TemporaryFolder(); - - @Rule - public FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper(); - - @Mock - GcsUtil mockGcsUtil; - - // 128 bits, base64 encoded is 171 bits, rounds to 22 bytes - private static final String HASH_PATTERN = "[a-zA-Z0-9+-]{22}"; - - // Hamcrest matcher to assert a string matches a pattern - private static class RegexMatcher extends BaseMatcher<String> { - private final Pattern pattern; - - public RegexMatcher(String regex) { - this.pattern = Pattern.compile(regex); - } - - @Override - public boolean matches(Object o) { - if (!(o instanceof String)) { - return false; - } - return pattern.matcher((String) o).matches(); - } - - @Override - public void describeTo(Description description) { - description.appendText(String.format("matches regular expression %s", pattern)); - } - - public static RegexMatcher matches(String regex) { - return new RegexMatcher(regex); - } - } - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - - GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class); - pipelineOptions.setGcsUtil(mockGcsUtil); - - IOChannelUtils.registerStandardIOFactories(pipelineOptions); - } - - private File makeFileWithContents(String name, String contents) throws Exception { - File tmpFile = tmpFolder.newFile(name); - Files.write(contents, tmpFile, StandardCharsets.UTF_8); - tmpFile.setLastModified(0); // required for determinism with directories - return tmpFile; - } - - static final String STAGING_PATH = GcsPath.fromComponents("somebucket", "base/path").toString(); - private static PackageAttributes makePackageAttributes(File file, String overridePackageName) { - return PackageUtil.createPackageAttributes(file, STAGING_PATH, overridePackageName); - } - - @Test - public void 
testFileWithExtensionPackageNamingAndSize() throws Exception { - String contents = "This is a test!"; - File tmpFile = makeFileWithContents("file.txt", contents); - PackageAttributes attr = makePackageAttributes(tmpFile, null); - DataflowPackage target = attr.getDataflowPackage(); - - assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN + ".txt")); - assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName())); - assertThat(attr.getSize(), equalTo((long) contents.length())); - } - - @Test - public void testPackageNamingWithFileNoExtension() throws Exception { - File tmpFile = makeFileWithContents("file", "This is a test!"); - DataflowPackage target = makePackageAttributes(tmpFile, null).getDataflowPackage(); - - assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN)); - assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName())); - } - - @Test - public void testPackageNamingWithDirectory() throws Exception { - File tmpDirectory = tmpFolder.newFolder("folder"); - DataflowPackage target = makePackageAttributes(tmpDirectory, null).getDataflowPackage(); - - assertThat(target.getName(), RegexMatcher.matches("folder-" + HASH_PATTERN + ".jar")); - assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName())); - } - - @Test - public void testPackageNamingWithFilesHavingSameContentsAndSameNames() throws Exception { - File tmpDirectory1 = tmpFolder.newFolder("folder1", "folderA"); - makeFileWithContents("folder1/folderA/sameName", "This is a test!"); - DataflowPackage target1 = makePackageAttributes(tmpDirectory1, null).getDataflowPackage(); - - File tmpDirectory2 = tmpFolder.newFolder("folder2", "folderA"); - makeFileWithContents("folder2/folderA/sameName", "This is a test!"); - DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDataflowPackage(); - - assertEquals(target1.getName(), target2.getName()); - assertEquals(target1.getLocation(), 
target2.getLocation()); - } - - @Test - public void testPackageNamingWithFilesHavingSameContentsButDifferentNames() throws Exception { - File tmpDirectory1 = tmpFolder.newFolder("folder1", "folderA"); - makeFileWithContents("folder1/folderA/uniqueName1", "This is a test!"); - DataflowPackage target1 = makePackageAttributes(tmpDirectory1, null).getDataflowPackage(); - - File tmpDirectory2 = tmpFolder.newFolder("folder2", "folderA"); - makeFileWithContents("folder2/folderA/uniqueName2", "This is a test!"); - DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDataflowPackage(); - - assertNotEquals(target1.getName(), target2.getName()); - assertNotEquals(target1.getLocation(), target2.getLocation()); - } - - @Test - public void testPackageNamingWithDirectoriesHavingSameContentsButDifferentNames() - throws Exception { - File tmpDirectory1 = tmpFolder.newFolder("folder1", "folderA"); - tmpFolder.newFolder("folder1", "folderA", "uniqueName1"); - DataflowPackage target1 = makePackageAttributes(tmpDirectory1, null).getDataflowPackage(); - - File tmpDirectory2 = tmpFolder.newFolder("folder2", "folderA"); - tmpFolder.newFolder("folder2", "folderA", "uniqueName2"); - DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDataflowPackage(); - - assertNotEquals(target1.getName(), target2.getName()); - assertNotEquals(target1.getLocation(), target2.getLocation()); - } - - @Test - public void testPackageUploadWithLargeClasspathLogsWarning() throws Exception { - File tmpFile = makeFileWithContents("file.txt", "This is a test!"); - // all files will be present and cached so no upload needed. 
- when(mockGcsUtil.fileSize(any(GcsPath.class))).thenReturn(tmpFile.length()); - - List<String> classpathElements = Lists.newLinkedList(); - for (int i = 0; i < 1005; ++i) { - String eltName = "element" + i; - classpathElements.add(eltName + '=' + tmpFile.getAbsolutePath()); - } - - PackageUtil.stageClasspathElements(classpathElements, STAGING_PATH); - - logged.verifyWarn("Your classpath contains 1005 elements, which Google Cloud Dataflow"); - } - - @Test - public void testPackageUploadWithFileSucceeds() throws Exception { - Pipe pipe = Pipe.open(); - String contents = "This is a test!"; - File tmpFile = makeFileWithContents("file.txt", contents); - when(mockGcsUtil.fileSize(any(GcsPath.class))) - .thenThrow(new FileNotFoundException("some/path")); - when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink()); - - List<DataflowPackage> targets = PackageUtil.stageClasspathElements( - ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH); - DataflowPackage target = Iterables.getOnlyElement(targets); - - verify(mockGcsUtil).fileSize(any(GcsPath.class)); - verify(mockGcsUtil).create(any(GcsPath.class), anyString()); - verifyNoMoreInteractions(mockGcsUtil); - - assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN + ".txt")); - assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName())); - assertThat(new LineReader(Channels.newReader(pipe.source(), "UTF-8")).readLine(), - equalTo(contents)); - } - - @Test - public void testPackageUploadWithDirectorySucceeds() throws Exception { - Pipe pipe = Pipe.open(); - File tmpDirectory = tmpFolder.newFolder("folder"); - tmpFolder.newFolder("folder", "empty_directory"); - tmpFolder.newFolder("folder", "directory"); - makeFileWithContents("folder/file.txt", "This is a test!"); - makeFileWithContents("folder/directory/file.txt", "This is also a test!"); - - when(mockGcsUtil.fileSize(any(GcsPath.class))) - .thenThrow(new FileNotFoundException("some/path")); - 
when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink()); - - PackageUtil.stageClasspathElements( - ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH); - - verify(mockGcsUtil).fileSize(any(GcsPath.class)); - verify(mockGcsUtil).create(any(GcsPath.class), anyString()); - verifyNoMoreInteractions(mockGcsUtil); - - ZipInputStream inputStream = new ZipInputStream(Channels.newInputStream(pipe.source())); - List<String> zipEntryNames = new ArrayList<>(); - for (ZipEntry entry = inputStream.getNextEntry(); entry != null; - entry = inputStream.getNextEntry()) { - zipEntryNames.add(entry.getName()); - } - - assertThat(zipEntryNames, - containsInAnyOrder("directory/file.txt", "empty_directory/", "file.txt")); - } - - @Test - public void testPackageUploadWithEmptyDirectorySucceeds() throws Exception { - Pipe pipe = Pipe.open(); - File tmpDirectory = tmpFolder.newFolder("folder"); - - when(mockGcsUtil.fileSize(any(GcsPath.class))) - .thenThrow(new FileNotFoundException("some/path")); - when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink()); - - List<DataflowPackage> targets = PackageUtil.stageClasspathElements( - ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH); - DataflowPackage target = Iterables.getOnlyElement(targets); - - verify(mockGcsUtil).fileSize(any(GcsPath.class)); - verify(mockGcsUtil).create(any(GcsPath.class), anyString()); - verifyNoMoreInteractions(mockGcsUtil); - - assertThat(target.getName(), RegexMatcher.matches("folder-" + HASH_PATTERN + ".jar")); - assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName())); - assertNull(new ZipInputStream(Channels.newInputStream(pipe.source())).getNextEntry()); - } - - @Test(expected = RuntimeException.class) - public void testPackageUploadFailsWhenIOExceptionThrown() throws Exception { - File tmpFile = makeFileWithContents("file.txt", "This is a test!"); - when(mockGcsUtil.fileSize(any(GcsPath.class))) - .thenThrow(new 
FileNotFoundException("some/path")); - when(mockGcsUtil.create(any(GcsPath.class), anyString())) - .thenThrow(new IOException("Fake Exception: Upload error")); - - try { - PackageUtil.stageClasspathElements( - ImmutableList.of(tmpFile.getAbsolutePath()), - STAGING_PATH, fastNanoClockAndSleeper); - } finally { - verify(mockGcsUtil).fileSize(any(GcsPath.class)); - verify(mockGcsUtil, times(5)).create(any(GcsPath.class), anyString()); - verifyNoMoreInteractions(mockGcsUtil); - } - } - - @Test - public void testPackageUploadFailsWithPermissionsErrorGivesDetailedMessage() throws Exception { - File tmpFile = makeFileWithContents("file.txt", "This is a test!"); - when(mockGcsUtil.fileSize(any(GcsPath.class))) - .thenThrow(new FileNotFoundException("some/path")); - when(mockGcsUtil.create(any(GcsPath.class), anyString())) - .thenThrow(new IOException("Failed to write to GCS path " + STAGING_PATH, - googleJsonResponseException( - HttpStatusCodes.STATUS_CODE_FORBIDDEN, "Permission denied", "Test message"))); - - try { - PackageUtil.stageClasspathElements( - ImmutableList.of(tmpFile.getAbsolutePath()), - STAGING_PATH, fastNanoClockAndSleeper); - fail("Expected RuntimeException"); - } catch (RuntimeException e) { - assertTrue("Expected IOException containing detailed message.", - e.getCause() instanceof IOException); - assertThat(e.getCause().getMessage(), - Matchers.allOf( - Matchers.containsString("Uploaded failed due to permissions error"), - Matchers.containsString( - "Stale credentials can be resolved by executing 'gcloud auth login'"))); - } finally { - verify(mockGcsUtil).fileSize(any(GcsPath.class)); - verify(mockGcsUtil).create(any(GcsPath.class), anyString()); - verifyNoMoreInteractions(mockGcsUtil); - } - } - - @Test - public void testPackageUploadEventuallySucceeds() throws Exception { - Pipe pipe = Pipe.open(); - File tmpFile = makeFileWithContents("file.txt", "This is a test!"); - when(mockGcsUtil.fileSize(any(GcsPath.class))) - .thenThrow(new 
FileNotFoundException("some/path")); - when(mockGcsUtil.create(any(GcsPath.class), anyString())) - .thenThrow(new IOException("Fake Exception: 410 Gone")) // First attempt fails - .thenReturn(pipe.sink()); // second attempt succeeds - - try { - PackageUtil.stageClasspathElements( - ImmutableList.of(tmpFile.getAbsolutePath()), - STAGING_PATH, - fastNanoClockAndSleeper); - } finally { - verify(mockGcsUtil).fileSize(any(GcsPath.class)); - verify(mockGcsUtil, times(2)).create(any(GcsPath.class), anyString()); - verifyNoMoreInteractions(mockGcsUtil); - } - } - - @Test - public void testPackageUploadIsSkippedWhenFileAlreadyExists() throws Exception { - File tmpFile = makeFileWithContents("file.txt", "This is a test!"); - when(mockGcsUtil.fileSize(any(GcsPath.class))).thenReturn(tmpFile.length()); - - PackageUtil.stageClasspathElements( - ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH); - - verify(mockGcsUtil).fileSize(any(GcsPath.class)); - verifyNoMoreInteractions(mockGcsUtil); - } - - @Test - public void testPackageUploadIsNotSkippedWhenSizesAreDifferent() throws Exception { - Pipe pipe = Pipe.open(); - File tmpDirectory = tmpFolder.newFolder("folder"); - tmpFolder.newFolder("folder", "empty_directory"); - tmpFolder.newFolder("folder", "directory"); - makeFileWithContents("folder/file.txt", "This is a test!"); - makeFileWithContents("folder/directory/file.txt", "This is also a test!"); - when(mockGcsUtil.fileSize(any(GcsPath.class))).thenReturn(Long.MAX_VALUE); - when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink()); - - PackageUtil.stageClasspathElements( - ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH); - - verify(mockGcsUtil).fileSize(any(GcsPath.class)); - verify(mockGcsUtil).create(any(GcsPath.class), anyString()); - verifyNoMoreInteractions(mockGcsUtil); - } - - @Test - public void testPackageUploadWithExplicitPackageName() throws Exception { - Pipe pipe = Pipe.open(); - File tmpFile = 
makeFileWithContents("file.txt", "This is a test!"); - final String overriddenName = "alias.txt"; - - when(mockGcsUtil.fileSize(any(GcsPath.class))) - .thenThrow(new FileNotFoundException("some/path")); - when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink()); - - List<DataflowPackage> targets = PackageUtil.stageClasspathElements( - ImmutableList.of(overriddenName + "=" + tmpFile.getAbsolutePath()), STAGING_PATH); - DataflowPackage target = Iterables.getOnlyElement(targets); - - verify(mockGcsUtil).fileSize(any(GcsPath.class)); - verify(mockGcsUtil).create(any(GcsPath.class), anyString()); - verifyNoMoreInteractions(mockGcsUtil); - - assertThat(target.getName(), equalTo(overriddenName)); - assertThat(target.getLocation(), - RegexMatcher.matches(STAGING_PATH + "/file-" + HASH_PATTERN + ".txt")); - } - - @Test - public void testPackageUploadIsSkippedWithNonExistentResource() throws Exception { - String nonExistentFile = - IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "non-existent-file"); - assertEquals(Collections.EMPTY_LIST, PackageUtil.stageClasspathElements( - ImmutableList.of(nonExistentFile), STAGING_PATH)); - } - - /** - * Builds a fake GoogleJsonResponseException for testing API error handling. 
- */ - private static GoogleJsonResponseException googleJsonResponseException( - final int status, final String reason, final String message) throws IOException { - final JsonFactory jsonFactory = new JacksonFactory(); - HttpTransport transport = new MockHttpTransport() { - @Override - public LowLevelHttpRequest buildRequest(String method, String url) throws IOException { - ErrorInfo errorInfo = new ErrorInfo(); - errorInfo.setReason(reason); - errorInfo.setMessage(message); - errorInfo.setFactory(jsonFactory); - GenericJson error = new GenericJson(); - error.set("code", status); - error.set("errors", Arrays.asList(errorInfo)); - error.setFactory(jsonFactory); - GenericJson errorResponse = new GenericJson(); - errorResponse.set("error", error); - errorResponse.setFactory(jsonFactory); - return new MockLowLevelHttpRequest().setResponse( - new MockLowLevelHttpResponse().setContent(errorResponse.toPrettyString()) - .setContentType(Json.MEDIA_TYPE).setStatusCode(status)); - } - }; - HttpRequest request = - transport.createRequestFactory().buildGetRequest(HttpTesting.SIMPLE_GENERIC_URL); - request.setThrowExceptionOnExecuteError(false); - HttpResponse response = request.execute(); - return GoogleJsonResponseException.from(jsonFactory, response); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/io/DataflowTextIOTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/io/DataflowTextIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/io/DataflowTextIOTest.java new file mode 100644 index 0000000..7788b5b --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/io/DataflowTextIOTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.google.cloud.dataflow.sdk.io; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; +import com.google.cloud.dataflow.sdk.testing.TestDataflowPipelineOptions; +import com.google.cloud.dataflow.sdk.util.GcsUtil; +import com.google.cloud.dataflow.sdk.util.TestCredential; +import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath; +import com.google.common.collect.ImmutableList; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.channels.SeekableByteChannel; +import java.nio.file.Files; +import java.nio.file.StandardOpenOption; +import java.util.List; + +/** + * {@link DataflowPipelineRunner} specific tests for TextIO Read and Write transforms. 
+ */ +@RunWith(JUnit4.class) +public class DataflowTextIOTest { + + private TestDataflowPipelineOptions buildTestPipelineOptions() { + TestDataflowPipelineOptions options = + PipelineOptionsFactory.as(TestDataflowPipelineOptions.class); + options.setGcpCredential(new TestCredential()); + return options; + } + + private GcsUtil buildMockGcsUtil() throws IOException { + GcsUtil mockGcsUtil = Mockito.mock(GcsUtil.class); + + // Any request to open gets a new bogus channel + Mockito + .when(mockGcsUtil.open(Mockito.any(GcsPath.class))) + .then(new Answer<SeekableByteChannel>() { + @Override + public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable { + return FileChannel.open( + Files.createTempFile("channel-", ".tmp"), + StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE); + } + }); + + // Any request for expansion returns a list containing the original GcsPath + // This is required to pass validation that occurs in TextIO during apply() + Mockito + .when(mockGcsUtil.expand(Mockito.any(GcsPath.class))) + .then(new Answer<List<GcsPath>>() { + @Override + public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable { + return ImmutableList.of((GcsPath) invocation.getArguments()[0]); + } + }); + + return mockGcsUtil; + } + + /** + * This tests a few corner cases that should not crash. 
+ */ + @Test + public void testGoodWildcards() throws Exception { + TestDataflowPipelineOptions options = buildTestPipelineOptions(); + options.setGcsUtil(buildMockGcsUtil()); + + Pipeline pipeline = Pipeline.create(options); + + applyRead(pipeline, "gs://bucket/foo"); + applyRead(pipeline, "gs://bucket/foo/"); + applyRead(pipeline, "gs://bucket/foo/*"); + applyRead(pipeline, "gs://bucket/foo/?"); + applyRead(pipeline, "gs://bucket/foo/[0-9]"); + applyRead(pipeline, "gs://bucket/foo/*baz*"); + applyRead(pipeline, "gs://bucket/foo/*baz?"); + applyRead(pipeline, "gs://bucket/foo/[0-9]baz?"); + applyRead(pipeline, "gs://bucket/foo/baz/*"); + applyRead(pipeline, "gs://bucket/foo/baz/*wonka*"); + applyRead(pipeline, "gs://bucket/foo/*baz/wonka*"); + applyRead(pipeline, "gs://bucket/foo*/baz"); + applyRead(pipeline, "gs://bucket/foo?/baz"); + applyRead(pipeline, "gs://bucket/foo[0-9]/baz"); + + // Check that running doesn't fail. + pipeline.run(); + } + + private void applyRead(Pipeline pipeline, String path) { + pipeline.apply("Read(" + path + ")", TextIO.Read.from(path)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowPipelineDebugOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowPipelineDebugOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowPipelineDebugOptionsTest.java new file mode 100644 index 0000000..1b5a3c7 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowPipelineDebugOptionsTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.sdk.options; + +import static org.hamcrest.Matchers.hasEntry; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link DataflowPipelineDebugOptions}. 
*/ +@RunWith(JUnit4.class) +public class DataflowPipelineDebugOptionsTest { + @Test + public void testTransformNameMapping() throws Exception { + DataflowPipelineDebugOptions options = PipelineOptionsFactory + .fromArgs(new String[]{"--transformNameMapping={\"a\":\"b\",\"foo\":\"\",\"bar\":\"baz\"}"}) + .as(DataflowPipelineDebugOptions.class); + assertEquals(3, options.getTransformNameMapping().size()); + assertThat(options.getTransformNameMapping(), hasEntry("a", "b")); + assertThat(options.getTransformNameMapping(), hasEntry("foo", "")); + assertThat(options.getTransformNameMapping(), hasEntry("bar", "baz")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowPipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowPipelineOptionsTest.java new file mode 100644 index 0000000..eff79bb --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowPipelineOptionsTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.sdk.options; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import com.google.cloud.dataflow.sdk.testing.ResetDateTimeProvider; +import com.google.cloud.dataflow.sdk.testing.RestoreSystemProperties; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestRule; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link DataflowPipelineOptions}. */ +@RunWith(JUnit4.class) +public class DataflowPipelineOptionsTest { + @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties(); + @Rule public ResetDateTimeProvider resetDateTimeProviderRule = new ResetDateTimeProvider(); + + @Test + public void testJobNameIsSet() { + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setJobName("TestJobName"); + assertEquals("TestJobName", options.getJobName()); + } + + @Test + public void testUserNameIsNotSet() { + resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z"); + System.getProperties().remove("user.name"); + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setAppName("TestApplication"); + assertEquals("testapplication--1208190706", options.getJobName()); + assertTrue(options.getJobName().length() <= 40); + } + + @Test + public void testAppNameAndUserNameAreLong() { + resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z"); + System.getProperties().put("user.name", 
"abcdeabcdeabcdeabcdeabcdeabcde"); + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setAppName("1234567890123456789012345678901234567890"); + assertEquals( + "a234567890123456789012345678901234567890-abcdeabcdeabcdeabcdeabcdeabcde-1208190706", + options.getJobName()); + } + + @Test + public void testAppNameIsLong() { + resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z"); + System.getProperties().put("user.name", "abcde"); + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setAppName("1234567890123456789012345678901234567890"); + assertEquals("a234567890123456789012345678901234567890-abcde-1208190706", options.getJobName()); + } + + @Test + public void testUserNameIsLong() { + resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z"); + System.getProperties().put("user.name", "abcdeabcdeabcdeabcdeabcdeabcde"); + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setAppName("1234567890"); + assertEquals("a234567890-abcdeabcdeabcdeabcdeabcdeabcde-1208190706", options.getJobName()); + } + + @Test + public void testUtf8UserNameAndApplicationNameIsNormalized() { + resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z"); + System.getProperties().put("user.name", "ði ıntÉËnæÊÉnÉl "); + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setAppName("fÉËnÉtık ÉsoÊsiËeıÊn"); + assertEquals("f00n0t0k00so0si0e00n-0i00nt00n000n0l0-1208190706", options.getJobName()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowProfilingOptionsTest.java ---------------------------------------------------------------------- diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowProfilingOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowProfilingOptionsTest.java new file mode 100644 index 0000000..1420273 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowProfilingOptionsTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.sdk.options; + +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.hamcrest.Matchers; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link DataflowProfilingOptions}. 
+ */ +@RunWith(JUnit4.class) +public class DataflowProfilingOptionsTest { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + @Test + public void testOptionsObject() throws Exception { + DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] { + "--enableProfilingAgent", "--profilingAgentConfiguration={\"interval\": 21}"}) + .as(DataflowPipelineOptions.class); + assertTrue(options.getEnableProfilingAgent()); + + String json = MAPPER.writeValueAsString(options); + assertThat(json, Matchers.containsString( + "\"profilingAgentConfiguration\":{\"interval\":21}")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowWorkerLoggingOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowWorkerLoggingOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowWorkerLoggingOptionsTest.java new file mode 100644 index 0000000..b752f3d --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowWorkerLoggingOptionsTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.sdk.options; + +import static com.google.cloud.dataflow.sdk.options.DataflowWorkerLoggingOptions.Level.WARN; +import static org.junit.Assert.assertEquals; + +import com.google.cloud.dataflow.sdk.options.DataflowWorkerLoggingOptions.WorkerLogLevelOverrides; +import com.google.common.collect.ImmutableMap; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link DataflowWorkerLoggingOptions}. 
*/ +@RunWith(JUnit4.class) +public class DataflowWorkerLoggingOptionsTest { + private static final ObjectMapper MAPPER = new ObjectMapper(); + @Rule public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testWorkerLogLevelOverrideWithInvalidLogLevel() { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Unsupported log level"); + WorkerLogLevelOverrides.from(ImmutableMap.of("Name", "FakeLevel")); + } + + @Test + public void testWorkerLogLevelOverrideForClass() throws Exception { + assertEquals("{\"org.junit.Test\":\"WARN\"}", + MAPPER.writeValueAsString( + new WorkerLogLevelOverrides().addOverrideForClass(Test.class, WARN))); + } + + @Test + public void testWorkerLogLevelOverrideForPackage() throws Exception { + assertEquals("{\"org.junit\":\"WARN\"}", + MAPPER.writeValueAsString( + new WorkerLogLevelOverrides().addOverrideForPackage(Test.class.getPackage(), WARN))); + } + + @Test + public void testWorkerLogLevelOverrideForName() throws Exception { + assertEquals("{\"A\":\"WARN\"}", + MAPPER.writeValueAsString( + new WorkerLogLevelOverrides().addOverrideForName("A", WARN))); + } + + @Test + public void testSerializationAndDeserializationOf() throws Exception { + String testValue = "{\"A\":\"WARN\"}"; + assertEquals(testValue, + MAPPER.writeValueAsString( + MAPPER.readValue(testValue, WorkerLogLevelOverrides.class))); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java new file mode 100644 index 0000000..0322426 --- /dev/null +++ 
package com.google.cloud.dataflow.sdk.runners;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.PipelineResult.State;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.testing.ExpectedLogs;
import com.google.cloud.dataflow.sdk.testing.TestDataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
import com.google.cloud.dataflow.sdk.util.NoopPathValidator;
import com.google.cloud.dataflow.sdk.util.TestCredential;

import org.hamcrest.Description;
import org.hamcrest.Factory;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import java.util.concurrent.TimeUnit;

/**
 * Tests for {@link BlockingDataflowPipelineRunner}: verifies that each terminal job state
 * (DONE, FAILED, CANCELLED, UPDATED, UNKNOWN, null) maps to the expected return or exception.
 */
@RunWith(JUnit4.class)
public class BlockingDataflowPipelineRunnerTest {

  @Rule
  public ExpectedLogs expectedLogs = ExpectedLogs.none(BlockingDataflowPipelineRunner.class);

  @Rule
  public ExpectedException expectedThrown = ExpectedException.none();

  /**
   * A {@link Matcher} for a {@link DataflowJobException} that applies an underlying {@link Matcher}
   * to the {@link DataflowPipelineJob} returned by {@link DataflowJobException#getJob()}.
   */
  private static class DataflowJobExceptionMatcher<T extends DataflowJobException>
      extends TypeSafeMatcher<T> {

    private final Matcher<DataflowPipelineJob> matcher;

    public DataflowJobExceptionMatcher(Matcher<DataflowPipelineJob> matcher) {
      this.matcher = matcher;
    }

    @Override
    public boolean matchesSafely(T ex) {
      return matcher.matches(ex.getJob());
    }

    @Override
    protected void describeMismatchSafely(T item, Description description) {
      description.appendText("job ");
      // NOTE(review): this passes the exception message rather than item.getJob() to the
      // job matcher's describeMismatch; the mismatch text may be misleading — confirm intent.
      matcher.describeMismatch(item.getMessage(), description);
    }

    @Override
    public void describeTo(Description description) {
      description.appendText("exception with job matching ");
      description.appendDescriptionOf(matcher);
    }

    /** Factory wrapping a job matcher into an exception matcher. */
    @Factory
    public static <T extends DataflowJobException> Matcher<T> expectJob(
        Matcher<DataflowPipelineJob> matcher) {
      return new DataflowJobExceptionMatcher<T>(matcher);
    }
  }

  /**
   * A {@link Matcher} for a {@link DataflowPipelineJob} that applies an underlying {@link Matcher}
   * to the return value of {@link DataflowPipelineJob#getJobId()}.
   */
  private static class JobIdMatcher<T extends DataflowPipelineJob> extends TypeSafeMatcher<T> {

    private final Matcher<String> matcher;

    public JobIdMatcher(Matcher<String> matcher) {
      this.matcher = matcher;
    }

    @Override
    public boolean matchesSafely(T job) {
      return matcher.matches(job.getJobId());
    }

    @Override
    protected void describeMismatchSafely(T item, Description description) {
      description.appendText("jobId ");
      matcher.describeMismatch(item.getJobId(), description);
    }

    @Override
    public void describeTo(Description description) {
      description.appendText("job with jobId ");
      description.appendDescriptionOf(matcher);
    }

    /** Factory matching a job whose id equals {@code jobId}. */
    @Factory
    public static <T extends DataflowPipelineJob> Matcher<T> expectJobId(final String jobId) {
      return new JobIdMatcher<T>(equalTo(jobId));
    }
  }

  /**
   * A {@link Matcher} for a {@link DataflowJobUpdatedException} that applies an underlying
   * {@link Matcher} to the {@link DataflowPipelineJob} returned by
   * {@link DataflowJobUpdatedException#getReplacedByJob()}.
   */
  private static class ReplacedByJobMatcher<T extends DataflowJobUpdatedException>
      extends TypeSafeMatcher<T> {

    private final Matcher<DataflowPipelineJob> matcher;

    public ReplacedByJobMatcher(Matcher<DataflowPipelineJob> matcher) {
      this.matcher = matcher;
    }

    @Override
    public boolean matchesSafely(T ex) {
      return matcher.matches(ex.getReplacedByJob());
    }

    @Override
    protected void describeMismatchSafely(T item, Description description) {
      description.appendText("job ");
      // NOTE(review): passes item.getMessage() rather than item.getReplacedByJob() to the
      // job matcher's describeMismatch — confirm intent (mirrors DataflowJobExceptionMatcher).
      matcher.describeMismatch(item.getMessage(), description);
    }

    @Override
    public void describeTo(Description description) {
      description.appendText("exception with replacedByJob() ");
      description.appendDescriptionOf(matcher);
    }

    /** Factory wrapping a replacement-job matcher into an exception matcher. */
    @Factory
    public static <T extends DataflowJobUpdatedException> Matcher<T> expectReplacedBy(
        Matcher<DataflowPipelineJob> matcher) {
      return new ReplacedByJobMatcher<T>(matcher);
    }
  }

  /**
   * Creates a mocked {@link DataflowPipelineJob} with the given {@code projectId} and {@code jobId}
   * that will immediately terminate in the provided {@code terminalState}.
   *
   * <p>The return value may be further mocked.
   */
  private DataflowPipelineJob createMockJob(
      String projectId, String jobId, State terminalState) throws Exception {
    DataflowPipelineJob mockJob = mock(DataflowPipelineJob.class);
    when(mockJob.getProjectId()).thenReturn(projectId);
    when(mockJob.getJobId()).thenReturn(jobId);
    // waitToFinish returns the terminal state immediately regardless of timeout/handler.
    when(mockJob.waitToFinish(
        anyLong(), isA(TimeUnit.class), isA(MonitoringUtil.JobMessagesHandler.class)))
        .thenReturn(terminalState);
    return mockJob;
  }

  /**
   * Returns a {@link BlockingDataflowPipelineRunner} that will return the provided job.
   * Some {@link PipelineOptions} are extracted from the job, such as the project ID.
   */
  private BlockingDataflowPipelineRunner createMockRunner(DataflowPipelineJob job)
      throws Exception {
    DataflowPipelineRunner mockRunner = mock(DataflowPipelineRunner.class);
    TestDataflowPipelineOptions options =
        PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
    options.setProject(job.getProjectId());

    when(mockRunner.run(isA(Pipeline.class))).thenReturn(job);

    return new BlockingDataflowPipelineRunner(mockRunner, options);
  }

  /**
   * Tests that the {@link BlockingDataflowPipelineRunner} returns normally when a job terminates in
   * the {@link State#DONE DONE} state.
   */
  @Test
  public void testJobDoneComplete() throws Exception {
    createMockRunner(createMockJob("testJobDone-projectId", "testJobDone-jobId", State.DONE))
        .run(DirectPipeline.createForTest());
    expectedLogs.verifyInfo("Job finished with status DONE");
  }

  /**
   * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
   * when a job terminates in the {@link State#FAILED FAILED} state.
   */
  @Test
  public void testFailedJobThrowsException() throws Exception {
    expectedThrown.expect(DataflowJobExecutionException.class);
    expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
        JobIdMatcher.expectJobId("testFailedJob-jobId")));
    createMockRunner(createMockJob("testFailedJob-projectId", "testFailedJob-jobId", State.FAILED))
        .run(DirectPipeline.createForTest());
  }

  /**
   * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
   * when a job terminates in the {@link State#CANCELLED CANCELLED} state.
   */
  @Test
  public void testCancelledJobThrowsException() throws Exception {
    expectedThrown.expect(DataflowJobCancelledException.class);
    expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
        JobIdMatcher.expectJobId("testCancelledJob-jobId")));
    createMockRunner(
        createMockJob("testCancelledJob-projectId", "testCancelledJob-jobId", State.CANCELLED))
        .run(DirectPipeline.createForTest());
  }

  /**
   * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
   * when a job terminates in the {@link State#UPDATED UPDATED} state.
   */
  @Test
  public void testUpdatedJobThrowsException() throws Exception {
    expectedThrown.expect(DataflowJobUpdatedException.class);
    expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
        JobIdMatcher.expectJobId("testUpdatedJob-jobId")));
    // The exception must also carry the replacement job's id.
    expectedThrown.expect(ReplacedByJobMatcher.expectReplacedBy(
        JobIdMatcher.expectJobId("testUpdatedJob-replacedByJobId")));
    DataflowPipelineJob job =
        createMockJob("testUpdatedJob-projectId", "testUpdatedJob-jobId", State.UPDATED);
    DataflowPipelineJob replacedByJob =
        createMockJob("testUpdatedJob-projectId", "testUpdatedJob-replacedByJobId", State.DONE);
    when(job.getReplacedByJob()).thenReturn(replacedByJob);
    createMockRunner(job).run(DirectPipeline.createForTest());
  }

  /**
   * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
   * when a job terminates in the {@link State#UNKNOWN UNKNOWN} state, indicating that the
   * Dataflow service returned a state that the SDK is unfamiliar with (possibly because it
   * is an old SDK relative the service).
   */
  @Test
  public void testUnknownJobThrowsException() throws Exception {
    expectedThrown.expect(IllegalStateException.class);
    createMockRunner(
        createMockJob("testUnknownJob-projectId", "testUnknownJob-jobId", State.UNKNOWN))
        .run(DirectPipeline.createForTest());
  }

  /**
   * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
   * when a job returns a {@code null} state, indicating that it failed to contact the service,
   * including all of its built-in resilience logic.
   */
  @Test
  public void testNullJobThrowsException() throws Exception {
    expectedThrown.expect(DataflowServiceException.class);
    expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
        JobIdMatcher.expectJobId("testNullJob-jobId")));
    createMockRunner(
        createMockJob("testNullJob-projectId", "testNullJob-jobId", null))
        .run(DirectPipeline.createForTest());
  }

  @Test
  public void testToString() {
    // toString is "<runner class>#<job name>"; credentials/validator stubs avoid GCS access.
    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    options.setJobName("TestJobName");
    options.setProject("test-project");
    options.setTempLocation("gs://test/temp/location");
    options.setGcpCredential(new TestCredential());
    options.setPathValidatorClass(NoopPathValidator.class);
    assertEquals("BlockingDataflowPipelineRunner#TestJobName",
        BlockingDataflowPipelineRunner.fromOptions(options).toString());
  }
}
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineJobTest.java @@ -0,0 +1,605 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.sdk.runners; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.api.services.dataflow.Dataflow; +import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Get; +import com.google.api.services.dataflow.Dataflow.Projects.Jobs.GetMetrics; +import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Messages; +import com.google.api.services.dataflow.model.Job; +import 
com.google.api.services.dataflow.model.JobMetrics; +import com.google.api.services.dataflow.model.MetricStructuredName; +import com.google.api.services.dataflow.model.MetricUpdate; +import com.google.cloud.dataflow.sdk.PipelineResult.State; +import com.google.cloud.dataflow.sdk.runners.dataflow.DataflowAggregatorTransforms; +import com.google.cloud.dataflow.sdk.testing.FastNanoClockAndSleeper; +import com.google.cloud.dataflow.sdk.transforms.Aggregator; +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.Sum; +import com.google.cloud.dataflow.sdk.util.AttemptBoundedExponentialBackOff; +import com.google.cloud.dataflow.sdk.util.MonitoringUtil; +import com.google.cloud.dataflow.sdk.values.PInput; +import com.google.cloud.dataflow.sdk.values.POutput; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSetMultimap; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.io.IOException; +import java.math.BigDecimal; +import java.net.SocketTimeoutException; +import java.util.concurrent.TimeUnit; + +/** + * Tests for DataflowPipelineJob. 
+ */ +@RunWith(JUnit4.class) +public class DataflowPipelineJobTest { + private static final String PROJECT_ID = "someProject"; + private static final String JOB_ID = "1234"; + + @Mock + private Dataflow mockWorkflowClient; + @Mock + private Dataflow.Projects mockProjects; + @Mock + private Dataflow.Projects.Jobs mockJobs; + @Rule + public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper(); + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + + when(mockWorkflowClient.projects()).thenReturn(mockProjects); + when(mockProjects.jobs()).thenReturn(mockJobs); + } + + /** + * Validates that a given time is valid for the total time slept by a + * AttemptBoundedExponentialBackOff given the number of retries and + * an initial polling interval. + * + * @param pollingIntervalMillis The initial polling interval given. + * @param attempts The number of attempts made + * @param timeSleptMillis The amount of time slept by the clock. This is checked + * against the valid interval. 
+ */ + void checkValidInterval(long pollingIntervalMillis, int attempts, long timeSleptMillis) { + long highSum = 0; + long lowSum = 0; + for (int i = 1; i < attempts; i++) { + double currentInterval = + pollingIntervalMillis + * Math.pow(AttemptBoundedExponentialBackOff.DEFAULT_MULTIPLIER, i - 1); + double offset = + AttemptBoundedExponentialBackOff.DEFAULT_RANDOMIZATION_FACTOR * currentInterval; + highSum += Math.round(currentInterval + offset); + lowSum += Math.round(currentInterval - offset); + } + assertThat(timeSleptMillis, allOf(greaterThanOrEqualTo(lowSum), lessThanOrEqualTo(highSum))); + } + + @Test + public void testWaitToFinishMessagesFail() throws Exception { + Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class); + + Job statusResponse = new Job(); + statusResponse.setCurrentState("JOB_STATE_" + State.DONE.name()); + when(mockJobs.get(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(statusRequest); + when(statusRequest.execute()).thenReturn(statusResponse); + + MonitoringUtil.JobMessagesHandler jobHandler = mock(MonitoringUtil.JobMessagesHandler.class); + Dataflow.Projects.Jobs.Messages mockMessages = + mock(Dataflow.Projects.Jobs.Messages.class); + Messages.List listRequest = mock(Dataflow.Projects.Jobs.Messages.List.class); + when(mockJobs.messages()).thenReturn(mockMessages); + when(mockMessages.list(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(listRequest); + when(listRequest.execute()).thenThrow(SocketTimeoutException.class); + DataflowAggregatorTransforms dataflowAggregatorTransforms = + mock(DataflowAggregatorTransforms.class); + + DataflowPipelineJob job = new DataflowPipelineJob( + PROJECT_ID, JOB_ID, mockWorkflowClient, dataflowAggregatorTransforms); + + State state = job.waitToFinish(5, TimeUnit.MINUTES, jobHandler, fastClock, fastClock); + assertEquals(null, state); + } + + public State mockWaitToFinishInState(State state) throws Exception { + Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class); 
+ + Job statusResponse = new Job(); + statusResponse.setCurrentState("JOB_STATE_" + state.name()); + + when(mockJobs.get(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(statusRequest); + when(statusRequest.execute()).thenReturn(statusResponse); + DataflowAggregatorTransforms dataflowAggregatorTransforms = + mock(DataflowAggregatorTransforms.class); + + DataflowPipelineJob job = new DataflowPipelineJob( + PROJECT_ID, JOB_ID, mockWorkflowClient, dataflowAggregatorTransforms); + + return job.waitToFinish(1, TimeUnit.MINUTES, null, fastClock, fastClock); + } + + /** + * Tests that the {@link DataflowPipelineJob} understands that the {@link State#DONE DONE} + * state is terminal. + */ + @Test + public void testWaitToFinishDone() throws Exception { + assertEquals(State.DONE, mockWaitToFinishInState(State.DONE)); + } + + /** + * Tests that the {@link DataflowPipelineJob} understands that the {@link State#FAILED FAILED} + * state is terminal. + */ + @Test + public void testWaitToFinishFailed() throws Exception { + assertEquals(State.FAILED, mockWaitToFinishInState(State.FAILED)); + } + + /** + * Tests that the {@link DataflowPipelineJob} understands that the {@link State#FAILED FAILED} + * state is terminal. + */ + @Test + public void testWaitToFinishCancelled() throws Exception { + assertEquals(State.CANCELLED, mockWaitToFinishInState(State.CANCELLED)); + } + + /** + * Tests that the {@link DataflowPipelineJob} understands that the {@link State#FAILED FAILED} + * state is terminal. 
+ */ + @Test + public void testWaitToFinishUpdated() throws Exception { + assertEquals(State.UPDATED, mockWaitToFinishInState(State.UPDATED)); + } + + @Test + public void testWaitToFinishFail() throws Exception { + Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class); + + when(mockJobs.get(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(statusRequest); + when(statusRequest.execute()).thenThrow(IOException.class); + DataflowAggregatorTransforms dataflowAggregatorTransforms = + mock(DataflowAggregatorTransforms.class); + + DataflowPipelineJob job = new DataflowPipelineJob( + PROJECT_ID, JOB_ID, mockWorkflowClient, dataflowAggregatorTransforms); + + long startTime = fastClock.nanoTime(); + State state = job.waitToFinish(5, TimeUnit.MINUTES, null, fastClock, fastClock); + assertEquals(null, state); + long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime); + checkValidInterval(DataflowPipelineJob.MESSAGES_POLLING_INTERVAL, + DataflowPipelineJob.MESSAGES_POLLING_ATTEMPTS, timeDiff); + } + + @Test + public void testWaitToFinishTimeFail() throws Exception { + Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class); + + when(mockJobs.get(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(statusRequest); + when(statusRequest.execute()).thenThrow(IOException.class); + DataflowAggregatorTransforms dataflowAggregatorTransforms = + mock(DataflowAggregatorTransforms.class); + + DataflowPipelineJob job = new DataflowPipelineJob( + PROJECT_ID, JOB_ID, mockWorkflowClient, dataflowAggregatorTransforms); + long startTime = fastClock.nanoTime(); + State state = job.waitToFinish(4, TimeUnit.MILLISECONDS, null, fastClock, fastClock); + assertEquals(null, state); + long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime); + // Should only sleep for the 4 ms remaining. 
+ assertEquals(timeDiff, 4L); + } + + @Test + public void testGetStateReturnsServiceState() throws Exception { + Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class); + + Job statusResponse = new Job(); + statusResponse.setCurrentState("JOB_STATE_" + State.RUNNING.name()); + + when(mockJobs.get(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(statusRequest); + when(statusRequest.execute()).thenReturn(statusResponse); + + DataflowAggregatorTransforms dataflowAggregatorTransforms = + mock(DataflowAggregatorTransforms.class); + + DataflowPipelineJob job = new DataflowPipelineJob( + PROJECT_ID, JOB_ID, mockWorkflowClient, dataflowAggregatorTransforms); + + assertEquals( + State.RUNNING, + job.getStateWithRetries(DataflowPipelineJob.STATUS_POLLING_ATTEMPTS, fastClock)); + } + + @Test + public void testGetStateWithExceptionReturnsUnknown() throws Exception { + Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class); + + when(mockJobs.get(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(statusRequest); + when(statusRequest.execute()).thenThrow(IOException.class); + DataflowAggregatorTransforms dataflowAggregatorTransforms = + mock(DataflowAggregatorTransforms.class); + + DataflowPipelineJob job = new DataflowPipelineJob( + PROJECT_ID, JOB_ID, mockWorkflowClient, dataflowAggregatorTransforms); + + long startTime = fastClock.nanoTime(); + assertEquals( + State.UNKNOWN, + job.getStateWithRetries(DataflowPipelineJob.STATUS_POLLING_ATTEMPTS, fastClock)); + long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime); + checkValidInterval(DataflowPipelineJob.STATUS_POLLING_INTERVAL, + DataflowPipelineJob.STATUS_POLLING_ATTEMPTS, timeDiff); + } + + @Test + public void testGetAggregatorValuesWithNoMetricUpdatesReturnsEmptyValue() + throws IOException, AggregatorRetrievalException { + Aggregator<?, ?> aggregator = mock(Aggregator.class); + @SuppressWarnings("unchecked") + PTransform<PInput, POutput> pTransform = 
mock(PTransform.class); + String stepName = "s1"; + String fullName = "Foo/Bar/Baz"; + AppliedPTransform<?, ?, ?> appliedTransform = appliedPTransform(fullName, pTransform); + + DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms( + ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(), + ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName)); + + GetMetrics getMetrics = mock(GetMetrics.class); + when(mockJobs.getMetrics(PROJECT_ID, JOB_ID)).thenReturn(getMetrics); + JobMetrics jobMetrics = new JobMetrics(); + when(getMetrics.execute()).thenReturn(jobMetrics); + + jobMetrics.setMetrics(ImmutableList.<MetricUpdate>of()); + + Get getState = mock(Get.class); + when(mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn(getState); + Job modelJob = new Job(); + when(getState.execute()).thenReturn(modelJob); + modelJob.setCurrentState(State.RUNNING.toString()); + + DataflowPipelineJob job = + new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms); + + AggregatorValues<?> values = job.getAggregatorValues(aggregator); + + assertThat(values.getValues(), empty()); + } + + @Test + public void testGetAggregatorValuesWithNullMetricUpdatesReturnsEmptyValue() + throws IOException, AggregatorRetrievalException { + Aggregator<?, ?> aggregator = mock(Aggregator.class); + @SuppressWarnings("unchecked") + PTransform<PInput, POutput> pTransform = mock(PTransform.class); + String stepName = "s1"; + String fullName = "Foo/Bar/Baz"; + AppliedPTransform<?, ?, ?> appliedTransform = appliedPTransform(fullName, pTransform); + + DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms( + ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(), + ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName)); + + GetMetrics getMetrics = mock(GetMetrics.class); + when(mockJobs.getMetrics(PROJECT_ID, 
JOB_ID)).thenReturn(getMetrics); + JobMetrics jobMetrics = new JobMetrics(); + when(getMetrics.execute()).thenReturn(jobMetrics); + + jobMetrics.setMetrics(null); + + Get getState = mock(Get.class); + when(mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn(getState); + Job modelJob = new Job(); + when(getState.execute()).thenReturn(modelJob); + modelJob.setCurrentState(State.RUNNING.toString()); + + DataflowPipelineJob job = + new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms); + + AggregatorValues<?> values = job.getAggregatorValues(aggregator); + + assertThat(values.getValues(), empty()); + } + + @Test + public void testGetAggregatorValuesWithSingleMetricUpdateReturnsSingletonCollection() + throws IOException, AggregatorRetrievalException { + CombineFn<Long, long[], Long> combineFn = new Sum.SumLongFn(); + String aggregatorName = "agg"; + Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName); + @SuppressWarnings("unchecked") + PTransform<PInput, POutput> pTransform = mock(PTransform.class); + String stepName = "s1"; + String fullName = "Foo/Bar/Baz"; + AppliedPTransform<?, ?, ?> appliedTransform = appliedPTransform(fullName, pTransform); + + DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms( + ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(), + ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName)); + + GetMetrics getMetrics = mock(GetMetrics.class); + when(mockJobs.getMetrics(PROJECT_ID, JOB_ID)).thenReturn(getMetrics); + JobMetrics jobMetrics = new JobMetrics(); + when(getMetrics.execute()).thenReturn(jobMetrics); + + MetricUpdate update = new MetricUpdate(); + long stepValue = 1234L; + update.setScalar(new BigDecimal(stepValue)); + + MetricStructuredName structuredName = new MetricStructuredName(); + structuredName.setName(aggregatorName); + structuredName.setContext(ImmutableMap.of("step", 
stepName)); + update.setName(structuredName); + + jobMetrics.setMetrics(ImmutableList.of(update)); + + Get getState = mock(Get.class); + when(mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn(getState); + Job modelJob = new Job(); + when(getState.execute()).thenReturn(modelJob); + modelJob.setCurrentState(State.RUNNING.toString()); + + DataflowPipelineJob job = + new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms); + + AggregatorValues<Long> values = job.getAggregatorValues(aggregator); + + assertThat(values.getValuesAtSteps(), hasEntry(fullName, stepValue)); + assertThat(values.getValuesAtSteps().size(), equalTo(1)); + assertThat(values.getValues(), contains(stepValue)); + assertThat(values.getTotalValue(combineFn), equalTo(Long.valueOf(stepValue))); + } + + @Test + public void testGetAggregatorValuesWithMultipleMetricUpdatesReturnsCollection() + throws IOException, AggregatorRetrievalException { + CombineFn<Long, long[], Long> combineFn = new Sum.SumLongFn(); + String aggregatorName = "agg"; + Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName); + + @SuppressWarnings("unchecked") + PTransform<PInput, POutput> pTransform = mock(PTransform.class); + String stepName = "s1"; + String fullName = "Foo/Bar/Baz"; + AppliedPTransform<?, ?, ?> appliedTransform = appliedPTransform(fullName, pTransform); + + @SuppressWarnings("unchecked") + PTransform<PInput, POutput> otherTransform = mock(PTransform.class); + String otherStepName = "s88"; + String otherFullName = "Spam/Ham/Eggs"; + AppliedPTransform<?, ?, ?> otherAppliedTransform = + appliedPTransform(otherFullName, otherTransform); + + DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms( + ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of( + aggregator, pTransform, aggregator, otherTransform).asMap(), + ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of( + appliedTransform, stepName, otherAppliedTransform, 
otherStepName)); + + GetMetrics getMetrics = mock(GetMetrics.class); + when(mockJobs.getMetrics(PROJECT_ID, JOB_ID)).thenReturn(getMetrics); + JobMetrics jobMetrics = new JobMetrics(); + when(getMetrics.execute()).thenReturn(jobMetrics); + + MetricUpdate updateOne = new MetricUpdate(); + long stepValue = 1234L; + updateOne.setScalar(new BigDecimal(stepValue)); + + MetricStructuredName structuredNameOne = new MetricStructuredName(); + structuredNameOne.setName(aggregatorName); + structuredNameOne.setContext(ImmutableMap.of("step", stepName)); + updateOne.setName(structuredNameOne); + + MetricUpdate updateTwo = new MetricUpdate(); + long stepValueTwo = 1024L; + updateTwo.setScalar(new BigDecimal(stepValueTwo)); + + MetricStructuredName structuredNameTwo = new MetricStructuredName(); + structuredNameTwo.setName(aggregatorName); + structuredNameTwo.setContext(ImmutableMap.of("step", otherStepName)); + updateTwo.setName(structuredNameTwo); + + jobMetrics.setMetrics(ImmutableList.of(updateOne, updateTwo)); + + Get getState = mock(Get.class); + when(mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn(getState); + Job modelJob = new Job(); + when(getState.execute()).thenReturn(modelJob); + modelJob.setCurrentState(State.RUNNING.toString()); + + DataflowPipelineJob job = + new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms); + + AggregatorValues<Long> values = job.getAggregatorValues(aggregator); + + assertThat(values.getValuesAtSteps(), hasEntry(fullName, stepValue)); + assertThat(values.getValuesAtSteps(), hasEntry(otherFullName, stepValueTwo)); + assertThat(values.getValuesAtSteps().size(), equalTo(2)); + assertThat(values.getValues(), containsInAnyOrder(stepValue, stepValueTwo)); + assertThat(values.getTotalValue(combineFn), equalTo(Long.valueOf(stepValue + stepValueTwo))); + } + + @Test + public void testGetAggregatorValuesWithUnrelatedMetricUpdateIgnoresUpdate() + throws IOException, AggregatorRetrievalException { + CombineFn<Long, long[], 
Long> combineFn = new Sum.SumLongFn(); + String aggregatorName = "agg"; + Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName); + @SuppressWarnings("unchecked") + PTransform<PInput, POutput> pTransform = mock(PTransform.class); + String stepName = "s1"; + String fullName = "Foo/Bar/Baz"; + AppliedPTransform<?, ?, ?> appliedTransform = appliedPTransform(fullName, pTransform); + + DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms( + ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(), + ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName)); + + GetMetrics getMetrics = mock(GetMetrics.class); + when(mockJobs.getMetrics(PROJECT_ID, JOB_ID)).thenReturn(getMetrics); + JobMetrics jobMetrics = new JobMetrics(); + when(getMetrics.execute()).thenReturn(jobMetrics); + + MetricUpdate ignoredUpdate = new MetricUpdate(); + ignoredUpdate.setScalar(null); + + MetricStructuredName ignoredName = new MetricStructuredName(); + ignoredName.setName("ignoredAggregator.elementCount.out0"); + ignoredName.setContext(null); + ignoredUpdate.setName(ignoredName); + + jobMetrics.setMetrics(ImmutableList.of(ignoredUpdate)); + + Get getState = mock(Get.class); + when(mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn(getState); + Job modelJob = new Job(); + when(getState.execute()).thenReturn(modelJob); + modelJob.setCurrentState(State.RUNNING.toString()); + + DataflowPipelineJob job = + new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms); + + AggregatorValues<Long> values = job.getAggregatorValues(aggregator); + + assertThat(values.getValuesAtSteps().entrySet(), empty()); + assertThat(values.getValues(), empty()); + } + + @Test + public void testGetAggregatorValuesWithUnusedAggregatorThrowsException() + throws AggregatorRetrievalException { + Aggregator<?, ?> aggregator = mock(Aggregator.class); + + DataflowAggregatorTransforms 
aggregatorTransforms = new DataflowAggregatorTransforms( + ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of().asMap(), + ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of()); + + DataflowPipelineJob job = + new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("not used in this pipeline"); + + job.getAggregatorValues(aggregator); + } + + @Test + public void testGetAggregatorValuesWhenClientThrowsExceptionThrowsAggregatorRetrievalException() + throws IOException, AggregatorRetrievalException { + CombineFn<Long, long[], Long> combineFn = new Sum.SumLongFn(); + String aggregatorName = "agg"; + Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName); + @SuppressWarnings("unchecked") + PTransform<PInput, POutput> pTransform = mock(PTransform.class); + String stepName = "s1"; + String fullName = "Foo/Bar/Baz"; + AppliedPTransform<?, ?, ?> appliedTransform = appliedPTransform(fullName, pTransform); + + DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms( + ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(), + ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName)); + + GetMetrics getMetrics = mock(GetMetrics.class); + when(mockJobs.getMetrics(PROJECT_ID, JOB_ID)).thenReturn(getMetrics); + IOException cause = new IOException(); + when(getMetrics.execute()).thenThrow(cause); + + Get getState = mock(Get.class); + when(mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn(getState); + Job modelJob = new Job(); + when(getState.execute()).thenReturn(modelJob); + modelJob.setCurrentState(State.RUNNING.toString()); + + DataflowPipelineJob job = + new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms); + + thrown.expect(AggregatorRetrievalException.class); + thrown.expectCause(is(cause)); + 
thrown.expectMessage(aggregator.toString()); + thrown.expectMessage("when retrieving Aggregator values for"); + + job.getAggregatorValues(aggregator); + } + + private static class TestAggregator<InT, OutT> implements Aggregator<InT, OutT> { + private final CombineFn<InT, ?, OutT> combineFn; + private final String name; + + public TestAggregator(CombineFn<InT, ?, OutT> combineFn, String name) { + this.combineFn = combineFn; + this.name = name; + } + + @Override + public void addValue(InT value) { + throw new AssertionError(); + } + + @Override + public String getName() { + return name; + } + + @Override + public CombineFn<InT, ?, OutT> getCombineFn() { + return combineFn; + } + } + + private AppliedPTransform<?, ?, ?> appliedPTransform( + String fullName, PTransform<PInput, POutput> transform) { + return AppliedPTransform.of(fullName, mock(PInput.class), mock(POutput.class), transform); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRegistrarTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRegistrarTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRegistrarTest.java new file mode 100644 index 0000000..4850939 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRegistrarTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.sdk.runners; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import com.google.cloud.dataflow.sdk.options.BlockingDataflowPipelineOptions; +import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.ServiceLoader; + +/** Tests for {@link DataflowPipelineRegistrar}. 
*/ +@RunWith(JUnit4.class) +public class DataflowPipelineRegistrarTest { + @Test + public void testCorrectOptionsAreReturned() { + assertEquals(ImmutableList.of(DataflowPipelineOptions.class, + BlockingDataflowPipelineOptions.class), + new DataflowPipelineRegistrar.Options().getPipelineOptions()); + } + + @Test + public void testCorrectRunnersAreReturned() { + assertEquals(ImmutableList.of(DataflowPipelineRunner.class, + BlockingDataflowPipelineRunner.class), + new DataflowPipelineRegistrar.Runner().getPipelineRunners()); + } + + @Test + public void testServiceLoaderForOptions() { + for (PipelineOptionsRegistrar registrar : + Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class).iterator())) { + if (registrar instanceof DataflowPipelineRegistrar.Options) { + return; + } + } + fail("Expected to find " + DataflowPipelineRegistrar.Options.class); + } + + @Test + public void testServiceLoaderForRunner() { + for (PipelineRunnerRegistrar registrar : + Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class).iterator())) { + if (registrar instanceof DataflowPipelineRegistrar.Runner) { + return; + } + } + fail("Expected to find " + DataflowPipelineRegistrar.Runner.class); + } +}