http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/util/PackageUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/util/PackageUtilTest.java
deleted file mode 100644
index d2c08c8..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/util/PackageUtilTest.java
+++ /dev/null
@@ -1,483 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo;
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
-import com.google.api.client.http.HttpRequest;
-import com.google.api.client.http.HttpResponse;
-import com.google.api.client.http.HttpStatusCodes;
-import com.google.api.client.http.HttpTransport;
-import com.google.api.client.http.LowLevelHttpRequest;
-import com.google.api.client.json.GenericJson;
-import com.google.api.client.json.Json;
-import com.google.api.client.json.JsonFactory;
-import com.google.api.client.json.jackson2.JacksonFactory;
-import com.google.api.client.testing.http.HttpTesting;
-import com.google.api.client.testing.http.MockHttpTransport;
-import com.google.api.client.testing.http.MockLowLevelHttpRequest;
-import com.google.api.client.testing.http.MockLowLevelHttpResponse;
-import com.google.api.services.dataflow.model.DataflowPackage;
-import com.google.cloud.dataflow.sdk.options.GcsOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.testing.ExpectedLogs;
-import com.google.cloud.dataflow.sdk.testing.FastNanoClockAndSleeper;
-import com.google.cloud.dataflow.sdk.util.PackageUtil.PackageAttributes;
-import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
-import com.google.common.io.LineReader;
-
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
-import org.hamcrest.Matchers;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.channels.Channels;
-import java.nio.channels.Pipe;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.regex.Pattern;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipInputStream;
-
-/** Tests for PackageUtil. */
-@RunWith(JUnit4.class)
-public class PackageUtilTest {
-  @Rule public ExpectedLogs logged = ExpectedLogs.none(PackageUtil.class);
-  @Rule
-  public TemporaryFolder tmpFolder = new TemporaryFolder();
-
-  @Rule
-  public FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
-
-  @Mock
-  GcsUtil mockGcsUtil;
-
-  // 128 bits; base64 encodes 6 bits per character, so 128/6 = 21.3 rounds up to 22 characters
-  private static final String HASH_PATTERN = "[a-zA-Z0-9+-]{22}";
-
-  // Hamcrest matcher to assert a string matches a pattern
-  private static class RegexMatcher extends BaseMatcher<String> {
-    private final Pattern pattern;
-
-    public RegexMatcher(String regex) {
-      this.pattern = Pattern.compile(regex);
-    }
-
-    @Override
-    public boolean matches(Object o) {
-      if (!(o instanceof String)) {
-        return false;
-      }
-      return pattern.matcher((String) o).matches();
-    }
-
-    @Override
-    public void describeTo(Description description) {
-      description.appendText(String.format("matches regular expression %s", pattern));
-    }
-
-    public static RegexMatcher matches(String regex) {
-      return new RegexMatcher(regex);
-    }
-  }
-
-  @Before
-  public void setUp() {
-    MockitoAnnotations.initMocks(this);
-
-    GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
-    pipelineOptions.setGcsUtil(mockGcsUtil);
-
-    IOChannelUtils.registerStandardIOFactories(pipelineOptions);
-  }
-
-  private File makeFileWithContents(String name, String contents) throws Exception {
-    File tmpFile = tmpFolder.newFile(name);
-    Files.write(contents, tmpFile, StandardCharsets.UTF_8);
-    tmpFile.setLastModified(0);  // required for determinism with directories
-    return tmpFile;
-  }
-
-  static final String STAGING_PATH = GcsPath.fromComponents("somebucket", "base/path").toString();
-  private static PackageAttributes makePackageAttributes(File file, String overridePackageName) {
-    return PackageUtil.createPackageAttributes(file, STAGING_PATH, overridePackageName);
-  }
-
-  @Test
-  public void testFileWithExtensionPackageNamingAndSize() throws Exception {
-    String contents = "This is a test!";
-    File tmpFile = makeFileWithContents("file.txt", contents);
-    PackageAttributes attr = makePackageAttributes(tmpFile, null);
-    DataflowPackage target = attr.getDataflowPackage();
-
-    assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN + ".txt"));
-    assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName()));
-    assertThat(attr.getSize(), equalTo((long) contents.length()));
-  }
-
-  @Test
-  public void testPackageNamingWithFileNoExtension() throws Exception {
-    File tmpFile = makeFileWithContents("file", "This is a test!");
-    DataflowPackage target = makePackageAttributes(tmpFile, null).getDataflowPackage();
-
-    assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN));
-    assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName()));
-  }
-
-  @Test
-  public void testPackageNamingWithDirectory() throws Exception {
-    File tmpDirectory = tmpFolder.newFolder("folder");
-    DataflowPackage target = makePackageAttributes(tmpDirectory, null).getDataflowPackage();
-
-    assertThat(target.getName(), RegexMatcher.matches("folder-" + HASH_PATTERN + ".jar"));
-    assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName()));
-  }
-
-  @Test
-  public void testPackageNamingWithFilesHavingSameContentsAndSameNames() throws Exception {
-    File tmpDirectory1 = tmpFolder.newFolder("folder1", "folderA");
-    makeFileWithContents("folder1/folderA/sameName", "This is a test!");
-    DataflowPackage target1 = makePackageAttributes(tmpDirectory1, null).getDataflowPackage();
-
-    File tmpDirectory2 = tmpFolder.newFolder("folder2", "folderA");
-    makeFileWithContents("folder2/folderA/sameName", "This is a test!");
-    DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDataflowPackage();
-
-    assertEquals(target1.getName(), target2.getName());
-    assertEquals(target1.getLocation(), target2.getLocation());
-  }
-
-  @Test
-  public void testPackageNamingWithFilesHavingSameContentsButDifferentNames() throws Exception {
-    File tmpDirectory1 = tmpFolder.newFolder("folder1", "folderA");
-    makeFileWithContents("folder1/folderA/uniqueName1", "This is a test!");
-    DataflowPackage target1 = makePackageAttributes(tmpDirectory1, null).getDataflowPackage();
-
-    File tmpDirectory2 = tmpFolder.newFolder("folder2", "folderA");
-    makeFileWithContents("folder2/folderA/uniqueName2", "This is a test!");
-    DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDataflowPackage();
-
-    assertNotEquals(target1.getName(), target2.getName());
-    assertNotEquals(target1.getLocation(), target2.getLocation());
-  }
-
-  @Test
-  public void testPackageNamingWithDirectoriesHavingSameContentsButDifferentNames()
-      throws Exception {
-    File tmpDirectory1 = tmpFolder.newFolder("folder1", "folderA");
-    tmpFolder.newFolder("folder1", "folderA", "uniqueName1");
-    DataflowPackage target1 = makePackageAttributes(tmpDirectory1, null).getDataflowPackage();
-
-    File tmpDirectory2 = tmpFolder.newFolder("folder2", "folderA");
-    tmpFolder.newFolder("folder2", "folderA", "uniqueName2");
-    DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDataflowPackage();
-
-    assertNotEquals(target1.getName(), target2.getName());
-    assertNotEquals(target1.getLocation(), target2.getLocation());
-  }
-
-  @Test
-  public void testPackageUploadWithLargeClasspathLogsWarning() throws Exception {
-    File tmpFile = makeFileWithContents("file.txt", "This is a test!");
-    // all files will be present and cached so no upload needed.
-    when(mockGcsUtil.fileSize(any(GcsPath.class))).thenReturn(tmpFile.length());
-
-    List<String> classpathElements = Lists.newLinkedList();
-    for (int i = 0; i < 1005; ++i) {
-      String eltName = "element" + i;
-      classpathElements.add(eltName + '=' + tmpFile.getAbsolutePath());
-    }
-
-    PackageUtil.stageClasspathElements(classpathElements, STAGING_PATH);
-
-    logged.verifyWarn("Your classpath contains 1005 elements, which Google Cloud Dataflow");
-  }
-
-  @Test
-  public void testPackageUploadWithFileSucceeds() throws Exception {
-    Pipe pipe = Pipe.open();
-    String contents = "This is a test!";
-    File tmpFile = makeFileWithContents("file.txt", contents);
-    when(mockGcsUtil.fileSize(any(GcsPath.class)))
-        .thenThrow(new FileNotFoundException("some/path"));
-    when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
-
-    List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
-        ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH);
-    DataflowPackage target = Iterables.getOnlyElement(targets);
-
-    verify(mockGcsUtil).fileSize(any(GcsPath.class));
-    verify(mockGcsUtil).create(any(GcsPath.class), anyString());
-    verifyNoMoreInteractions(mockGcsUtil);
-
-    assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN + ".txt"));
-    assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName()));
-    assertThat(new LineReader(Channels.newReader(pipe.source(), "UTF-8")).readLine(),
-        equalTo(contents));
-  }
-
-  @Test
-  public void testPackageUploadWithDirectorySucceeds() throws Exception {
-    Pipe pipe = Pipe.open();
-    File tmpDirectory = tmpFolder.newFolder("folder");
-    tmpFolder.newFolder("folder", "empty_directory");
-    tmpFolder.newFolder("folder", "directory");
-    makeFileWithContents("folder/file.txt", "This is a test!");
-    makeFileWithContents("folder/directory/file.txt", "This is also a test!");
-
-    when(mockGcsUtil.fileSize(any(GcsPath.class)))
-        .thenThrow(new FileNotFoundException("some/path"));
-    when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
-
-    PackageUtil.stageClasspathElements(
-        ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH);
-
-    verify(mockGcsUtil).fileSize(any(GcsPath.class));
-    verify(mockGcsUtil).create(any(GcsPath.class), anyString());
-    verifyNoMoreInteractions(mockGcsUtil);
-
-    ZipInputStream inputStream = new ZipInputStream(Channels.newInputStream(pipe.source()));
-    List<String> zipEntryNames = new ArrayList<>();
-    for (ZipEntry entry = inputStream.getNextEntry(); entry != null;
-        entry = inputStream.getNextEntry()) {
-      zipEntryNames.add(entry.getName());
-    }
-
-    assertThat(zipEntryNames,
-        containsInAnyOrder("directory/file.txt", "empty_directory/", "file.txt"));
-  }
-
-  @Test
-  public void testPackageUploadWithEmptyDirectorySucceeds() throws Exception {
-    Pipe pipe = Pipe.open();
-    File tmpDirectory = tmpFolder.newFolder("folder");
-
-    when(mockGcsUtil.fileSize(any(GcsPath.class)))
-        .thenThrow(new FileNotFoundException("some/path"));
-    when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
-
-    List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
-        ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH);
-    DataflowPackage target = Iterables.getOnlyElement(targets);
-
-    verify(mockGcsUtil).fileSize(any(GcsPath.class));
-    verify(mockGcsUtil).create(any(GcsPath.class), anyString());
-    verifyNoMoreInteractions(mockGcsUtil);
-
-    assertThat(target.getName(), RegexMatcher.matches("folder-" + HASH_PATTERN + ".jar"));
-    assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName()));
-    assertNull(new ZipInputStream(Channels.newInputStream(pipe.source())).getNextEntry());
-  }
-
-  @Test(expected = RuntimeException.class)
-  public void testPackageUploadFailsWhenIOExceptionThrown() throws Exception {
-    File tmpFile = makeFileWithContents("file.txt", "This is a test!");
-    when(mockGcsUtil.fileSize(any(GcsPath.class)))
-        .thenThrow(new FileNotFoundException("some/path"));
-    when(mockGcsUtil.create(any(GcsPath.class), anyString()))
-        .thenThrow(new IOException("Fake Exception: Upload error"));
-
-    try {
-      PackageUtil.stageClasspathElements(
-          ImmutableList.of(tmpFile.getAbsolutePath()),
-          STAGING_PATH, fastNanoClockAndSleeper);
-    } finally {
-      verify(mockGcsUtil).fileSize(any(GcsPath.class));
-      verify(mockGcsUtil, times(5)).create(any(GcsPath.class), anyString());
-      verifyNoMoreInteractions(mockGcsUtil);
-    }
-  }
-
-  @Test
-  public void testPackageUploadFailsWithPermissionsErrorGivesDetailedMessage() throws Exception {
-    File tmpFile = makeFileWithContents("file.txt", "This is a test!");
-    when(mockGcsUtil.fileSize(any(GcsPath.class)))
-        .thenThrow(new FileNotFoundException("some/path"));
-    when(mockGcsUtil.create(any(GcsPath.class), anyString()))
-        .thenThrow(new IOException("Failed to write to GCS path " + STAGING_PATH,
-            googleJsonResponseException(
-                HttpStatusCodes.STATUS_CODE_FORBIDDEN, "Permission denied", "Test message")));
-
-    try {
-      PackageUtil.stageClasspathElements(
-          ImmutableList.of(tmpFile.getAbsolutePath()),
-          STAGING_PATH, fastNanoClockAndSleeper);
-      fail("Expected RuntimeException");
-    } catch (RuntimeException e) {
-      assertTrue("Expected IOException containing detailed message.",
-          e.getCause() instanceof IOException);
-      assertThat(e.getCause().getMessage(),
-          Matchers.allOf(
-              Matchers.containsString("Uploaded failed due to permissions error"),
-              Matchers.containsString(
-                  "Stale credentials can be resolved by executing 'gcloud auth login'")));
-    } finally {
-      verify(mockGcsUtil).fileSize(any(GcsPath.class));
-      verify(mockGcsUtil).create(any(GcsPath.class), anyString());
-      verifyNoMoreInteractions(mockGcsUtil);
-    }
-  }
-
-  @Test
-  public void testPackageUploadEventuallySucceeds() throws Exception {
-    Pipe pipe = Pipe.open();
-    File tmpFile = makeFileWithContents("file.txt", "This is a test!");
-    when(mockGcsUtil.fileSize(any(GcsPath.class)))
-        .thenThrow(new FileNotFoundException("some/path"));
-    when(mockGcsUtil.create(any(GcsPath.class), anyString()))
-        .thenThrow(new IOException("Fake Exception: 410 Gone")) // First attempt fails
-        .thenReturn(pipe.sink());                               // second attempt succeeds
-
-    try {
-      PackageUtil.stageClasspathElements(
-                                              ImmutableList.of(tmpFile.getAbsolutePath()),
-                                              STAGING_PATH,
-                                              fastNanoClockAndSleeper);
-    } finally {
-      verify(mockGcsUtil).fileSize(any(GcsPath.class));
-      verify(mockGcsUtil, times(2)).create(any(GcsPath.class), anyString());
-      verifyNoMoreInteractions(mockGcsUtil);
-    }
-  }
-
-  @Test
-  public void testPackageUploadIsSkippedWhenFileAlreadyExists() throws Exception {
-    File tmpFile = makeFileWithContents("file.txt", "This is a test!");
-    when(mockGcsUtil.fileSize(any(GcsPath.class))).thenReturn(tmpFile.length());
-
-    PackageUtil.stageClasspathElements(
-        ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH);
-
-    verify(mockGcsUtil).fileSize(any(GcsPath.class));
-    verifyNoMoreInteractions(mockGcsUtil);
-  }
-
-  @Test
-  public void testPackageUploadIsNotSkippedWhenSizesAreDifferent() throws Exception {
-    Pipe pipe = Pipe.open();
-    File tmpDirectory = tmpFolder.newFolder("folder");
-    tmpFolder.newFolder("folder", "empty_directory");
-    tmpFolder.newFolder("folder", "directory");
-    makeFileWithContents("folder/file.txt", "This is a test!");
-    makeFileWithContents("folder/directory/file.txt", "This is also a test!");
-    when(mockGcsUtil.fileSize(any(GcsPath.class))).thenReturn(Long.MAX_VALUE);
-    when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
-
-    PackageUtil.stageClasspathElements(
-        ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH);
-
-    verify(mockGcsUtil).fileSize(any(GcsPath.class));
-    verify(mockGcsUtil).create(any(GcsPath.class), anyString());
-    verifyNoMoreInteractions(mockGcsUtil);
-  }
-
-  @Test
-  public void testPackageUploadWithExplicitPackageName() throws Exception {
-    Pipe pipe = Pipe.open();
-    File tmpFile = makeFileWithContents("file.txt", "This is a test!");
-    final String overriddenName = "alias.txt";
-
-    when(mockGcsUtil.fileSize(any(GcsPath.class)))
-        .thenThrow(new FileNotFoundException("some/path"));
-    when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
-
-    List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
-        ImmutableList.of(overriddenName + "=" + tmpFile.getAbsolutePath()), STAGING_PATH);
-    DataflowPackage target = Iterables.getOnlyElement(targets);
-
-    verify(mockGcsUtil).fileSize(any(GcsPath.class));
-    verify(mockGcsUtil).create(any(GcsPath.class), anyString());
-    verifyNoMoreInteractions(mockGcsUtil);
-
-    assertThat(target.getName(), equalTo(overriddenName));
-    assertThat(target.getLocation(),
-        RegexMatcher.matches(STAGING_PATH + "/file-" + HASH_PATTERN + ".txt"));
-  }
-
-  @Test
-  public void testPackageUploadIsSkippedWithNonExistentResource() throws Exception {
-    String nonExistentFile =
-        IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "non-existent-file");
-    assertEquals(Collections.EMPTY_LIST, PackageUtil.stageClasspathElements(
-        ImmutableList.of(nonExistentFile), STAGING_PATH));
-  }
-
-  /**
-   * Builds a fake GoogleJsonResponseException for testing API error handling.
-   */
-  private static GoogleJsonResponseException googleJsonResponseException(
-      final int status, final String reason, final String message) throws IOException {
-    final JsonFactory jsonFactory = new JacksonFactory();
-    HttpTransport transport = new MockHttpTransport() {
-      @Override
-      public LowLevelHttpRequest buildRequest(String method, String url) throws IOException {
-        ErrorInfo errorInfo = new ErrorInfo();
-        errorInfo.setReason(reason);
-        errorInfo.setMessage(message);
-        errorInfo.setFactory(jsonFactory);
-        GenericJson error = new GenericJson();
-        error.set("code", status);
-        error.set("errors", Arrays.asList(errorInfo));
-        error.setFactory(jsonFactory);
-        GenericJson errorResponse = new GenericJson();
-        errorResponse.set("error", error);
-        errorResponse.setFactory(jsonFactory);
-        return new MockLowLevelHttpRequest().setResponse(
-            new MockLowLevelHttpResponse().setContent(errorResponse.toPrettyString())
-            .setContentType(Json.MEDIA_TYPE).setStatusCode(status));
-        }
-    };
-    HttpRequest request =
-        transport.createRequestFactory().buildGetRequest(HttpTesting.SIMPLE_GENERIC_URL);
-    request.setThrowExceptionOnExecuteError(false);
-    HttpResponse response = request.execute();
-    return GoogleJsonResponseException.from(jsonFactory, response);
-  }
-}
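
The naming assertions above pin down the staged-file scheme: a 128-bit content hash, base64 encoded into 22 characters (per HASH_PATTERN), spliced into the file name ahead of any extension. A minimal sketch of such a scheme, for illustration only (the class, the helper name, and the choice of MD5 are assumptions here, not necessarily what PackageUtil.createPackageAttributes did):

    import com.google.common.hash.Hashing;
    import com.google.common.io.BaseEncoding;
    import com.google.common.io.Files;
    import java.io.File;
    import java.io.IOException;

    class StagedNameSketch {
      // "file.txt" -> "file-<22-char-hash>.txt"; "file" -> "file-<22-char-hash>"
      static String stagedName(File file) throws IOException {
        byte[] digest = Files.asByteSource(file).hash(Hashing.md5()).asBytes();
        // A 16-byte digest base64-encodes to 22 characters once '=' padding is dropped.
        String hash = BaseEncoding.base64Url().omitPadding().encode(digest);
        String name = file.getName();
        int dot = name.lastIndexOf('.');
        return dot < 0
            ? name + "-" + hash
            : name.substring(0, dot) + "-" + hash + name.substring(dot);
      }
    }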

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/io/DataflowTextIOTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/io/DataflowTextIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/io/DataflowTextIOTest.java
new file mode 100644
index 0000000..7788b5b
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/io/DataflowTextIOTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.io;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.testing.TestDataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.util.GcsUtil;
+import com.google.cloud.dataflow.sdk.util.TestCredential;
+import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
+import com.google.common.collect.ImmutableList;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.List;
+
+/**
+ * {@link DataflowPipelineRunner} specific tests for TextIO Read and Write transforms.
+ */
+@RunWith(JUnit4.class)
+public class DataflowTextIOTest {
+
+  private TestDataflowPipelineOptions buildTestPipelineOptions() {
+    TestDataflowPipelineOptions options =
+        PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
+    options.setGcpCredential(new TestCredential());
+    return options;
+  }
+
+  private GcsUtil buildMockGcsUtil() throws IOException {
+    GcsUtil mockGcsUtil = Mockito.mock(GcsUtil.class);
+
+    // Any request to open gets a new bogus channel
+    Mockito
+        .when(mockGcsUtil.open(Mockito.any(GcsPath.class)))
+        .then(new Answer<SeekableByteChannel>() {
+          @Override
+          public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable {
+            return FileChannel.open(
+                Files.createTempFile("channel-", ".tmp"),
+                StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE);
+          }
+        });
+
+    // Any request for expansion returns a list containing the original GcsPath
+    // This is required to pass validation that occurs in TextIO during apply()
+    Mockito
+        .when(mockGcsUtil.expand(Mockito.any(GcsPath.class)))
+        .then(new Answer<List<GcsPath>>() {
+          @Override
+          public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable {
+            return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
+          }
+        });
+
+    return mockGcsUtil;
+  }
+
+  /**
+   * This tests a few corner cases that should not crash.
+   */
+  @Test
+  public void testGoodWildcards() throws Exception {
+    TestDataflowPipelineOptions options = buildTestPipelineOptions();
+    options.setGcsUtil(buildMockGcsUtil());
+
+    Pipeline pipeline = Pipeline.create(options);
+
+    applyRead(pipeline, "gs://bucket/foo");
+    applyRead(pipeline, "gs://bucket/foo/");
+    applyRead(pipeline, "gs://bucket/foo/*");
+    applyRead(pipeline, "gs://bucket/foo/?");
+    applyRead(pipeline, "gs://bucket/foo/[0-9]");
+    applyRead(pipeline, "gs://bucket/foo/*baz*");
+    applyRead(pipeline, "gs://bucket/foo/*baz?");
+    applyRead(pipeline, "gs://bucket/foo/[0-9]baz?");
+    applyRead(pipeline, "gs://bucket/foo/baz/*");
+    applyRead(pipeline, "gs://bucket/foo/baz/*wonka*");
+    applyRead(pipeline, "gs://bucket/foo/*baz/wonka*");
+    applyRead(pipeline, "gs://bucket/foo*/baz");
+    applyRead(pipeline, "gs://bucket/foo?/baz");
+    applyRead(pipeline, "gs://bucket/foo[0-9]/baz");
+
+    // Check that running doesn't fail.
+    pipeline.run();
+  }
+
+  private void applyRead(Pipeline pipeline, String path) {
+    pipeline.apply("Read(" + path + ")", TextIO.Read.from(path));
+  }
+}
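
The two anonymous Answer classes in buildMockGcsUtil() could be written more compactly with Java 8 lambdas via Mockito's thenAnswer. A behavior-identical sketch, assuming the same imports and mock as the test above:

    Mockito.when(mockGcsUtil.open(Mockito.any(GcsPath.class)))
        .thenAnswer(invocation -> FileChannel.open(
            Files.createTempFile("channel-", ".tmp"),
            StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE));

    Mockito.when(mockGcsUtil.expand(Mockito.any(GcsPath.class)))
        .thenAnswer(invocation -> ImmutableList.of((GcsPath) invocation.getArguments()[0]));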

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowPipelineDebugOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowPipelineDebugOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowPipelineDebugOptionsTest.java
new file mode 100644
index 0000000..1b5a3c7
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowPipelineDebugOptionsTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.options;
+
+import static org.hamcrest.Matchers.hasEntry;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link DataflowPipelineDebugOptions}. */
+@RunWith(JUnit4.class)
+public class DataflowPipelineDebugOptionsTest {
+  @Test
+  public void testTransformNameMapping() throws Exception {
+    DataflowPipelineDebugOptions options = PipelineOptionsFactory
+        .fromArgs(new String[]{"--transformNameMapping={\"a\":\"b\",\"foo\":\"\",\"bar\":\"baz\"}"})
+        .as(DataflowPipelineDebugOptions.class);
+    assertEquals(3, options.getTransformNameMapping().size());
+    assertThat(options.getTransformNameMapping(), hasEntry("a", "b"));
+    assertThat(options.getTransformNameMapping(), hasEntry("foo", ""));
+    assertThat(options.getTransformNameMapping(), hasEntry("bar", "baz"));
+  }
+}
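
For context, --transformNameMapping is consumed when updating a running job: keys are transform names in the prior job and values are their names in the replacement job, with an empty value conventionally marking a transform that has no counterpart (that reading of the empty string is an assumption here, not something the test asserts). A sketch of reading the parsed map, assuming the test's imports plus java.util.Map:

    DataflowPipelineDebugOptions options = PipelineOptionsFactory
        .fromArgs(new String[] {
            "--transformNameMapping={\"OldName\":\"NewName\",\"Removed\":\"\"}"})
        .as(DataflowPipelineDebugOptions.class);
    Map<String, String> mapping = options.getTransformNameMapping();
    // mapping.get("OldName") -> "NewName"; mapping.get("Removed") -> ""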

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowPipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowPipelineOptionsTest.java
new file mode 100644
index 0000000..eff79bb
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowPipelineOptionsTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.options;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.cloud.dataflow.sdk.testing.ResetDateTimeProvider;
+import com.google.cloud.dataflow.sdk.testing.RestoreSystemProperties;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link DataflowPipelineOptions}. */
+@RunWith(JUnit4.class)
+public class DataflowPipelineOptionsTest {
+  @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties();
+  @Rule public ResetDateTimeProvider resetDateTimeProviderRule = new ResetDateTimeProvider();
+
+  @Test
+  public void testJobNameIsSet() {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setJobName("TestJobName");
+    assertEquals("TestJobName", options.getJobName());
+  }
+
+  @Test
+  public void testUserNameIsNotSet() {
+    resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z");
+    System.getProperties().remove("user.name");
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setAppName("TestApplication");
+    assertEquals("testapplication--1208190706", options.getJobName());
+    assertTrue(options.getJobName().length() <= 40);
+  }
+
+  @Test
+  public void testAppNameAndUserNameAreLong() {
+    resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z");
+    System.getProperties().put("user.name", "abcdeabcdeabcdeabcdeabcdeabcde");
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setAppName("1234567890123456789012345678901234567890");
+    assertEquals(
+        "a234567890123456789012345678901234567890-abcdeabcdeabcdeabcdeabcdeabcde-1208190706",
+        options.getJobName());
+  }
+
+  @Test
+  public void testAppNameIsLong() {
+    resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z");
+    System.getProperties().put("user.name", "abcde");
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setAppName("1234567890123456789012345678901234567890");
+    assertEquals("a234567890123456789012345678901234567890-abcde-1208190706", options.getJobName());
+  }
+
+  @Test
+  public void testUserNameIsLong() {
+    resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z");
+    System.getProperties().put("user.name", "abcdeabcdeabcdeabcdeabcdeabcde");
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setAppName("1234567890");
+    assertEquals("a234567890-abcdeabcdeabcdeabcdeabcdeabcde-1208190706", options.getJobName());
+  }
+
+  @Test
+  public void testUtf8UserNameAndApplicationNameIsNormalized() {
+    resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z");
+    System.getProperties().put("user.name", "ði ıntəˈnæʃənəl ");
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setAppName("fəˈnɛtık əsoʊsiˈeıʃn");
+    assertEquals("f00n0t0k00so0si0e00n-0i00nt00n000n0l0-1208190706", options.getJobName());
+  }
+}
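
The expected values above imply the normalization applied when a job name is derived from the app name and user name. A reconstruction consistent with those assertions (an inference from the tests, not the SDK's actual code): lowercase both parts, map any character outside [a-z0-9] to '0', join them with an MMddHHmmss timestamp, and force the leading character to a letter.

    import org.joda.time.DateTime;
    import org.joda.time.DateTimeZone;
    import org.joda.time.format.DateTimeFormat;

    class JobNameSketch {
      static String jobName(String appName, String userName, DateTime now) {
        String app = appName.toLowerCase().replaceAll("[^a-z0-9]", "0");
        String user = userName.toLowerCase().replaceAll("[^a-z0-9]", "0");
        String stamp = DateTimeFormat.forPattern("MMddHHmmss")
            .withZone(DateTimeZone.UTC).print(now);
        String name = app + "-" + user + "-" + stamp;
        // Job names must start with a letter, hence "1234..." -> "a234...".
        return Character.isLetter(name.charAt(0)) ? name : 'a' + name.substring(1);
      }
    }

As a spot check, jobName("1234567890", "abcde", new DateTime("2014-12-08T19:07:06.698Z")) reproduces the "a234567890-abcde-1208190706" expected by testAppNameIsLong.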

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowProfilingOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowProfilingOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowProfilingOptionsTest.java
new file mode 100644
index 0000000..1420273
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowProfilingOptionsTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.options;
+
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link DataflowProfilingOptions}.
+ */
+@RunWith(JUnit4.class)
+public class DataflowProfilingOptionsTest {
+
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  @Test
+  public void testOptionsObject() throws Exception {
+    DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {
+        "--enableProfilingAgent", "--profilingAgentConfiguration={\"interval\": 21}"})
+        .as(DataflowPipelineOptions.class);
+    assertTrue(options.getEnableProfilingAgent());
+
+    String json = MAPPER.writeValueAsString(options);
+    assertThat(json, Matchers.containsString(
+        "\"profilingAgentConfiguration\":{\"interval\":21}"));
+  }
+}
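
The containsString assertion holds because Jackson drops insignificant whitespace when it re-serializes the parsed configuration: the flag value's {"interval": 21} (with a space) comes back as {"interval":21}. A standalone sketch of that round-trip:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.Map;

    public class JsonRoundTrip {
      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        Map<?, ?> cfg = mapper.readValue("{\"interval\": 21}", Map.class);
        System.out.println(mapper.writeValueAsString(cfg));  // {"interval":21}
      }
    }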

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowWorkerLoggingOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowWorkerLoggingOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowWorkerLoggingOptionsTest.java
new file mode 100644
index 0000000..b752f3d
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowWorkerLoggingOptionsTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.options;
+
+import static com.google.cloud.dataflow.sdk.options.DataflowWorkerLoggingOptions.Level.WARN;
+import static org.junit.Assert.assertEquals;
+
+import com.google.cloud.dataflow.sdk.options.DataflowWorkerLoggingOptions.WorkerLogLevelOverrides;
+import com.google.common.collect.ImmutableMap;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link DataflowWorkerLoggingOptions}. */
+@RunWith(JUnit4.class)
+public class DataflowWorkerLoggingOptionsTest {
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+  @Rule public ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void testWorkerLogLevelOverrideWithInvalidLogLevel() {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("Unsupported log level");
+    WorkerLogLevelOverrides.from(ImmutableMap.of("Name", "FakeLevel"));
+  }
+
+  @Test
+  public void testWorkerLogLevelOverrideForClass() throws Exception {
+    assertEquals("{\"org.junit.Test\":\"WARN\"}",
+        MAPPER.writeValueAsString(
+            new WorkerLogLevelOverrides().addOverrideForClass(Test.class, WARN)));
+  }
+
+  @Test
+  public void testWorkerLogLevelOverrideForPackage() throws Exception {
+    assertEquals("{\"org.junit\":\"WARN\"}",
+        MAPPER.writeValueAsString(
+            new WorkerLogLevelOverrides().addOverrideForPackage(Test.class.getPackage(), WARN)));
+  }
+
+  @Test
+  public void testWorkerLogLevelOverrideForName() throws Exception {
+    assertEquals("{\"A\":\"WARN\"}",
+        MAPPER.writeValueAsString(
+            new WorkerLogLevelOverrides().addOverrideForName("A", WARN)));
+  }
+
+  @Test
+  public void testSerializationAndDeserializationOf() throws Exception {
+    String testValue = "{\"A\":\"WARN\"}";
+    assertEquals(testValue,
+        MAPPER.writeValueAsString(
+            MAPPER.readValue(testValue, WorkerLogLevelOverrides.class)));
+  }
+}
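
For reference, the same overrides are normally supplied as a JSON-valued flag rather than built in code. A sketch, assuming the flag name follows the property name (workerLogLevelOverrides) and that DEBUG is among the supported levels:

    DataflowWorkerLoggingOptions options = PipelineOptionsFactory
        .fromArgs(new String[] {
            "--workerLogLevelOverrides={\"org.apache.http\":\"WARN\",\"MyDoFn\":\"DEBUG\"}"})
        .as(DataflowWorkerLoggingOptions.class);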

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java
new file mode 100644
index 0000000..0322426
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.PipelineResult.State;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.testing.ExpectedLogs;
+import com.google.cloud.dataflow.sdk.testing.TestDataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
+import com.google.cloud.dataflow.sdk.util.NoopPathValidator;
+import com.google.cloud.dataflow.sdk.util.TestCredential;
+
+import org.hamcrest.Description;
+import org.hamcrest.Factory;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests for BlockingDataflowPipelineRunner.
+ */
+@RunWith(JUnit4.class)
+public class BlockingDataflowPipelineRunnerTest {
+
+  @Rule
+  public ExpectedLogs expectedLogs = ExpectedLogs.none(BlockingDataflowPipelineRunner.class);
+
+  @Rule
+  public ExpectedException expectedThrown = ExpectedException.none();
+
+  /**
+   * A {@link Matcher} for a {@link DataflowJobException} that applies an underlying {@link Matcher}
+   * to the {@link DataflowPipelineJob} returned by {@link DataflowJobException#getJob()}.
+   */
+  private static class DataflowJobExceptionMatcher<T extends DataflowJobException>
+      extends TypeSafeMatcher<T> {
+
+    private final Matcher<DataflowPipelineJob> matcher;
+
+    public DataflowJobExceptionMatcher(Matcher<DataflowPipelineJob> matcher) {
+        this.matcher = matcher;
+    }
+
+    @Override
+    public boolean matchesSafely(T ex) {
+      return matcher.matches(ex.getJob());
+    }
+
+    @Override
+    protected void describeMismatchSafely(T item, Description description) {
+        description.appendText("job ");
+        matcher.describeMismatch(item.getJob(), description);
+    }
+
+    @Override
+    public void describeTo(Description description) {
+      description.appendText("exception with job matching ");
+      description.appendDescriptionOf(matcher);
+    }
+
+    @Factory
+    public static <T extends DataflowJobException> Matcher<T> expectJob(
+        Matcher<DataflowPipelineJob> matcher) {
+      return new DataflowJobExceptionMatcher<T>(matcher);
+    }
+  }
+
+  /**
+   * A {@link Matcher} for a {@link DataflowPipelineJob} that applies an underlying {@link Matcher}
+   * to the return value of {@link DataflowPipelineJob#getJobId()}.
+   */
+  private static class JobIdMatcher<T extends DataflowPipelineJob> extends TypeSafeMatcher<T> {
+
+    private final Matcher<String> matcher;
+
+    public JobIdMatcher(Matcher<String> matcher) {
+        this.matcher = matcher;
+    }
+
+    @Override
+    public boolean matchesSafely(T job) {
+      return matcher.matches(job.getJobId());
+    }
+
+    @Override
+    protected void describeMismatchSafely(T item, Description description) {
+        description.appendText("jobId ");
+        matcher.describeMismatch(item.getJobId(), description);
+    }
+
+    @Override
+    public void describeTo(Description description) {
+      description.appendText("job with jobId ");
+      description.appendDescriptionOf(matcher);
+    }
+
+    @Factory
+    public static <T extends DataflowPipelineJob> Matcher<T> expectJobId(final String jobId) {
+      return new JobIdMatcher<T>(equalTo(jobId));
+    }
+  }
+
+  /**
+   * A {@link Matcher} for a {@link DataflowJobUpdatedException} that applies an underlying
+   * {@link Matcher} to the {@link DataflowPipelineJob} returned by
+   * {@link DataflowJobUpdatedException#getReplacedByJob()}.
+   */
+  private static class ReplacedByJobMatcher<T extends DataflowJobUpdatedException>
+      extends TypeSafeMatcher<T> {
+
+    private final Matcher<DataflowPipelineJob> matcher;
+
+    public ReplacedByJobMatcher(Matcher<DataflowPipelineJob> matcher) {
+        this.matcher = matcher;
+    }
+
+    @Override
+    public boolean matchesSafely(T ex) {
+      return matcher.matches(ex.getReplacedByJob());
+    }
+
+    @Override
+    protected void describeMismatchSafely(T item, Description description) {
+        description.appendText("job ");
+        matcher.describeMismatch(item.getReplacedByJob(), description);
+    }
+
+    @Override
+    public void describeTo(Description description) {
+      description.appendText("exception with replacedByJob() ");
+      description.appendDescriptionOf(matcher);
+    }
+
+    @Factory
+    public static <T extends DataflowJobUpdatedException> Matcher<T> expectReplacedBy(
+        Matcher<DataflowPipelineJob> matcher) {
+      return new ReplacedByJobMatcher<T>(matcher);
+    }
+  }
+
+  /**
+   * Creates a mocked {@link DataflowPipelineJob} with the given {@code projectId} and {@code jobId}
+   * that will immediately terminate in the provided {@code terminalState}.
+   *
+   * <p>The return value may be further mocked.
+   */
+  private DataflowPipelineJob createMockJob(
+      String projectId, String jobId, State terminalState) throws Exception {
+    DataflowPipelineJob mockJob = mock(DataflowPipelineJob.class);
+    when(mockJob.getProjectId()).thenReturn(projectId);
+    when(mockJob.getJobId()).thenReturn(jobId);
+    when(mockJob.waitToFinish(
+        anyLong(), isA(TimeUnit.class), isA(MonitoringUtil.JobMessagesHandler.class)))
+        .thenReturn(terminalState);
+    return mockJob;
+  }
+
+  /**
+   * Returns a {@link BlockingDataflowPipelineRunner} whose underlying runner returns the
+   * provided job. Some {@link PipelineOptions} will be extracted from the job, such as the
+   * project ID.
+   */
+  private BlockingDataflowPipelineRunner createMockRunner(DataflowPipelineJob job)
+      throws Exception {
+    DataflowPipelineRunner mockRunner = mock(DataflowPipelineRunner.class);
+    TestDataflowPipelineOptions options =
+        PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
+    options.setProject(job.getProjectId());
+
+    when(mockRunner.run(isA(Pipeline.class))).thenReturn(job);
+
+    return new BlockingDataflowPipelineRunner(mockRunner, options);
+  }
+
+  /**
+   * Tests that the {@link BlockingDataflowPipelineRunner} returns normally when a job terminates in
+   * the {@link State#DONE DONE} state.
+   */
+  @Test
+  public void testJobDoneComplete() throws Exception {
+    createMockRunner(createMockJob("testJobDone-projectId", "testJobDone-jobId", State.DONE))
+        .run(DirectPipeline.createForTest());
+    expectedLogs.verifyInfo("Job finished with status DONE");
+  }
+
+  /**
+   * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
+   * when a job terminates in the {@link State#FAILED FAILED} state.
+   */
+  @Test
+  public void testFailedJobThrowsException() throws Exception {
+    expectedThrown.expect(DataflowJobExecutionException.class);
+    expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
+        JobIdMatcher.expectJobId("testFailedJob-jobId")));
+    createMockRunner(createMockJob("testFailedJob-projectId", "testFailedJob-jobId", State.FAILED))
+        .run(DirectPipeline.createForTest());
+  }
+
+  /**
+   * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
+   * when a job terminates in the {@link State#CANCELLED CANCELLED} state.
+   */
+  @Test
+  public void testCancelledJobThrowsException() throws Exception {
+    expectedThrown.expect(DataflowJobCancelledException.class);
+    expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
+        JobIdMatcher.expectJobId("testCancelledJob-jobId")));
+    createMockRunner(
+        createMockJob("testCancelledJob-projectId", "testCancelledJob-jobId", State.CANCELLED))
+        .run(DirectPipeline.createForTest());
+  }
+
+  /**
+   * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
+   * when a job terminates in the {@link State#UPDATED UPDATED} state.
+   */
+  @Test
+  public void testUpdatedJobThrowsException() throws Exception {
+    expectedThrown.expect(DataflowJobUpdatedException.class);
+    expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
+        JobIdMatcher.expectJobId("testUpdatedJob-jobId")));
+    expectedThrown.expect(ReplacedByJobMatcher.expectReplacedBy(
+        JobIdMatcher.expectJobId("testUpdatedJob-replacedByJobId")));
+    DataflowPipelineJob job =
+        createMockJob("testUpdatedJob-projectId", "testUpdatedJob-jobId", State.UPDATED);
+    DataflowPipelineJob replacedByJob =
+        createMockJob("testUpdatedJob-projectId", "testUpdatedJob-replacedByJobId", State.DONE);
+    when(job.getReplacedByJob()).thenReturn(replacedByJob);
+    createMockRunner(job).run(DirectPipeline.createForTest());
+  }
+
+  /**
+   * Tests that the {@link BlockingDataflowPipelineRunner} throws the 
appropriate exception
+   * when a job terminates in the {@link State#UNKNOWN UNKNOWN} state, indicating that the
+   * Dataflow service returned a state that the SDK is unfamiliar with (possibly because it
+   * is an old SDK relative to the service).
+   */
+  @Test
+  public void testUnknownJobThrowsException() throws Exception {
+    expectedThrown.expect(IllegalStateException.class);
+    createMockRunner(
+        createMockJob("testUnknownJob-projectId", "testUnknownJob-jobId", State.UNKNOWN))
+        .run(DirectPipeline.createForTest());
+  }
+
+  /**
+   * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
+   * when a job returns a {@code null} state, indicating that it failed to contact the service,
+   * including all of its built-in resilience logic.
+   */
+  @Test
+  public void testNullJobThrowsException() throws Exception {
+    expectedThrown.expect(DataflowServiceException.class);
+    expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
+        JobIdMatcher.expectJobId("testNullJob-jobId")));
+    createMockRunner(
+        createMockJob("testNullJob-projectId", "testNullJob-jobId", null))
+        .run(DirectPipeline.createForTest());
+  }
+
+  @Test
+  public void testToString() {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setJobName("TestJobName");
+    options.setProject("test-project");
+    options.setTempLocation("gs://test/temp/location");
+    options.setGcpCredential(new TestCredential());
+    options.setPathValidatorClass(NoopPathValidator.class);
+    assertEquals("BlockingDataflowPipelineRunner#TestJobName",
+        BlockingDataflowPipelineRunner.fromOptions(options).toString());
+  }
+}
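
A design note on the three matchers above: they differ only in the feature they extract from the matched object (getJob(), getJobId(), getReplacedByJob()). Hamcrest's FeatureMatcher captures that pattern directly and produces the mismatch description from the extracted feature automatically. A sketch of the jobId case:

    import org.hamcrest.FeatureMatcher;
    import org.hamcrest.Matcher;

    import static org.hamcrest.CoreMatchers.equalTo;

    class JobIdFeature {
      static Matcher<DataflowPipelineJob> hasJobId(String jobId) {
        return new FeatureMatcher<DataflowPipelineJob, String>(
            equalTo(jobId), "job with jobId", "jobId") {
          @Override
          protected String featureValueOf(DataflowPipelineJob actual) {
            return actual.getJobId();
          }
        };
      }
    }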

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineJobTest.java
new file mode 100644
index 0000000..764c0cb
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineJobTest.java
@@ -0,0 +1,605 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.dataflow.Dataflow;
+import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Get;
+import com.google.api.services.dataflow.Dataflow.Projects.Jobs.GetMetrics;
+import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Messages;
+import com.google.api.services.dataflow.model.Job;
+import com.google.api.services.dataflow.model.JobMetrics;
+import com.google.api.services.dataflow.model.MetricStructuredName;
+import com.google.api.services.dataflow.model.MetricUpdate;
+import com.google.cloud.dataflow.sdk.PipelineResult.State;
+import com.google.cloud.dataflow.sdk.runners.dataflow.DataflowAggregatorTransforms;
+import com.google.cloud.dataflow.sdk.testing.FastNanoClockAndSleeper;
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.Sum;
+import com.google.cloud.dataflow.sdk.util.AttemptBoundedExponentialBackOff;
+import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.POutput;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSetMultimap;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.net.SocketTimeoutException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests for {@link DataflowPipelineJob}.
+ */
+@RunWith(JUnit4.class)
+public class DataflowPipelineJobTest {
+  private static final String PROJECT_ID = "someProject";
+  private static final String JOB_ID = "1234";
+
+  @Mock
+  private Dataflow mockWorkflowClient;
+  @Mock
+  private Dataflow.Projects mockProjects;
+  @Mock
+  private Dataflow.Projects.Jobs mockJobs;
+  @Rule
+  public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper();
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+
+    when(mockWorkflowClient.projects()).thenReturn(mockProjects);
+    when(mockProjects.jobs()).thenReturn(mockJobs);
+  }
+
+  /**
+   * Validates that the given time is valid for the total time slept by an
+   * AttemptBoundedExponentialBackOff, given the number of attempts made and
+   * the initial polling interval.
+   *
+   * @param pollingIntervalMillis The initial polling interval given.
+   * @param attempts The number of attempts made.
+   * @param timeSleptMillis The amount of time slept by the clock. This is checked
+   *     against the valid interval.
+   */
+  void checkValidInterval(long pollingIntervalMillis, int attempts, long timeSleptMillis) {
+    long highSum = 0;
+    long lowSum = 0;
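+    // Each backoff sleep i lasts interval * multiplier^(i - 1), randomized by
+    // +/- randomizationFactor of that value; accumulate the smallest and
+    // largest totals the clock could legitimately have slept.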
+    for (int i = 1; i < attempts; i++) {
+      double currentInterval =
+          pollingIntervalMillis
+          * Math.pow(AttemptBoundedExponentialBackOff.DEFAULT_MULTIPLIER, i - 1);
+      double offset =
+          AttemptBoundedExponentialBackOff.DEFAULT_RANDOMIZATION_FACTOR * currentInterval;
+      highSum += Math.round(currentInterval + offset);
+      lowSum += Math.round(currentInterval - offset);
+    }
+    assertThat(timeSleptMillis, allOf(greaterThanOrEqualTo(lowSum), lessThanOrEqualTo(highSum)));
+  }
+
+  @Test
+  public void testWaitToFinishMessagesFail() throws Exception {
+    Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class);
+
+    Job statusResponse = new Job();
+    statusResponse.setCurrentState("JOB_STATE_" + State.DONE.name());
+    when(mockJobs.get(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(statusRequest);
+    when(statusRequest.execute()).thenReturn(statusResponse);
+
+    MonitoringUtil.JobMessagesHandler jobHandler = mock(MonitoringUtil.JobMessagesHandler.class);
+    Dataflow.Projects.Jobs.Messages mockMessages =
+        mock(Dataflow.Projects.Jobs.Messages.class);
+    Messages.List listRequest = mock(Dataflow.Projects.Jobs.Messages.List.class);
+    when(mockJobs.messages()).thenReturn(mockMessages);
+    when(mockMessages.list(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(listRequest);
+    when(listRequest.execute()).thenThrow(SocketTimeoutException.class);
+    DataflowAggregatorTransforms dataflowAggregatorTransforms =
+        mock(DataflowAggregatorTransforms.class);
+
+    DataflowPipelineJob job = new DataflowPipelineJob(
+        PROJECT_ID, JOB_ID, mockWorkflowClient, dataflowAggregatorTransforms);
+
+    State state = job.waitToFinish(5, TimeUnit.MINUTES, jobHandler, fastClock, fastClock);
+    assertEquals(null, state);
+  }
+
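+  /**
+   * Stubs the service to report the given state for this test's job, then
+   * returns the result of waiting for the job to finish.
+   */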
+  public State mockWaitToFinishInState(State state) throws Exception {
+    Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class);
+
+    Job statusResponse = new Job();
+    statusResponse.setCurrentState("JOB_STATE_" + state.name());
+
+    when(mockJobs.get(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(statusRequest);
+    when(statusRequest.execute()).thenReturn(statusResponse);
+    DataflowAggregatorTransforms dataflowAggregatorTransforms =
+        mock(DataflowAggregatorTransforms.class);
+
+    DataflowPipelineJob job = new DataflowPipelineJob(
+        PROJECT_ID, JOB_ID, mockWorkflowClient, dataflowAggregatorTransforms);
+
+    return job.waitToFinish(1, TimeUnit.MINUTES, null, fastClock, fastClock);
+  }
+
+  /**
+   * Tests that the {@link DataflowPipelineJob} understands that the {@link State#DONE DONE}
+   * state is terminal.
+   */
+  @Test
+  public void testWaitToFinishDone() throws Exception {
+    assertEquals(State.DONE, mockWaitToFinishInState(State.DONE));
+  }
+
+  /**
+   * Tests that the {@link DataflowPipelineJob} understands that the {@link State#FAILED FAILED}
+   * state is terminal.
+   */
+  @Test
+  public void testWaitToFinishFailed() throws Exception {
+    assertEquals(State.FAILED, mockWaitToFinishInState(State.FAILED));
+  }
+
+  /**
+   * Tests that the {@link DataflowPipelineJob} understands that the {@link State#CANCELLED CANCELLED}
+   * state is terminal.
+   */
+  @Test
+  public void testWaitToFinishCancelled() throws Exception {
+    assertEquals(State.CANCELLED, mockWaitToFinishInState(State.CANCELLED));
+  }
+
+  /**
+   * Tests that the {@link DataflowPipelineJob} understands that the {@link State#UPDATED UPDATED}
+   * state is terminal.
+   */
+  @Test
+  public void testWaitToFinishUpdated() throws Exception {
+    assertEquals(State.UPDATED, mockWaitToFinishInState(State.UPDATED));
+  }
+
+  @Test
+  public void testWaitToFinishFail() throws Exception {
+    Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class);
+
+    when(mockJobs.get(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(statusRequest);
+    when(statusRequest.execute()).thenThrow(IOException.class);
+    DataflowAggregatorTransforms dataflowAggregatorTransforms =
+        mock(DataflowAggregatorTransforms.class);
+
+    DataflowPipelineJob job = new DataflowPipelineJob(
+        PROJECT_ID, JOB_ID, mockWorkflowClient, dataflowAggregatorTransforms);
+
+    long startTime = fastClock.nanoTime();
+    State state = job.waitToFinish(5, TimeUnit.MINUTES, null, fastClock, fastClock);
+    assertEquals(null, state);
+    long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime);
+    checkValidInterval(DataflowPipelineJob.MESSAGES_POLLING_INTERVAL,
+        DataflowPipelineJob.MESSAGES_POLLING_ATTEMPTS, timeDiff);
+  }
+
+  @Test
+  public void testWaitToFinishTimeFail() throws Exception {
+    Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class);
+
+    when(mockJobs.get(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(statusRequest);
+    when(statusRequest.execute()).thenThrow(IOException.class);
+    DataflowAggregatorTransforms dataflowAggregatorTransforms =
+        mock(DataflowAggregatorTransforms.class);
+
+    DataflowPipelineJob job = new DataflowPipelineJob(
+        PROJECT_ID, JOB_ID, mockWorkflowClient, dataflowAggregatorTransforms);
+    long startTime = fastClock.nanoTime();
+    State state = job.waitToFinish(4, TimeUnit.MILLISECONDS, null, fastClock, fastClock);
+    assertEquals(null, state);
+    long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime);
+    // Should only sleep for the 4 ms remaining.
+    assertEquals(4L, timeDiff);
+  }
+
+  @Test
+  public void testGetStateReturnsServiceState() throws Exception {
+    Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class);
+
+    Job statusResponse = new Job();
+    statusResponse.setCurrentState("JOB_STATE_" + State.RUNNING.name());
+
+    when(mockJobs.get(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(statusRequest);
+    when(statusRequest.execute()).thenReturn(statusResponse);
+
+    DataflowAggregatorTransforms dataflowAggregatorTransforms =
+        mock(DataflowAggregatorTransforms.class);
+
+    DataflowPipelineJob job = new DataflowPipelineJob(
+        PROJECT_ID, JOB_ID, mockWorkflowClient, dataflowAggregatorTransforms);
+
+    assertEquals(
+        State.RUNNING,
+        job.getStateWithRetries(DataflowPipelineJob.STATUS_POLLING_ATTEMPTS, fastClock));
+  }
+
+  @Test
+  public void testGetStateWithExceptionReturnsUnknown() throws Exception {
+    Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class);
+
+    when(mockJobs.get(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(statusRequest);
+    when(statusRequest.execute()).thenThrow(IOException.class);
+    DataflowAggregatorTransforms dataflowAggregatorTransforms =
+        mock(DataflowAggregatorTransforms.class);
+
+    DataflowPipelineJob job = new DataflowPipelineJob(
+        PROJECT_ID, JOB_ID, mockWorkflowClient, dataflowAggregatorTransforms);
+
+    long startTime = fastClock.nanoTime();
+    assertEquals(
+        State.UNKNOWN,
+        job.getStateWithRetries(DataflowPipelineJob.STATUS_POLLING_ATTEMPTS, fastClock));
+    long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime);
+    checkValidInterval(DataflowPipelineJob.STATUS_POLLING_INTERVAL,
+        DataflowPipelineJob.STATUS_POLLING_ATTEMPTS, timeDiff);
+  }
+
+  @Test
+  public void testGetAggregatorValuesWithNoMetricUpdatesReturnsEmptyValue()
+      throws IOException, AggregatorRetrievalException {
+    Aggregator<?, ?> aggregator = mock(Aggregator.class);
+    @SuppressWarnings("unchecked")
+    PTransform<PInput, POutput> pTransform = mock(PTransform.class);
+    String stepName = "s1";
+    String fullName = "Foo/Bar/Baz";
+    AppliedPTransform<?, ?, ?> appliedTransform = appliedPTransform(fullName, pTransform);
+
+    DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
+        ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
+        ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
+
+    GetMetrics getMetrics = mock(GetMetrics.class);
+    when(mockJobs.getMetrics(PROJECT_ID, JOB_ID)).thenReturn(getMetrics);
+    JobMetrics jobMetrics = new JobMetrics();
+    when(getMetrics.execute()).thenReturn(jobMetrics);
+
+    jobMetrics.setMetrics(ImmutableList.<MetricUpdate>of());
+
+    Get getState = mock(Get.class);
+    when(mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn(getState);
+    Job modelJob = new Job();
+    when(getState.execute()).thenReturn(modelJob);
+    modelJob.setCurrentState(State.RUNNING.toString());
+
+    DataflowPipelineJob job =
+        new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms);
+
+    AggregatorValues<?> values = job.getAggregatorValues(aggregator);
+
+    assertThat(values.getValues(), empty());
+  }
+
+  @Test
+  public void testGetAggregatorValuesWithNullMetricUpdatesReturnsEmptyValue()
+      throws IOException, AggregatorRetrievalException {
+    Aggregator<?, ?> aggregator = mock(Aggregator.class);
+    @SuppressWarnings("unchecked")
+    PTransform<PInput, POutput> pTransform = mock(PTransform.class);
+    String stepName = "s1";
+    String fullName = "Foo/Bar/Baz";
+    AppliedPTransform<?, ?, ?> appliedTransform = appliedPTransform(fullName, pTransform);
+
+    DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
+        ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
+        ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
+
+    GetMetrics getMetrics = mock(GetMetrics.class);
+    when(mockJobs.getMetrics(PROJECT_ID, JOB_ID)).thenReturn(getMetrics);
+    JobMetrics jobMetrics = new JobMetrics();
+    when(getMetrics.execute()).thenReturn(jobMetrics);
+
+    jobMetrics.setMetrics(null);
+
+    Get getState = mock(Get.class);
+    when(mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn(getState);
+    Job modelJob = new Job();
+    when(getState.execute()).thenReturn(modelJob);
+    modelJob.setCurrentState(State.RUNNING.toString());
+
+    DataflowPipelineJob job =
+        new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms);
+
+    AggregatorValues<?> values = job.getAggregatorValues(aggregator);
+
+    assertThat(values.getValues(), empty());
+  }
+
+  @Test
+  public void testGetAggregatorValuesWithSingleMetricUpdateReturnsSingletonCollection()
+      throws IOException, AggregatorRetrievalException {
+    CombineFn<Long, long[], Long> combineFn = new Sum.SumLongFn();
+    String aggregatorName = "agg";
+    Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
+    @SuppressWarnings("unchecked")
+    PTransform<PInput, POutput> pTransform = mock(PTransform.class);
+    String stepName = "s1";
+    String fullName = "Foo/Bar/Baz";
+    AppliedPTransform<?, ?, ?> appliedTransform = appliedPTransform(fullName, pTransform);
+
+    DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
+        ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
+        ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
+
+    GetMetrics getMetrics = mock(GetMetrics.class);
+    when(mockJobs.getMetrics(PROJECT_ID, JOB_ID)).thenReturn(getMetrics);
+    JobMetrics jobMetrics = new JobMetrics();
+    when(getMetrics.execute()).thenReturn(jobMetrics);
+
+    MetricUpdate update = new MetricUpdate();
+    long stepValue = 1234L;
+    update.setScalar(new BigDecimal(stepValue));
+
+    MetricStructuredName structuredName = new MetricStructuredName();
+    structuredName.setName(aggregatorName);
+    structuredName.setContext(ImmutableMap.of("step", stepName));
+    update.setName(structuredName);
+
+    jobMetrics.setMetrics(ImmutableList.of(update));
+
+    Get getState = mock(Get.class);
+    when(mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn(getState);
+    Job modelJob = new Job();
+    when(getState.execute()).thenReturn(modelJob);
+    modelJob.setCurrentState(State.RUNNING.toString());
+
+    DataflowPipelineJob job =
+        new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms);
+
+    AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
+
+    assertThat(values.getValuesAtSteps(), hasEntry(fullName, stepValue));
+    assertThat(values.getValuesAtSteps().size(), equalTo(1));
+    assertThat(values.getValues(), contains(stepValue));
+    assertThat(values.getTotalValue(combineFn), equalTo(Long.valueOf(stepValue)));
+  }
+
+  @Test
+  public void testGetAggregatorValuesWithMultipleMetricUpdatesReturnsCollection()
+      throws IOException, AggregatorRetrievalException {
+    CombineFn<Long, long[], Long> combineFn = new Sum.SumLongFn();
+    String aggregatorName = "agg";
+    Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
+
+    @SuppressWarnings("unchecked")
+    PTransform<PInput, POutput> pTransform = mock(PTransform.class);
+    String stepName = "s1";
+    String fullName = "Foo/Bar/Baz";
+    AppliedPTransform<?, ?, ?> appliedTransform = appliedPTransform(fullName, pTransform);
+
+    @SuppressWarnings("unchecked")
+    PTransform<PInput, POutput> otherTransform = mock(PTransform.class);
+    String otherStepName = "s88";
+    String otherFullName = "Spam/Ham/Eggs";
+    AppliedPTransform<?, ?, ?> otherAppliedTransform =
+        appliedPTransform(otherFullName, otherTransform);
+
+    DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
+        ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(
+                                aggregator, pTransform, aggregator, otherTransform).asMap(),
+        ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(
+            appliedTransform, stepName, otherAppliedTransform, otherStepName));
+
+    GetMetrics getMetrics = mock(GetMetrics.class);
+    when(mockJobs.getMetrics(PROJECT_ID, JOB_ID)).thenReturn(getMetrics);
+    JobMetrics jobMetrics = new JobMetrics();
+    when(getMetrics.execute()).thenReturn(jobMetrics);
+
+    MetricUpdate updateOne = new MetricUpdate();
+    long stepValue = 1234L;
+    updateOne.setScalar(new BigDecimal(stepValue));
+
+    MetricStructuredName structuredNameOne = new MetricStructuredName();
+    structuredNameOne.setName(aggregatorName);
+    structuredNameOne.setContext(ImmutableMap.of("step", stepName));
+    updateOne.setName(structuredNameOne);
+
+    MetricUpdate updateTwo = new MetricUpdate();
+    long stepValueTwo = 1024L;
+    updateTwo.setScalar(new BigDecimal(stepValueTwo));
+
+    MetricStructuredName structuredNameTwo = new MetricStructuredName();
+    structuredNameTwo.setName(aggregatorName);
+    structuredNameTwo.setContext(ImmutableMap.of("step", otherStepName));
+    updateTwo.setName(structuredNameTwo);
+
+    jobMetrics.setMetrics(ImmutableList.of(updateOne, updateTwo));
+
+    Get getState = mock(Get.class);
+    when(mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn(getState);
+    Job modelJob = new Job();
+    when(getState.execute()).thenReturn(modelJob);
+    modelJob.setCurrentState(State.RUNNING.toString());
+
+    DataflowPipelineJob job =
+        new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms);
+
+    AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
+
+    assertThat(values.getValuesAtSteps(), hasEntry(fullName, stepValue));
+    assertThat(values.getValuesAtSteps(), hasEntry(otherFullName, stepValueTwo));
+    assertThat(values.getValuesAtSteps().size(), equalTo(2));
+    assertThat(values.getValues(), containsInAnyOrder(stepValue, stepValueTwo));
+    assertThat(values.getTotalValue(combineFn), equalTo(Long.valueOf(stepValue + stepValueTwo)));
+  }
+
+  @Test
+  public void testGetAggregatorValuesWithUnrelatedMetricUpdateIgnoresUpdate()
+      throws IOException, AggregatorRetrievalException {
+    CombineFn<Long, long[], Long> combineFn = new Sum.SumLongFn();
+    String aggregatorName = "agg";
+    Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
+    @SuppressWarnings("unchecked")
+    PTransform<PInput, POutput> pTransform = mock(PTransform.class);
+    String stepName = "s1";
+    String fullName = "Foo/Bar/Baz";
+    AppliedPTransform<?, ?, ?> appliedTransform = appliedPTransform(fullName, pTransform);
+
+    DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
+        ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
+        ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
+
+    GetMetrics getMetrics = mock(GetMetrics.class);
+    when(mockJobs.getMetrics(PROJECT_ID, JOB_ID)).thenReturn(getMetrics);
+    JobMetrics jobMetrics = new JobMetrics();
+    when(getMetrics.execute()).thenReturn(jobMetrics);
+
+    MetricUpdate ignoredUpdate = new MetricUpdate();
+    ignoredUpdate.setScalar(null);
+
+    MetricStructuredName ignoredName = new MetricStructuredName();
+    ignoredName.setName("ignoredAggregator.elementCount.out0");
+    ignoredName.setContext(null);
+    ignoredUpdate.setName(ignoredName);
+
+    jobMetrics.setMetrics(ImmutableList.of(ignoredUpdate));
+
+    Get getState = mock(Get.class);
+    when(mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn(getState);
+    Job modelJob = new Job();
+    when(getState.execute()).thenReturn(modelJob);
+    modelJob.setCurrentState(State.RUNNING.toString());
+
+    DataflowPipelineJob job =
+        new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms);
+
+    AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
+
+    assertThat(values.getValuesAtSteps().entrySet(), empty());
+    assertThat(values.getValues(), empty());
+  }
+
+  @Test
+  public void testGetAggregatorValuesWithUnusedAggregatorThrowsException()
+      throws AggregatorRetrievalException {
+    Aggregator<?, ?> aggregator = mock(Aggregator.class);
+
+    DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
+        ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of().asMap(),
+        ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
+
+    DataflowPipelineJob job =
+        new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms);
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("not used in this pipeline");
+
+    job.getAggregatorValues(aggregator);
+  }
+
+  @Test
+  public void testGetAggregatorValuesWhenClientThrowsExceptionThrowsAggregatorRetrievalException()
+      throws IOException, AggregatorRetrievalException {
+    CombineFn<Long, long[], Long> combineFn = new Sum.SumLongFn();
+    String aggregatorName = "agg";
+    Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
+    @SuppressWarnings("unchecked")
+    PTransform<PInput, POutput> pTransform = mock(PTransform.class);
+    String stepName = "s1";
+    String fullName = "Foo/Bar/Baz";
+    AppliedPTransform<?, ?, ?> appliedTransform = appliedPTransform(fullName, pTransform);
+
+    DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
+        ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
+        ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
+
+    GetMetrics getMetrics = mock(GetMetrics.class);
+    when(mockJobs.getMetrics(PROJECT_ID, JOB_ID)).thenReturn(getMetrics);
+    IOException cause = new IOException();
+    when(getMetrics.execute()).thenThrow(cause);
+
+    Get getState = mock(Get.class);
+    when(mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn(getState);
+    Job modelJob = new Job();
+    when(getState.execute()).thenReturn(modelJob);
+    modelJob.setCurrentState(State.RUNNING.toString());
+
+    DataflowPipelineJob job =
+        new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms);
+
+    thrown.expect(AggregatorRetrievalException.class);
+    thrown.expectCause(is(cause));
+    thrown.expectMessage(aggregator.toString());
+    thrown.expectMessage("when retrieving Aggregator values for");
+
+    job.getAggregatorValues(aggregator);
+  }
+
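+  /**
+   * An {@link Aggregator} stub exposing only a name and {@link CombineFn};
+   * {@link #addValue} is never expected to be invoked by these tests.
+   */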
+  private static class TestAggregator<InT, OutT> implements Aggregator<InT, OutT> {
+    private final CombineFn<InT, ?, OutT> combineFn;
+    private final String name;
+
+    public TestAggregator(CombineFn<InT, ?, OutT> combineFn, String name) {
+      this.combineFn = combineFn;
+      this.name = name;
+    }
+
+    @Override
+    public void addValue(InT value) {
+      throw new AssertionError();
+    }
+
+    @Override
+    public String getName() {
+      return name;
+    }
+
+    @Override
+    public CombineFn<InT, ?, OutT> getCombineFn() {
+      return combineFn;
+    }
+  }
+
+  private AppliedPTransform<?, ?, ?> appliedPTransform(
+      String fullName, PTransform<PInput, POutput> transform) {
+    return AppliedPTransform.of(fullName, mock(PInput.class), mock(POutput.class), transform);
+  }
+}
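
The checkValidInterval helper above bounds the total time slept by AttemptBoundedExponentialBackOff. For readers who want the arithmetic in isolation, here is a minimal standalone sketch of the same envelope computation. The multiplier of 1.5 and randomization factor of 0.5 are illustrative assumptions; the real constants are defined on AttemptBoundedExponentialBackOff, not taken from this diff.

    // Sketch: min/max total sleep for `attempts` polls at an initial interval.
    public final class BackoffEnvelope {
      private static final double MULTIPLIER = 1.5;            // assumed default
      private static final double RANDOMIZATION_FACTOR = 0.5;  // assumed default

      /** Returns {low, high}, the inclusive bounds on total sleep in millis. */
      public static long[] envelope(long pollingIntervalMillis, int attempts) {
        long low = 0;
        long high = 0;
        // Sleep i lasts interval * multiplier^(i - 1), jittered by the factor.
        for (int i = 1; i < attempts; i++) {
          double current = pollingIntervalMillis * Math.pow(MULTIPLIER, i - 1);
          double offset = RANDOMIZATION_FACTOR * current;
          low += Math.round(current - offset);
          high += Math.round(current + offset);
        }
        return new long[] {low, high};
      }

      public static void main(String[] args) {
        long[] bounds = envelope(1000L, 5);
        System.out.println("total sleep in [" + bounds[0] + ", " + bounds[1] + "] ms");
      }
    }

This is exactly the check checkValidInterval performs against the fast clock: the observed sleep must land between the two sums.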

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRegistrarTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRegistrarTest.java
new file mode 100644
index 0000000..4850939
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRegistrarTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import com.google.cloud.dataflow.sdk.options.BlockingDataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.ServiceLoader;
+
+/** Tests for {@link DataflowPipelineRegistrar}. */
+@RunWith(JUnit4.class)
+public class DataflowPipelineRegistrarTest {
+  @Test
+  public void testCorrectOptionsAreReturned() {
+    assertEquals(ImmutableList.of(DataflowPipelineOptions.class,
+                                  BlockingDataflowPipelineOptions.class),
+        new DataflowPipelineRegistrar.Options().getPipelineOptions());
+  }
+
+  @Test
+  public void testCorrectRunnersAreReturned() {
+    assertEquals(ImmutableList.of(DataflowPipelineRunner.class,
+                                  BlockingDataflowPipelineRunner.class),
+        new DataflowPipelineRegistrar.Runner().getPipelineRunners());
+  }
+
+  @Test
+  public void testServiceLoaderForOptions() {
+    for (PipelineOptionsRegistrar registrar :
+        Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class).iterator())) {
+      if (registrar instanceof DataflowPipelineRegistrar.Options) {
+        return;
+      }
+    }
+    fail("Expected to find " + DataflowPipelineRegistrar.Options.class);
+  }
+
+  @Test
+  public void testServiceLoaderForRunner() {
+    for (PipelineRunnerRegistrar registrar :
+        Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class).iterator())) {
+      if (registrar instanceof DataflowPipelineRegistrar.Runner) {
+        return;
+      }
+    }
+    fail("Expected to find " + DataflowPipelineRegistrar.Runner.class);
+  }
+}
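
Both ServiceLoader tests above exercise standard Java SPI discovery: implementations listed in META-INF/services files are instantiated by ServiceLoader at runtime. The following self-contained sketch shows that pattern; the Registrar interface here is hypothetical and stands in for PipelineOptionsRegistrar and PipelineRunnerRegistrar.

    import java.util.ServiceLoader;

    public final class RegistrarLookup {
      /** Hypothetical SPI, standing in for the Dataflow registrar interfaces. */
      public interface Registrar {
        String name();
      }

      // ServiceLoader scans META-INF/services/RegistrarLookup$Registrar on the
      // classpath and instantiates every implementation declared there.
      public static void main(String[] args) {
        boolean found = false;
        for (Registrar registrar : ServiceLoader.load(Registrar.class)) {
          System.out.println("found registrar: " + registrar.name());
          found = true;
        }
        if (!found) {
          System.out.println("no registrars declared on the classpath");
        }
      }
    }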
