http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
deleted file mode 100644
index 1c853bb..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ /dev/null
@@ -1,798 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.api.client.googleapis.batch.BatchRequest;
-import com.google.api.client.googleapis.batch.json.JsonBatchCallback;
-import com.google.api.client.googleapis.json.GoogleJsonError;
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
-import com.google.api.client.http.HttpHeaders;
-import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.storage.Storage;
-import com.google.api.services.storage.model.Bucket;
-import com.google.api.services.storage.model.Objects;
-import com.google.api.services.storage.model.StorageObject;
-import com.google.auto.value.AutoValue;
-import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel;
-import com.google.cloud.hadoop.gcsio.GoogleCloudStorageWriteChannel;
-import com.google.cloud.hadoop.gcsio.ObjectWriteConditions;
-import com.google.cloud.hadoop.util.ApiErrorExtractor;
-import com.google.cloud.hadoop.util.AsyncWriteChannelOptions;
-import com.google.cloud.hadoop.util.ClientRequestHelper;
-import com.google.cloud.hadoop.util.ResilientOperation;
-import com.google.cloud.hadoop.util.RetryDeterminer;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.channels.SeekableByteChannel;
-import java.nio.channels.WritableByteChannel;
-import java.nio.file.AccessDeniedException;
-import java.nio.file.FileAlreadyExistsException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import javax.annotation.Nullable;
-
-import org.apache.beam.sdk.options.DefaultValueFactory;
-import org.apache.beam.sdk.options.GcsOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Provides operations on GCS.
- */
-public class GcsUtil {
-  /**
-   * This is a {@link DefaultValueFactory} able to create a {@link GcsUtil} 
using
-   * any transport flags specified on the {@link PipelineOptions}.
-   */
-  public static class GcsUtilFactory implements DefaultValueFactory<GcsUtil> {
-    /**
-     * Returns an instance of {@link GcsUtil} based on the
-     * {@link PipelineOptions}.
-     *
-     * <p>If no instance has previously been created, one is created and the 
value
-     * stored in {@code options}.
-     */
-    @Override
-    public GcsUtil create(PipelineOptions options) {
-      LOG.debug("Creating new GcsUtil");
-      GcsOptions gcsOptions = options.as(GcsOptions.class);
-      Storage.Builder storageBuilder = Transport.newStorageClient(gcsOptions);
-      return new GcsUtil(
-          storageBuilder.build(),
-          storageBuilder.getHttpRequestInitializer(),
-          gcsOptions.getExecutorService(),
-          gcsOptions.getGcsUploadBufferSizeBytes());
-    }
-
-    /**
-     * Returns an instance of {@link GcsUtil} based on the given parameters.
-     */
-    public static GcsUtil create(
-        Storage storageClient,
-        HttpRequestInitializer httpRequestInitializer,
-        ExecutorService executorService,
-        @Nullable Integer uploadBufferSizeBytes) {
-      return new GcsUtil(
-          storageClient, httpRequestInitializer, executorService, 
uploadBufferSizeBytes);
-    }
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(GcsUtil.class);
-
-  /** Maximum number of items to retrieve per Objects.List request. */
-  private static final long MAX_LIST_ITEMS_PER_CALL = 1024;
-
-  /** Matches a glob containing a wildcard, capturing the portion before the 
first wildcard. */
-  private static final Pattern GLOB_PREFIX = 
Pattern.compile("(?<PREFIX>[^\\[*?]*)[\\[*?].*");
-
-  private static final String RECURSIVE_WILDCARD = "[*]{2}";
-
-  /**
-   * A {@link Pattern} for globs with a recursive wildcard.
-   */
-  private static final Pattern RECURSIVE_GCS_PATTERN =
-      Pattern.compile(".*" + RECURSIVE_WILDCARD + ".*");
-
-  /**
-   * Maximum number of requests permitted in a GCS batch request.
-   */
-  private static final int MAX_REQUESTS_PER_BATCH = 100;
-  /**
-   * Maximum number of concurrent batches of requests executing on GCS.
-   */
-  private static final int MAX_CONCURRENT_BATCHES = 256;
-
-  private static final FluentBackoff BACKOFF_FACTORY =
-      
FluentBackoff.DEFAULT.withMaxRetries(3).withInitialBackoff(Duration.millis(200));
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /** Client for the GCS API. */
-  private Storage storageClient;
-  private final HttpRequestInitializer httpRequestInitializer;
-  /** Buffer size for GCS uploads (in bytes). */
-  @Nullable private final Integer uploadBufferSizeBytes;
-
-  // Helper delegate for turning IOExceptions from API calls into higher-level 
semantics.
-  private final ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
-
-  // Exposed for testing.
-  final ExecutorService executorService;
-
-  /**
-   * Returns true if the given GCS pattern is supported otherwise fails with an
-   * exception.
-   */
-  public static boolean isGcsPatternSupported(String gcsPattern) {
-    if (RECURSIVE_GCS_PATTERN.matcher(gcsPattern).matches()) {
-      throw new IllegalArgumentException("Unsupported wildcard usage in \"" + 
gcsPattern + "\": "
-          + " recursive wildcards are not supported.");
-    }
-    return true;
-  }
-
-  /**
-   * Returns the prefix portion of the glob that doesn't contain wildcards.
-   */
-  public static String getGlobPrefix(String globExp) {
-    checkArgument(isGcsPatternSupported(globExp));
-    Matcher m = GLOB_PREFIX.matcher(globExp);
-    checkArgument(
-        m.matches(),
-        String.format("Glob expression: [%s] is not expandable.", globExp));
-    return m.group("PREFIX");
-  }
-
-  /**
-   * Expands glob expressions to regular expressions.
-   *
-   * @param globExp the glob expression to expand
-   * @return a string with the regular expression this glob expands to
-   */
-  public static String globToRegexp(String globExp) {
-    StringBuilder dst = new StringBuilder();
-    char[] src = globExp.toCharArray();
-    int i = 0;
-    while (i < src.length) {
-      char c = src[i++];
-      switch (c) {
-        case '*':
-          dst.append("[^/]*");
-          break;
-        case '?':
-          dst.append("[^/]");
-          break;
-        case '.':
-        case '+':
-        case '{':
-        case '}':
-        case '(':
-        case ')':
-        case '|':
-        case '^':
-        case '$':
-          // These need to be escaped in regular expressions
-          dst.append('\\').append(c);
-          break;
-        case '\\':
-          i = doubleSlashes(dst, src, i);
-          break;
-        default:
-          dst.append(c);
-          break;
-      }
-    }
-    return dst.toString();
-  }
-
-  /**
-   * Returns true if the given {@code spec} contains glob.
-   */
-  public static boolean isGlob(GcsPath spec) {
-    return GLOB_PREFIX.matcher(spec.getObject()).matches();
-  }
-
-  private GcsUtil(
-      Storage storageClient,
-      HttpRequestInitializer httpRequestInitializer,
-      ExecutorService executorService,
-      @Nullable Integer uploadBufferSizeBytes) {
-    this.storageClient = storageClient;
-    this.httpRequestInitializer = httpRequestInitializer;
-    this.uploadBufferSizeBytes = uploadBufferSizeBytes;
-    this.executorService = executorService;
-  }
-
-  // Use this only for testing purposes.
-  protected void setStorageClient(Storage storageClient) {
-    this.storageClient = storageClient;
-  }
-
-  /**
-   * Expands a pattern into matched paths. The pattern path may contain globs, 
which are expanded
-   * in the result. For patterns that only match a single object, we ensure 
that the object
-   * exists.
-   */
-  public List<GcsPath> expand(GcsPath gcsPattern) throws IOException {
-    checkArgument(isGcsPatternSupported(gcsPattern.getObject()));
-    Pattern p = null;
-    String prefix = null;
-    if (!isGlob(gcsPattern)) {
-      // Not a glob.
-      try {
-        // Use a get request to fetch the metadata of the object, and ignore 
the return value.
-        // The request has strong global consistency.
-        getObject(gcsPattern);
-        return ImmutableList.of(gcsPattern);
-      } catch (FileNotFoundException e) {
-        // If the path was not found, return an empty list.
-        return ImmutableList.of();
-      }
-    } else {
-      // Part before the first wildcard character.
-      prefix = getGlobPrefix(gcsPattern.getObject());
-      p = Pattern.compile(globToRegexp(gcsPattern.getObject()));
-    }
-
-    LOG.debug("matching files in bucket {}, prefix {} against pattern {}", 
gcsPattern.getBucket(),
-        prefix, p.toString());
-
-    String pageToken = null;
-    List<GcsPath> results = new LinkedList<>();
-    do {
-      Objects objects = listObjects(gcsPattern.getBucket(), prefix, pageToken);
-      if (objects.getItems() == null) {
-        break;
-      }
-
-      // Filter objects based on the regex.
-      for (StorageObject o : objects.getItems()) {
-        String name = o.getName();
-        // Skip directories, which end with a slash.
-        if (p.matcher(name).matches() && !name.endsWith("/")) {
-          LOG.debug("Matched object: {}", name);
-          results.add(GcsPath.fromObject(o));
-        }
-      }
-      pageToken = objects.getNextPageToken();
-    } while (pageToken != null);
-
-    return results;
-  }
-
-  @VisibleForTesting
-  @Nullable
-  Integer getUploadBufferSizeBytes() {
-    return uploadBufferSizeBytes;
-  }
-
-  /**
-   * Returns the file size from GCS or throws {@link FileNotFoundException}
-   * if the resource does not exist.
-   */
-  public long fileSize(GcsPath path) throws IOException {
-    return getObject(path).getSize().longValue();
-  }
-
-  /**
-   * Returns the {@link StorageObject} for the given {@link GcsPath}.
-   */
-  public StorageObject getObject(GcsPath gcsPath) throws IOException {
-    return getObject(gcsPath, BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
-  }
-
-  @VisibleForTesting
-  StorageObject getObject(GcsPath gcsPath, BackOff backoff, Sleeper sleeper) 
throws IOException {
-    Storage.Objects.Get getObject =
-        storageClient.objects().get(gcsPath.getBucket(), gcsPath.getObject());
-    try {
-      return ResilientOperation.retry(
-          ResilientOperation.getGoogleRequestCallable(getObject),
-          backoff,
-          RetryDeterminer.SOCKET_ERRORS,
-          IOException.class,
-          sleeper);
-    } catch (IOException | InterruptedException e) {
-      if (e instanceof InterruptedException) {
-        Thread.currentThread().interrupt();
-      }
-      if (e instanceof IOException && 
errorExtractor.itemNotFound((IOException) e)) {
-        throw new FileNotFoundException(gcsPath.toString());
-      }
-      throw new IOException(
-          String.format("Unable to get the file object for path %s.", gcsPath),
-          e);
-    }
-  }
-
-  /**
-   * Returns {@link StorageObjectOrIOException StorageObjectOrIOExceptions} 
for the given
-   * {@link GcsPath GcsPaths}.
-   */
-  public List<StorageObjectOrIOException> getObjects(List<GcsPath> gcsPaths)
-      throws IOException {
-    List<StorageObjectOrIOException[]> results = new ArrayList<>();
-    executeBatches(makeGetBatches(gcsPaths, results));
-    ImmutableList.Builder<StorageObjectOrIOException> ret = 
ImmutableList.builder();
-    for (StorageObjectOrIOException[] result : results) {
-      ret.add(result[0]);
-    }
-    return ret.build();
-  }
-
-  /**
-   * Lists {@link Objects} given the {@code bucket}, {@code prefix}, {@code 
pageToken}.
-   */
-  public Objects listObjects(String bucket, String prefix, @Nullable String 
pageToken)
-      throws IOException {
-    // List all objects that start with the prefix (including objects in 
sub-directories).
-    Storage.Objects.List listObject = storageClient.objects().list(bucket);
-    listObject.setMaxResults(MAX_LIST_ITEMS_PER_CALL);
-    listObject.setPrefix(prefix);
-
-    if (pageToken != null) {
-      listObject.setPageToken(pageToken);
-    }
-
-    try {
-      return ResilientOperation.retry(
-          ResilientOperation.getGoogleRequestCallable(listObject),
-          BACKOFF_FACTORY.backoff(),
-          RetryDeterminer.SOCKET_ERRORS,
-          IOException.class);
-    } catch (Exception e) {
-      throw new IOException(
-          String.format("Unable to match files in bucket %s, prefix %s.", 
bucket, prefix),
-          e);
-    }
-  }
-
-  /**
-   * Returns the file size from GCS or throws {@link FileNotFoundException}
-   * if the resource does not exist.
-   */
-  @VisibleForTesting
-  List<Long> fileSizes(List<GcsPath> paths) throws IOException {
-    List<StorageObjectOrIOException> results = getObjects(paths);
-
-    ImmutableList.Builder<Long> ret = ImmutableList.builder();
-    for (StorageObjectOrIOException result : results) {
-      ret.add(toFileSize(result));
-    }
-    return ret.build();
-  }
-
-  private Long toFileSize(StorageObjectOrIOException 
storageObjectOrIOException)
-      throws IOException {
-    if (storageObjectOrIOException.ioException() != null) {
-      throw storageObjectOrIOException.ioException();
-    } else {
-      return storageObjectOrIOException.storageObject().getSize().longValue();
-    }
-  }
-
-  /**
-   * Opens an object in GCS.
-   *
-   * <p>Returns a SeekableByteChannel that provides access to data in the 
bucket.
-   *
-   * @param path the GCS filename to read from
-   * @return a SeekableByteChannel that can read the object data
-   */
-  public SeekableByteChannel open(GcsPath path)
-      throws IOException {
-    return new GoogleCloudStorageReadChannel(storageClient, path.getBucket(),
-            path.getObject(), errorExtractor,
-            new ClientRequestHelper<StorageObject>());
-  }
-
-  /**
-   * Creates an object in GCS.
-   *
-   * <p>Returns a WritableByteChannel that can be used to write data to the
-   * object.
-   *
-   * @param path the GCS file to write to
-   * @param type the type of object, eg "text/plain".
-   * @return a Callable object that encloses the operation.
-   */
-  public WritableByteChannel create(GcsPath path,
-      String type) throws IOException {
-    GoogleCloudStorageWriteChannel channel = new 
GoogleCloudStorageWriteChannel(
-        executorService,
-        storageClient,
-        new ClientRequestHelper<StorageObject>(),
-        path.getBucket(),
-        path.getObject(),
-        AsyncWriteChannelOptions.newBuilder().build(),
-        new ObjectWriteConditions(),
-        Collections.<String, String>emptyMap(),
-        type);
-    if (uploadBufferSizeBytes != null) {
-      channel.setUploadBufferSize(uploadBufferSizeBytes);
-    }
-    channel.initialize();
-    return channel;
-  }
-
-  /**
-   * Returns whether the GCS bucket exists and is accessible.
-   */
-  public boolean bucketAccessible(GcsPath path) throws IOException {
-    return bucketAccessible(
-        path,
-        BACKOFF_FACTORY.backoff(),
-        Sleeper.DEFAULT);
-  }
-
-  /**
-   * Returns the project number of the project which owns this bucket.
-   * If the bucket exists, it must be accessible otherwise the permissions
-   * exception will be propagated.  If the bucket does not exist, an exception
-   * will be thrown.
-   */
-  public long bucketOwner(GcsPath path) throws IOException {
-    return getBucket(
-        path,
-        BACKOFF_FACTORY.backoff(),
-        Sleeper.DEFAULT).getProjectNumber().longValue();
-  }
-
-  /**
-   * Creates a {@link Bucket} under the specified project in Cloud Storage or
-   * propagates an exception.
-   */
-  public void createBucket(String projectId, Bucket bucket) throws IOException 
{
-    createBucket(
-        projectId, bucket, BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
-  }
-
-  /**
-   * Returns whether the GCS bucket exists. This will return false if the 
bucket
-   * is inaccessible due to permissions.
-   */
-  @VisibleForTesting
-  boolean bucketAccessible(GcsPath path, BackOff backoff, Sleeper sleeper) 
throws IOException {
-    try {
-      return getBucket(path, backoff, sleeper) != null;
-    } catch (AccessDeniedException | FileNotFoundException e) {
-      return false;
-    }
-  }
-
-  @VisibleForTesting
-  @Nullable
-  Bucket getBucket(GcsPath path, BackOff backoff, Sleeper sleeper) throws 
IOException {
-    Storage.Buckets.Get getBucket =
-        storageClient.buckets().get(path.getBucket());
-
-      try {
-        Bucket bucket = ResilientOperation.retry(
-            ResilientOperation.getGoogleRequestCallable(getBucket),
-            backoff,
-            new RetryDeterminer<IOException>() {
-              @Override
-              public boolean shouldRetry(IOException e) {
-                if (errorExtractor.itemNotFound(e) || 
errorExtractor.accessDenied(e)) {
-                  return false;
-                }
-                return RetryDeterminer.SOCKET_ERRORS.shouldRetry(e);
-              }
-            },
-            IOException.class,
-            sleeper);
-
-        return bucket;
-      } catch (GoogleJsonResponseException e) {
-        if (errorExtractor.accessDenied(e)) {
-          throw new AccessDeniedException(path.toString(), null, 
e.getMessage());
-        }
-        if (errorExtractor.itemNotFound(e)) {
-          throw new FileNotFoundException(e.getMessage());
-        }
-        throw e;
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new IOException(
-            String.format("Error while attempting to verify existence of 
bucket gs://%s",
-                path.getBucket()), e);
-     }
-  }
-
-  @VisibleForTesting
-  void createBucket(String projectId, Bucket bucket, BackOff backoff, Sleeper 
sleeper)
-        throws IOException {
-    Storage.Buckets.Insert insertBucket =
-        storageClient.buckets().insert(projectId, bucket);
-    insertBucket.setPredefinedAcl("projectPrivate");
-    insertBucket.setPredefinedDefaultObjectAcl("projectPrivate");
-
-    try {
-      ResilientOperation.retry(
-        ResilientOperation.getGoogleRequestCallable(insertBucket),
-        backoff,
-        new RetryDeterminer<IOException>() {
-          @Override
-          public boolean shouldRetry(IOException e) {
-            if (errorExtractor.itemAlreadyExists(e) || 
errorExtractor.accessDenied(e)) {
-              return false;
-            }
-            return RetryDeterminer.SOCKET_ERRORS.shouldRetry(e);
-          }
-        },
-        IOException.class,
-        sleeper);
-      return;
-    } catch (GoogleJsonResponseException e) {
-      if (errorExtractor.accessDenied(e)) {
-        throw new AccessDeniedException(bucket.getName(), null, 
e.getMessage());
-      }
-      if (errorExtractor.itemAlreadyExists(e)) {
-        throw new FileAlreadyExistsException(bucket.getName(), null, 
e.getMessage());
-      }
-      throw e;
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IOException(
-        String.format("Error while attempting to create bucket gs://%s for 
rproject %s",
-                      bucket.getName(), projectId), e);
-    }
-  }
-
-  private static void executeBatches(List<BatchRequest> batches) throws 
IOException {
-    ListeningExecutorService executor = MoreExecutors.listeningDecorator(
-        MoreExecutors.getExitingExecutorService(
-            new ThreadPoolExecutor(MAX_CONCURRENT_BATCHES, 
MAX_CONCURRENT_BATCHES,
-                0L, TimeUnit.MILLISECONDS,
-                new LinkedBlockingQueue<Runnable>())));
-
-    List<ListenableFuture<Void>> futures = new LinkedList<>();
-    for (final BatchRequest batch : batches) {
-      futures.add(executor.submit(new Callable<Void>() {
-        public Void call() throws IOException {
-          batch.execute();
-          return null;
-        }
-      }));
-    }
-
-    try {
-      Futures.allAsList(futures).get();
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IOException("Interrupted while executing batch GCS request", 
e);
-    } catch (ExecutionException e) {
-      if (e.getCause() instanceof FileNotFoundException) {
-        throw (FileNotFoundException) e.getCause();
-      }
-      throw new IOException("Error executing batch GCS request", e);
-    } finally {
-      executor.shutdown();
-    }
-  }
-
-  /**
-   * Makes get {@link BatchRequest BatchRequests}.
-   *
-   * @param paths {@link GcsPath GcsPaths}.
-   * @param results mutable {@link List} for return values.
-   * @return {@link BatchRequest BatchRequests} to execute.
-   * @throws IOException
-   */
-  @VisibleForTesting
-  List<BatchRequest> makeGetBatches(
-      Collection<GcsPath> paths,
-      List<StorageObjectOrIOException[]> results) throws IOException {
-    List<BatchRequest> batches = new LinkedList<>();
-    for (List<GcsPath> filesToGet :
-        Lists.partition(Lists.newArrayList(paths), MAX_REQUESTS_PER_BATCH)) {
-      BatchRequest batch = createBatchRequest();
-      for (GcsPath path : filesToGet) {
-        results.add(enqueueGetFileSize(path, batch));
-      }
-      batches.add(batch);
-    }
-    return batches;
-  }
-
-  public void copy(Iterable<String> srcFilenames,
-                   Iterable<String> destFilenames) throws
-      IOException {
-    executeBatches(makeCopyBatches(srcFilenames, destFilenames));
-  }
-
-  List<BatchRequest> makeCopyBatches(Iterable<String> srcFilenames, 
Iterable<String> destFilenames)
-      throws IOException {
-    List<String> srcList = Lists.newArrayList(srcFilenames);
-    List<String> destList = Lists.newArrayList(destFilenames);
-    checkArgument(
-        srcList.size() == destList.size(),
-        "Number of source files %s must equal number of destination files %s",
-        srcList.size(),
-        destList.size());
-
-    List<BatchRequest> batches = new LinkedList<>();
-    BatchRequest batch = createBatchRequest();
-    for (int i = 0; i < srcList.size(); i++) {
-      final GcsPath sourcePath = GcsPath.fromUri(srcList.get(i));
-      final GcsPath destPath = GcsPath.fromUri(destList.get(i));
-      enqueueCopy(sourcePath, destPath, batch);
-      if (batch.size() >= MAX_REQUESTS_PER_BATCH) {
-        batches.add(batch);
-        batch = createBatchRequest();
-      }
-    }
-    if (batch.size() > 0) {
-      batches.add(batch);
-    }
-    return batches;
-  }
-
-  List<BatchRequest> makeRemoveBatches(Collection<String> filenames) throws 
IOException {
-    List<BatchRequest> batches = new LinkedList<>();
-    for (List<String> filesToDelete :
-        Lists.partition(Lists.newArrayList(filenames), 
MAX_REQUESTS_PER_BATCH)) {
-      BatchRequest batch = createBatchRequest();
-      for (String file : filesToDelete) {
-        enqueueDelete(GcsPath.fromUri(file), batch);
-      }
-      batches.add(batch);
-    }
-    return batches;
-  }
-
-  public void remove(Collection<String> filenames) throws IOException {
-    executeBatches(makeRemoveBatches(filenames));
-  }
-
-  private StorageObjectOrIOException[] enqueueGetFileSize(final GcsPath path, 
BatchRequest batch)
-      throws IOException {
-    final StorageObjectOrIOException[] ret = new StorageObjectOrIOException[1];
-
-    Storage.Objects.Get getRequest = storageClient.objects()
-        .get(path.getBucket(), path.getObject());
-    getRequest.queue(batch, new JsonBatchCallback<StorageObject>() {
-      @Override
-      public void onSuccess(StorageObject response, HttpHeaders httpHeaders) 
throws IOException {
-        ret[0] = StorageObjectOrIOException.create(response);
-      }
-
-      @Override
-      public void onFailure(GoogleJsonError e, HttpHeaders httpHeaders) throws 
IOException {
-        IOException ioException;
-        if (errorExtractor.itemNotFound(e)) {
-          ioException = new FileNotFoundException(path.toString());
-        } else {
-          ioException = new IOException(String.format("Error trying to get %s: 
%s", path, e));
-        }
-        ret[0] = StorageObjectOrIOException.create(ioException);
-      }
-    });
-    return ret;
-  }
-
-  /**
-   * A class that holds either a {@link StorageObject} or an {@link 
IOException}.
-   */
-  @AutoValue
-  public abstract static class StorageObjectOrIOException {
-
-    /**
-     * Returns the {@link StorageObject}.
-     */
-    @Nullable
-    public abstract StorageObject storageObject();
-
-    /**
-     * Returns the {@link IOException}.
-     */
-    @Nullable
-    public abstract IOException ioException();
-
-    @VisibleForTesting
-    public static StorageObjectOrIOException create(StorageObject 
storageObject) {
-      return new AutoValue_GcsUtil_StorageObjectOrIOException(
-          checkNotNull(storageObject, "storageObject"),
-          null /* ioException */);
-    }
-
-    @VisibleForTesting
-    public static StorageObjectOrIOException create(IOException ioException) {
-      return new AutoValue_GcsUtil_StorageObjectOrIOException(
-          null /* storageObject */,
-          checkNotNull(ioException, "ioException"));
-    }
-  }
-
-  private void enqueueCopy(final GcsPath from, final GcsPath to, BatchRequest 
batch)
-      throws IOException {
-    Storage.Objects.Copy copyRequest = storageClient.objects()
-        .copy(from.getBucket(), from.getObject(), to.getBucket(), 
to.getObject(), null);
-    copyRequest.queue(batch, new JsonBatchCallback<StorageObject>() {
-      @Override
-      public void onSuccess(StorageObject obj, HttpHeaders responseHeaders) {
-        LOG.debug("Successfully copied {} to {}", from, to);
-      }
-
-      @Override
-      public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) 
throws IOException {
-        throw new IOException(
-            String.format("Error trying to copy %s to %s: %s", from, to, e));
-      }
-    });
-  }
-
-  private void enqueueDelete(final GcsPath file, BatchRequest batch) throws 
IOException {
-    Storage.Objects.Delete deleteRequest = storageClient.objects()
-        .delete(file.getBucket(), file.getObject());
-    deleteRequest.queue(batch, new JsonBatchCallback<Void>() {
-      @Override
-      public void onSuccess(Void obj, HttpHeaders responseHeaders) {
-        LOG.debug("Successfully deleted {}", file);
-      }
-
-      @Override
-      public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) 
throws IOException {
-        throw new IOException(String.format("Error trying to delete %s: %s", 
file, e));
-      }
-    });
-  }
-
-  private BatchRequest createBatchRequest() {
-    return storageClient.batch(httpRequestInitializer);
-  }
-
-  private static int doubleSlashes(StringBuilder dst, char[] src, int i) {
-    // Emit the next character without special interpretation
-    dst.append('\\');
-    if ((i - 1) != src.length) {
-      dst.append(src[i]);
-      i++;
-    } else {
-      // A backslash at the very end is treated like an escaped backslash
-      dst.append('\\');
-    }
-    return i;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java
deleted file mode 100644
index 6fac6dc..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.api.client.util.BackOff;
-
-
-/**
- * Implementation of {@link BackOff} that increases the back off period for 
each retry attempt
- * using a randomization function that grows exponentially.
- *
- * <p>Example: The initial interval is .5 seconds and the maximum interval is 
60 secs.
- * For 14 tries the sequence will be (values in seconds):
- *
- * <pre>
- * retry#      retry_interval     randomized_interval
- * 1             0.5                [0.25,   0.75]
- * 2             0.75               [0.375,  1.125]
- * 3             1.125              [0.562,  1.687]
- * 4             1.687              [0.8435, 2.53]
- * 5             2.53               [1.265,  3.795]
- * 6             3.795              [1.897,  5.692]
- * 7             5.692              [2.846,  8.538]
- * 8             8.538              [4.269, 12.807]
- * 9            12.807              [6.403, 19.210]
- * 10           28.832              [14.416, 43.248]
- * 11           43.248              [21.624, 64.873]
- * 12           60.0                [30.0, 90.0]
- * 13           60.0                [30.0, 90.0]
- * 14           60.0                [30.0, 90.0]
- * </pre>
- *
- * <p>Implementation is not thread-safe.
- */
-@Deprecated
-public class IntervalBoundedExponentialBackOff implements BackOff {
-  public static final double DEFAULT_MULTIPLIER = 1.5;
-  public static final double DEFAULT_RANDOMIZATION_FACTOR = 0.5;
-  private final long maximumIntervalMillis;
-  private final long initialIntervalMillis;
-  private int currentAttempt;
-
-  public IntervalBoundedExponentialBackOff(long maximumIntervalMillis, long 
initialIntervalMillis) {
-    checkArgument(maximumIntervalMillis > 0, "Maximum interval must be greater 
than zero.");
-    checkArgument(initialIntervalMillis > 0, "Initial interval must be greater 
than zero.");
-    this.maximumIntervalMillis = maximumIntervalMillis;
-    this.initialIntervalMillis = initialIntervalMillis;
-    reset();
-  }
-
-  @Override
-  public void reset() {
-    currentAttempt = 1;
-  }
-
-  @Override
-  public long nextBackOffMillis() {
-    double currentIntervalMillis =
-        Math.min(
-            initialIntervalMillis * Math.pow(DEFAULT_MULTIPLIER, 
currentAttempt - 1),
-            maximumIntervalMillis);
-    double randomOffset =
-        (Math.random() * 2 - 1) * DEFAULT_RANDOMIZATION_FACTOR * 
currentIntervalMillis;
-    currentAttempt += 1;
-    return Math.round(currentIntervalMillis + randomOffset);
-  }
-
-  public boolean atMaxInterval() {
-    return initialIntervalMillis * Math.pow(DEFAULT_MULTIPLIER, currentAttempt 
- 1)
-        >= maximumIntervalMillis;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopCredentialFactory.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopCredentialFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopCredentialFactory.java
deleted file mode 100644
index f703e4c..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopCredentialFactory.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.auth.Credentials;
-import java.io.IOException;
-import java.net.URI;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-/**
- * Construct an oauth credential to be used by the SDK and the SDK workers.
- * Always returns a null Credential object.
- */
-public class NoopCredentialFactory implements CredentialFactory {
-  private static final NoopCredentialFactory INSTANCE = new 
NoopCredentialFactory();
-  private static final NoopCredentials NOOP_CREDENTIALS = new 
NoopCredentials();
-
-  public static NoopCredentialFactory fromOptions(PipelineOptions options) {
-    return INSTANCE;
-  }
-
-  @Override
-  public Credentials getCredential() throws IOException {
-    return NOOP_CREDENTIALS;
-  }
-
-  private static class NoopCredentials extends Credentials {
-    @Override
-    public String getAuthenticationType() {
-      return null;
-    }
-
-    @Override
-    public Map<String, List<String>> getRequestMetadata(URI uri) throws 
IOException {
-      return null;
-    }
-
-    @Override
-    public boolean hasRequestMetadata() {
-      return false;
-    }
-
-    @Override
-    public boolean hasRequestMetadataOnly() {
-      return false;
-    }
-
-    @Override
-    public void refresh() throws IOException {}
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NullCredentialInitializer.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NullCredentialInitializer.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NullCredentialInitializer.java
deleted file mode 100644
index 4ed35c6..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NullCredentialInitializer.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.beam.sdk.util;
-
-import com.google.api.client.http.HttpRequest;
-import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.client.http.HttpResponse;
-import com.google.api.client.http.HttpUnsuccessfulResponseHandler;
-import java.io.IOException;
-
-/**
- * A {@link HttpRequestInitializer} for requests that don't have credentials.
- *
- * <p>When the access is denied, it throws {@link IOException} with a detailed 
error message.
- */
-public class NullCredentialInitializer implements HttpRequestInitializer {
-  private static final int ACCESS_DENIED = 401;
-  private static final String NULL_CREDENTIAL_REASON =
-      "Unable to get application default credentials. Please see "
-      + 
"https://developers.google.com/accounts/docs/application-default-credentials "
-      + "for details on how to specify credentials. This version of the SDK is 
"
-      + "dependent on the gcloud core component version 2015.02.05 or newer to 
"
-      + "be able to get credentials from the currently authorized user via 
gcloud auth.";
-
-  @Override
-  public void initialize(HttpRequest httpRequest) throws IOException {
-    httpRequest.setUnsuccessfulResponseHandler(new 
NullCredentialHttpUnsuccessfulResponseHandler());
-  }
-
-  private static class NullCredentialHttpUnsuccessfulResponseHandler
-      implements HttpUnsuccessfulResponseHandler {
-
-    @Override
-    public boolean handleResponse(
-        HttpRequest httpRequest,
-        HttpResponse httpResponse, boolean supportsRetry) throws IOException {
-      if (!httpResponse.isSuccessStatusCode() && httpResponse.getStatusCode() 
== ACCESS_DENIED) {
-        throwNullCredentialException();
-      }
-      return supportsRetry;
-    }
-  }
-
-  public static void throwNullCredentialException() {
-    throw new RuntimeException(NULL_CREDENTIAL_REASON);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java
deleted file mode 100644
index 80c093b..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
-import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.client.http.HttpTransport;
-import com.google.api.client.json.JsonFactory;
-import com.google.api.client.json.jackson2.JacksonFactory;
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.cloudresourcemanager.CloudResourceManager;
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.storage.Storage;
-import com.google.auth.Credentials;
-import com.google.auth.http.HttpCredentialsAdapter;
-import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
-import com.google.common.collect.ImmutableList;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.security.GeneralSecurityException;
-import org.apache.beam.sdk.options.BigQueryOptions;
-import org.apache.beam.sdk.options.CloudResourceManagerOptions;
-import org.apache.beam.sdk.options.GcsOptions;
-import org.apache.beam.sdk.options.PubsubOptions;
-
-/**
- * Helpers for cloud communication.
- */
-public class Transport {
-
-  private static class SingletonHelper {
-    /** Global instance of the JSON factory. */
-    private static final JsonFactory JSON_FACTORY;
-
-    /** Global instance of the HTTP transport. */
-    private static final HttpTransport HTTP_TRANSPORT;
-
-    static {
-      try {
-        JSON_FACTORY = JacksonFactory.getDefaultInstance();
-        HTTP_TRANSPORT = GoogleNetHttpTransport.newTrustedTransport();
-      } catch (GeneralSecurityException | IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
-  public static HttpTransport getTransport() {
-    return SingletonHelper.HTTP_TRANSPORT;
-  }
-
-  public static JsonFactory getJsonFactory() {
-    return SingletonHelper.JSON_FACTORY;
-  }
-
-  private static class ApiComponents {
-    public String rootUrl;
-    public String servicePath;
-
-    public ApiComponents(String root, String path) {
-      this.rootUrl = root;
-      this.servicePath = path;
-    }
-  }
-
-  private static ApiComponents apiComponentsFromUrl(String urlString) {
-    try {
-      URL url = new URL(urlString);
-      String rootUrl = url.getProtocol() + "://" + url.getHost()
-          + (url.getPort() > 0 ? ":" + url.getPort() : "");
-      return new ApiComponents(rootUrl, url.getPath());
-    } catch (MalformedURLException e) {
-      throw new RuntimeException("Invalid URL: " + urlString);
-    }
-  }
-
-  /**
-   * Returns a BigQuery client builder using the specified {@link 
BigQueryOptions}.
-   */
-  public static Bigquery.Builder
-      newBigQueryClient(BigQueryOptions options) {
-    return new Bigquery.Builder(getTransport(), getJsonFactory(),
-        chainHttpRequestInitializer(
-            options.getGcpCredential(),
-            // Do not log 404. It clutters the output and is possibly even 
required by the caller.
-            new RetryHttpRequestInitializer(ImmutableList.of(404))))
-        .setApplicationName(options.getAppName())
-        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
-  }
-
-  /**
-   * Returns a Pubsub client builder using the specified {@link PubsubOptions}.
-   *
-   * @deprecated Use an appropriate 
org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory
-   */
-  @Deprecated
-  public static Pubsub.Builder
-      newPubsubClient(PubsubOptions options) {
-    return new Pubsub.Builder(getTransport(), getJsonFactory(),
-        chainHttpRequestInitializer(
-            options.getGcpCredential(),
-            // Do not log 404. It clutters the output and is possibly even 
required by the caller.
-            new RetryHttpRequestInitializer(ImmutableList.of(404))))
-        .setRootUrl(options.getPubsubRootUrl())
-        .setApplicationName(options.getAppName())
-        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
-  }
-
-  /**
-   * Returns a CloudResourceManager client builder using the specified
-   * {@link CloudResourceManagerOptions}.
-   */
-  public static CloudResourceManager.Builder
-      newCloudResourceManagerClient(CloudResourceManagerOptions options) {
-    Credentials credentials = options.getGcpCredential();
-    if (credentials == null) {
-      NullCredentialInitializer.throwNullCredentialException();
-    }
-    return new CloudResourceManager.Builder(getTransport(), getJsonFactory(),
-        chainHttpRequestInitializer(
-            credentials,
-            // Do not log 404. It clutters the output and is possibly even 
required by the caller.
-            new RetryHttpRequestInitializer(ImmutableList.of(404))))
-        .setApplicationName(options.getAppName())
-        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
-  }
-
-  /**
-   * Returns a Cloud Storage client builder using the specified {@link 
GcsOptions}.
-   */
-  public static Storage.Builder
-      newStorageClient(GcsOptions options) {
-    String servicePath = options.getGcsEndpoint();
-    Storage.Builder storageBuilder = new Storage.Builder(getTransport(), 
getJsonFactory(),
-        chainHttpRequestInitializer(
-            options.getGcpCredential(),
-            // Do not log the code 404. Code up the stack will deal with 404's 
if needed, and
-            // logging it by default clutters the output during file staging.
-            new RetryHttpRequestInitializer(
-                ImmutableList.of(404), new UploadIdResponseInterceptor())))
-        .setApplicationName(options.getAppName())
-        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
-    if (servicePath != null) {
-      ApiComponents components = apiComponentsFromUrl(servicePath);
-      storageBuilder.setRootUrl(components.rootUrl);
-      storageBuilder.setServicePath(components.servicePath);
-    }
-    return storageBuilder;
-  }
-
-  private static HttpRequestInitializer chainHttpRequestInitializer(
-      Credentials credential, HttpRequestInitializer httpRequestInitializer) {
-    if (credential == null) {
-      return new ChainingHttpRequestInitializer(
-          new NullCredentialInitializer(), httpRequestInitializer);
-    } else {
-      return new ChainingHttpRequestInitializer(
-          new HttpCredentialsAdapter(credential),
-          httpRequestInitializer);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
index c4b3a9f..66c1393 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
@@ -40,8 +40,6 @@ public class SdkCoreApiSurfaceTest {
             "org.apache.beam",
             "com.google.api.client",
             "com.google.api.services.bigquery",
-            "com.google.api.services.cloudresourcemanager",
-            "com.google.api.services.pubsub",
             "com.google.api.services.storage",
             "com.google.auth",
             "com.google.protobuf",

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 3b6992a..a8655a6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -48,7 +48,6 @@ import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -56,14 +55,11 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
-import java.nio.channels.FileChannel;
-import java.nio.channels.SeekableByteChannel;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.SimpleFileVisitor;
-import java.nio.file.StandardOpenOption;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -72,17 +68,13 @@ import java.util.Set;
 import java.util.zip.GZIPOutputStream;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
-
 import javax.annotation.Nullable;
-
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
 import org.apache.beam.sdk.io.TextIO.CompressionType;
 import org.apache.beam.sdk.io.TextIO.TextSource;
-import org.apache.beam.sdk.options.GcsOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider;
@@ -96,9 +88,7 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.GcsUtil;
 import org.apache.beam.sdk.util.IOChannelUtils;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.PCollection;
 import 
org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
 import 
org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream;
@@ -111,9 +101,6 @@ import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 /**
  * Tests for TextIO Read and Write transforms.
@@ -416,7 +403,7 @@ public class TextIOTest {
   }
 
   private static Predicate<List<String>> haveProperHeaderAndFooter(final 
String header,
-                                                                   final 
String footer) {
+      final String footer) {
     return new Predicate<List<String>>() {
       @Override
       public boolean apply(List<String> fileLines) {
@@ -577,21 +564,6 @@ public class TextIOTest {
     input.apply(TextIO.Write.to(filename));
   }
 
-  /**
-   * Recursive wildcards are not supported.
-   * This tests "**".
-   */
-  @Test
-  public void testBadWildcardRecursive() throws Exception {
-    p.enableAbandonedNodeEnforcement(false);
-
-    // Check that applying does fail.
-    expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage("wildcard");
-
-    p.apply(TextIO.Read.from("gs://bucket/foo**/baz"));
-  }
-
   /** Options for testing. */
   public interface RuntimeTestOptions extends PipelineOptions {
     ValueProvider<String> getInput();
@@ -1186,70 +1158,5 @@ public class TextIOTest {
     assertThat(splits, hasSize(equalTo(1)));
     SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
   }
-
-  
///////////////////////////////////////////////////////////////////////////////////////////////
-  // Test "gs://" paths
-
-  private GcsUtil buildMockGcsUtil() throws IOException {
-    GcsUtil mockGcsUtil = Mockito.mock(GcsUtil.class);
-
-    // Any request to open gets a new bogus channel
-    Mockito
-        .when(mockGcsUtil.open(Mockito.any(GcsPath.class)))
-        .then(new Answer<SeekableByteChannel>() {
-          @Override
-          public SeekableByteChannel answer(InvocationOnMock invocation) 
throws Throwable {
-            return FileChannel.open(
-                Files.createTempFile("channel-", ".tmp"),
-                StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE);
-          }
-        });
-
-    // Any request for expansion returns a list containing the original GcsPath
-    // This is required to pass validation that occurs in TextIO during apply()
-    Mockito
-        .when(mockGcsUtil.expand(Mockito.any(GcsPath.class)))
-        .then(new Answer<List<GcsPath>>() {
-          @Override
-          public List<GcsPath> answer(InvocationOnMock invocation) throws 
Throwable {
-            return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
-          }
-        });
-
-    return mockGcsUtil;
-  }
-
-  /**
-   * This tests a few corner cases that should not crash.
-   */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testGoodWildcards() throws Exception {
-    GcsOptions options = 
TestPipeline.testingPipelineOptions().as(GcsOptions.class);
-    options.setGcsUtil(buildMockGcsUtil());
-
-    Pipeline pipeline = Pipeline.create(options);
-
-    applyRead(pipeline, "gs://bucket/foo");
-    applyRead(pipeline, "gs://bucket/foo/");
-    applyRead(pipeline, "gs://bucket/foo/*");
-    applyRead(pipeline, "gs://bucket/foo/?");
-    applyRead(pipeline, "gs://bucket/foo/[0-9]");
-    applyRead(pipeline, "gs://bucket/foo/*baz*");
-    applyRead(pipeline, "gs://bucket/foo/*baz?");
-    applyRead(pipeline, "gs://bucket/foo/[0-9]baz?");
-    applyRead(pipeline, "gs://bucket/foo/baz/*");
-    applyRead(pipeline, "gs://bucket/foo/baz/*wonka*");
-    applyRead(pipeline, "gs://bucket/foo/*baz/wonka*");
-    applyRead(pipeline, "gs://bucket/foo*/baz");
-    applyRead(pipeline, "gs://bucket/foo?/baz");
-    applyRead(pipeline, "gs://bucket/foo[0-9]/baz");
-
-    // Check that running doesn't fail.
-    pipeline.run();
-  }
-
-  private void applyRead(Pipeline pipeline, String path) {
-    pipeline.apply("Read(" + path + ")", TextIO.Read.from(path));
-  }
 }
+

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
deleted file mode 100644
index 288383e..0000000
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.options;
-
-import static org.hamcrest.Matchers.containsString;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.io.Files;
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-import org.apache.beam.sdk.options.GcpOptions.DefaultProjectFactory;
-import org.apache.beam.sdk.testing.RestoreSystemProperties;
-import org.apache.beam.sdk.util.NoopPathValidator;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.rules.TemporaryFolder;
-import org.junit.rules.TestRule;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link GcpOptions}. */
-@RunWith(JUnit4.class)
-public class GcpOptionsTest {
-  @Rule public TestRule restoreSystemProperties = new 
RestoreSystemProperties();
-  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
-  @Rule public ExpectedException thrown = ExpectedException.none();
-
-  @Test
-  public void testGetProjectFromCloudSdkConfigEnv() throws Exception {
-    Map<String, String> environment =
-        ImmutableMap.of("CLOUDSDK_CONFIG", 
tmpFolder.getRoot().getAbsolutePath());
-    assertEquals("test-project",
-        runGetProjectTest(tmpFolder.newFile("properties"), environment));
-  }
-
-  @Test
-  public void testGetProjectFromAppDataEnv() throws Exception {
-    Map<String, String> environment =
-        ImmutableMap.of("APPDATA", tmpFolder.getRoot().getAbsolutePath());
-    System.setProperty("os.name", "windows");
-    assertEquals("test-project",
-        runGetProjectTest(new File(tmpFolder.newFolder("gcloud"), 
"properties"),
-            environment));
-  }
-
-  @Test
-  public void testGetProjectFromUserHomeEnvOld() throws Exception {
-    Map<String, String> environment = ImmutableMap.of();
-    System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath());
-    assertEquals("test-project",
-        runGetProjectTest(
-            new File(tmpFolder.newFolder(".config", "gcloud"), "properties"),
-            environment));
-  }
-
-  @Test
-  public void testGetProjectFromUserHomeEnv() throws Exception {
-    Map<String, String> environment = ImmutableMap.of();
-    System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath());
-    assertEquals("test-project",
-        runGetProjectTest(
-            new File(tmpFolder.newFolder(".config", "gcloud", 
"configurations"), "config_default"),
-            environment));
-  }
-
-  @Test
-  public void testGetProjectFromUserHomeOldAndNewPrefersNew() throws Exception 
{
-    Map<String, String> environment = ImmutableMap.of();
-    System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath());
-    makePropertiesFileWithProject(new File(tmpFolder.newFolder(".config", 
"gcloud"), "properties"),
-        "old-project");
-    assertEquals("test-project",
-        runGetProjectTest(
-            new File(tmpFolder.newFolder(".config", "gcloud", 
"configurations"), "config_default"),
-            environment));
-  }
-
-  @Test
-  public void testUnableToGetDefaultProject() throws Exception {
-    System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath());
-    DefaultProjectFactory projectFactory = spy(new DefaultProjectFactory());
-    when(projectFactory.getEnvironment()).thenReturn(ImmutableMap.<String, 
String>of());
-    assertNull(projectFactory.create(PipelineOptionsFactory.create()));
-  }
-
-  @Test
-  public void testEmptyGcpTempLocation() throws Exception {
-    GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
-    options.setProject("");
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("--project is a required option");
-    options.getGcpTempLocation();
-  }
-
-  @Test
-  public void testDefaultGcpTempLocation() throws Exception {
-    GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
-    String tempLocation = "gs://bucket";
-    options.setTempLocation(tempLocation);
-    
options.as(GcsOptions.class).setPathValidatorClass(NoopPathValidator.class);
-    assertEquals(tempLocation, options.getGcpTempLocation());
-  }
-
-  @Test
-  public void testDefaultGcpTempLocationInvalid() throws Exception {
-    GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
-    options.setTempLocation("file://");
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(
-        "Error constructing default value for gcpTempLocation: tempLocation is 
not"
-            + " a valid GCS path");
-    options.getGcpTempLocation();
-  }
-
-  @Test
-  public void testDefaultGcpTempLocationDoesNotExist() {
-    GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
-    String tempLocation = "gs://does/not/exist";
-    options.setTempLocation(tempLocation);
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(
-        "Error constructing default value for gcpTempLocation: tempLocation is 
not"
-            + " a valid GCS path");
-    thrown.expectCause(
-        hasMessage(containsString("Output path does not exist or is not 
writeable")));
-
-    options.getGcpTempLocation();
-  }
-
-  private static void makePropertiesFileWithProject(File path, String 
projectId)
-      throws IOException {
-    String properties = String.format("[core]%n"
-        + "account = [email protected]%n"
-        + "project = %s%n"
-        + "%n"
-        + "[dataflow]%n"
-        + "magic = true%n", projectId);
-    Files.write(properties, path, StandardCharsets.UTF_8);
-  }
-
-  private static String runGetProjectTest(File path, Map<String, String> 
environment)
-      throws Exception {
-    makePropertiesFileWithProject(path, "test-project");
-    DefaultProjectFactory projectFactory = spy(new DefaultProjectFactory());
-    when(projectFactory.getEnvironment()).thenReturn(environment);
-    return projectFactory.create(PipelineOptionsFactory.create());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java
deleted file mode 100644
index dae7208..0000000
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.options;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.api.services.bigquery.Bigquery.Datasets.Delete;
-import com.google.api.services.storage.Storage;
-import org.apache.beam.sdk.options.GoogleApiDebugOptions.GoogleApiTracer;
-import org.apache.beam.sdk.util.TestCredential;
-import org.apache.beam.sdk.util.Transport;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link GoogleApiDebugOptions}. */
-@RunWith(JUnit4.class)
-public class GoogleApiDebugOptionsTest {
-  private static final String STORAGE_GET_TRACE =
-      "--googleApiTrace={\"Objects.Get\":\"GetTraceDestination\"}";
-  private static final String STORAGE_GET_AND_LIST_TRACE =
-      "--googleApiTrace={\"Objects.Get\":\"GetTraceDestination\","
-      + "\"Objects.List\":\"ListTraceDestination\"}";
-  private static final String STORAGE_TRACE = 
"--googleApiTrace={\"Storage\":\"TraceDestination\"}";
-
-  @Test
-  public void testWhenTracingMatches() throws Exception {
-    String[] args = new String[] {STORAGE_GET_TRACE};
-    GcsOptions options = 
PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class);
-    options.setGcpCredential(new TestCredential());
-    assertNotNull(options.getGoogleApiTrace());
-
-    Storage.Objects.Get request =
-        
Transport.newStorageClient(options).build().objects().get("testBucketId", 
"testObjectId");
-    assertEquals("GetTraceDestination", request.get("$trace"));
-  }
-
-  @Test
-  public void testWhenTracingDoesNotMatch() throws Exception {
-    String[] args = new String[] {STORAGE_GET_TRACE};
-    GcsOptions options = 
PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class);
-    options.setGcpCredential(new TestCredential());
-
-    assertNotNull(options.getGoogleApiTrace());
-
-    Storage.Objects.List request =
-        
Transport.newStorageClient(options).build().objects().list("testProjectId");
-    assertNull(request.get("$trace"));
-  }
-
-  @Test
-  public void testWithMultipleTraces() throws Exception {
-    String[] args = new String[] {STORAGE_GET_AND_LIST_TRACE};
-    GcsOptions options = 
PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class);
-    options.setGcpCredential(new TestCredential());
-
-    assertNotNull(options.getGoogleApiTrace());
-
-    Storage.Objects.Get getRequest =
-        
Transport.newStorageClient(options).build().objects().get("testBucketId", 
"testObjectId");
-    assertEquals("GetTraceDestination", getRequest.get("$trace"));
-
-    Storage.Objects.List listRequest =
-        
Transport.newStorageClient(options).build().objects().list("testProjectId");
-    assertEquals("ListTraceDestination", listRequest.get("$trace"));
-  }
-
-  @Test
-  public void testMatchingAllCalls() throws Exception {
-    String[] args = new String[] {STORAGE_TRACE};
-    GcsOptions options =
-        PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class);
-    options.setGcpCredential(new TestCredential());
-
-    assertNotNull(options.getGoogleApiTrace());
-
-    Storage.Objects.Get getRequest =
-        
Transport.newStorageClient(options).build().objects().get("testBucketId", 
"testObjectId");
-    assertEquals("TraceDestination", getRequest.get("$trace"));
-
-    Storage.Objects.List listRequest =
-        
Transport.newStorageClient(options).build().objects().list("testProjectId");
-    assertEquals("TraceDestination", listRequest.get("$trace"));
-  }
-
-  @Test
-  public void testMatchingAgainstClient() throws Exception {
-    GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class);
-    options.setGcpCredential(new TestCredential());
-    options.setGoogleApiTrace(new GoogleApiTracer().addTraceFor(
-        Transport.newStorageClient(options).build(), "TraceDestination"));
-
-    Storage.Objects.Get getRequest =
-        
Transport.newStorageClient(options).build().objects().get("testBucketId", 
"testObjectId");
-    assertEquals("TraceDestination", getRequest.get("$trace"));
-
-    Delete deleteRequest = 
Transport.newBigQueryClient(options.as(BigQueryOptions.class))
-        .build().datasets().delete("testProjectId", "testDatasetId");
-    assertNull(deleteRequest.get("$trace"));
-  }
-
-  @Test
-  public void testMatchingAgainstRequestType() throws Exception {
-    GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class);
-    options.setGcpCredential(new TestCredential());
-    options.setGoogleApiTrace(new GoogleApiTracer().addTraceFor(
-        Transport.newStorageClient(options).build().objects()
-            .get("aProjectId", "aObjectId"), "TraceDestination"));
-
-    Storage.Objects.Get getRequest =
-        
Transport.newStorageClient(options).build().objects().get("testBucketId", 
"testObjectId");
-    assertEquals("TraceDestination", getRequest.get("$trace"));
-
-    Storage.Objects.List listRequest =
-        
Transport.newStorageClient(options).build().objects().list("testProjectId");
-    assertNull(listRequest.get("$trace"));
-  }
-
-  @Test
-  public void testDeserializationAndSerializationOfGoogleApiTracer() throws 
Exception {
-    String serializedValue = "{\"Api\":\"Token\"}";
-    ObjectMapper objectMapper = new ObjectMapper();
-    assertEquals(serializedValue,
-        objectMapper.writeValueAsString(
-            objectMapper.readValue(serializedValue, GoogleApiTracer.class)));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
index 7d941bf..769a30a 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
@@ -691,9 +691,9 @@ public class PipelineOptionsFactoryTest {
   @Test
   public void 
testPropertyIsSetOnRegisteredPipelineOptionNotPartOfOriginalInterface() {
     PipelineOptions options = PipelineOptionsFactory
-        .fromArgs("--project=testProject")
+        .fromArgs("--streaming")
         .create();
-    assertEquals("testProject", options.as(GcpOptions.class).getProject());
+    assertTrue(options.as(StreamingOptions.class).isStreaming());
   }
 
   /** A test interface containing all the primitives. */

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
index 71ef311..76d8627 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
@@ -19,65 +19,23 @@ package org.apache.beam.sdk.runners;
 
 import static org.junit.Assert.assertTrue;
 
-import org.apache.beam.sdk.options.ApplicationNameOptions;
-import org.apache.beam.sdk.options.GcpOptions;
-import org.apache.beam.sdk.options.GcsOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.CrashingRunner;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.TestCredential;
-import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
 
 /**
- * Tests for PipelineRunner.
+ * Tests for {@link PipelineRunner}.
  */
 @RunWith(JUnit4.class)
 public class PipelineRunnerTest {
-
-  @Mock private GcsUtil mockGcsUtil;
-
-  @Before
-  public void setUp() {
-    MockitoAnnotations.initMocks(this);
-  }
-
-  @Test
-  public void testLongName() {
-    // Check we can create a pipeline runner using the full class name.
-    PipelineOptions options = PipelineOptionsFactory.create();
-    options.as(ApplicationNameOptions.class).setAppName("test");
-    options.as(GcpOptions.class).setProject("test");
-    options.as(GcsOptions.class).setGcsUtil(mockGcsUtil);
-    options.setRunner(CrashingRunner.class);
-    options.as(GcpOptions.class).setGcpCredential(new TestCredential());
-    PipelineRunner<?> runner = PipelineRunner.fromOptions(options);
-    assertTrue(runner instanceof CrashingRunner);
-  }
-
   @Test
-  public void testShortName() {
-    // Check we can create a pipeline runner using the short class name.
+  public void testInstantiation() {
     PipelineOptions options = PipelineOptionsFactory.create();
-    options.as(ApplicationNameOptions.class).setAppName("test");
-    options.as(GcpOptions.class).setProject("test");
-    options.as(GcsOptions.class).setGcsUtil(mockGcsUtil);
     options.setRunner(CrashingRunner.class);
-    options.as(GcpOptions.class).setGcpCredential(new TestCredential());
     PipelineRunner<?> runner = PipelineRunner.fromOptions(options);
     assertTrue(runner instanceof CrashingRunner);
   }
-
-  @Test
-  public void testAppNameDefault() {
-    ApplicationNameOptions options = 
PipelineOptionsFactory.as(ApplicationNameOptions.class);
-    Assert.assertEquals(PipelineRunnerTest.class.getSimpleName(),
-        options.getAppName());
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
deleted file mode 100644
index 3b35856..0000000
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.testing;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.model.QueryRequest;
-import com.google.api.services.bigquery.model.QueryResponse;
-import com.google.api.services.bigquery.model.TableCell;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.math.BigInteger;
-import org.apache.beam.sdk.PipelineResult;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link BigqueryMatcher}.
- */
-@RunWith(JUnit4.class)
-public class BigqueryMatcherTest {
-  private final String appName = "test-app";
-  private final String projectId = "test-project";
-  private final String query = "test-query";
-
-  @Rule public ExpectedException thrown = ExpectedException.none();
-  @Rule public FastNanoClockAndSleeper fastClock = new 
FastNanoClockAndSleeper();
-  @Mock private Bigquery mockBigqueryClient;
-  @Mock private Bigquery.Jobs mockJobs;
-  @Mock private Bigquery.Jobs.Query mockQuery;
-  @Mock private PipelineResult mockResult;
-
-  @Before
-  public void setUp() throws IOException {
-    MockitoAnnotations.initMocks(this);
-    when(mockBigqueryClient.jobs()).thenReturn(mockJobs);
-    when(mockJobs.query(anyString(), 
any(QueryRequest.class))).thenReturn(mockQuery);
-  }
-
-  @Test
-  public void testBigqueryMatcherThatSucceeds() throws Exception {
-    BigqueryMatcher matcher = spy(
-        new BigqueryMatcher(
-            appName, projectId, query, 
"9bb47f5c90d2a99cad526453dff5ed5ec74650dc"));
-    doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString());
-    when(mockQuery.execute()).thenReturn(createResponseContainingTestData());
-
-    assertThat(mockResult, matcher);
-    verify(matcher).newBigqueryClient(eq(appName));
-    verify(mockJobs).query(eq(projectId), eq(new 
QueryRequest().setQuery(query)));
-  }
-
-  @Test
-  public void testBigqueryMatcherFailsForChecksumMismatch() throws IOException 
{
-    BigqueryMatcher matcher = spy(
-        new BigqueryMatcher(appName, projectId, query, "incorrect-checksum"));
-    doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString());
-    when(mockQuery.execute()).thenReturn(createResponseContainingTestData());
-
-    thrown.expect(AssertionError.class);
-    thrown.expectMessage("Total number of rows are: 1");
-    thrown.expectMessage("abc");
-    try {
-      assertThat(mockResult, matcher);
-    } finally {
-      verify(matcher).newBigqueryClient(eq(appName));
-      verify(mockJobs).query(eq(projectId), eq(new 
QueryRequest().setQuery(query)));
-    }
-  }
-
-  @Test
-  public void testBigqueryMatcherFailsWhenQueryJobNotComplete() throws 
Exception {
-    BigqueryMatcher matcher = spy(
-        new BigqueryMatcher(appName, projectId, query, "some-checksum"));
-    doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString());
-    when(mockQuery.execute()).thenReturn(new 
QueryResponse().setJobComplete(false));
-
-    thrown.expect(AssertionError.class);
-    thrown.expectMessage("The query job hasn't completed.");
-    thrown.expectMessage("jobComplete=false");
-    try {
-      assertThat(mockResult, matcher);
-    } finally {
-      verify(matcher).newBigqueryClient(eq(appName));
-      verify(mockJobs).query(eq(projectId), eq(new 
QueryRequest().setQuery(query)));
-    }
-  }
-
-  @Test
-  public void testQueryWithRetriesWhenServiceFails() throws Exception {
-    BigqueryMatcher matcher = spy(
-        new BigqueryMatcher(appName, projectId, query, "some-checksum"));
-    when(mockQuery.execute()).thenThrow(new IOException());
-
-    thrown.expect(RuntimeException.class);
-    thrown.expectMessage("Unable to get BigQuery response after retrying");
-    try {
-      matcher.queryWithRetries(
-          mockBigqueryClient,
-          new QueryRequest(),
-          fastClock,
-          BigqueryMatcher.BACKOFF_FACTORY.backoff());
-    } finally {
-      verify(mockJobs, atLeast(BigqueryMatcher.MAX_QUERY_RETRIES))
-          .query(eq(projectId), eq(new QueryRequest()));
-    }
-  }
-
-  @Test
-  public void testQueryWithRetriesWhenQueryResponseNull() throws Exception {
-    BigqueryMatcher matcher = spy(
-        new BigqueryMatcher(appName, projectId, query, "some-checksum"));
-    when(mockQuery.execute()).thenReturn(null);
-
-    thrown.expect(RuntimeException.class);
-    thrown.expectMessage("Unable to get BigQuery response after retrying");
-    try {
-      matcher.queryWithRetries(
-          mockBigqueryClient,
-          new QueryRequest(),
-          fastClock,
-          BigqueryMatcher.BACKOFF_FACTORY.backoff());
-    } finally {
-      verify(mockJobs, atLeast(BigqueryMatcher.MAX_QUERY_RETRIES))
-          .query(eq(projectId), eq(new QueryRequest()));
-    }
-  }
-
-  private QueryResponse createResponseContainingTestData() {
-    TableCell field1 = new TableCell();
-    field1.setV("abc");
-    TableCell field2 = new TableCell();
-    field2.setV("2");
-    TableCell field3 = new TableCell();
-    field3.setV("testing BigQuery matcher.");
-    TableRow row = new TableRow();
-    row.setF(Lists.newArrayList(field1, field2, field3));
-
-    QueryResponse response = new QueryResponse();
-    response.setJobComplete(true);
-    response.setRows(Lists.newArrayList(row));
-    response.setTotalRows(BigInteger.ONE);
-    return response;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
index 2ddead7..04005c5 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
@@ -35,7 +35,6 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
-import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
@@ -90,12 +89,11 @@ public class TestPipelineTest implements Serializable {
       String stringOptions =
           mapper.writeValueAsString(
               new String[] {
-                "--runner=org.apache.beam.sdk.testing.CrashingRunner", 
"--project=testProject"
+                "--runner=org.apache.beam.sdk.testing.CrashingRunner"
               });
       System.getProperties().put("beamTestPipelineOptions", stringOptions);
-      GcpOptions options = 
TestPipeline.testingPipelineOptions().as(GcpOptions.class);
+      PipelineOptions options = TestPipeline.testingPipelineOptions();
       assertEquals(CrashingRunner.class, options.getRunner());
-      assertEquals(options.getProject(), "testProject");
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java
deleted file mode 100644
index 395e1f3..0000000
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.when;
-
-import com.google.api.services.storage.model.Bucket;
-import java.io.IOException;
-import org.apache.beam.sdk.options.CloudResourceManagerOptions;
-import org.apache.beam.sdk.options.GcpOptions;
-import org.apache.beam.sdk.options.GcsOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.MockitoAnnotations;
-import org.mockito.MockitoAnnotations.Mock;
-
-/** Tests for DefaultBucket. */
-@RunWith(JUnit4.class)
-public class DefaultBucketTest {
-  @Rule public ExpectedException thrown = ExpectedException.none();
-
-  private PipelineOptions options;
-  @Mock
-  private GcsUtil gcsUtil;
-  @Mock
-  private GcpProjectUtil gcpUtil;
-
-  @Before
-  public void setUp() {
-    MockitoAnnotations.initMocks(this);
-    options = PipelineOptionsFactory.create();
-    options.as(GcsOptions.class).setGcsUtil(gcsUtil);
-    options.as(CloudResourceManagerOptions.class).setGcpProjectUtil(gcpUtil);
-    options.as(GcpOptions.class).setProject("foo");
-    options.as(GcpOptions.class).setZone("us-north1-a");
-  }
-
-  @Test
-  public void testCreateBucket() {
-    String bucket = DefaultBucket.tryCreateDefaultBucket(options);
-    assertEquals("gs://dataflow-staging-us-north1-0", bucket);
-  }
-
-  @Test
-  public void testCreateBucketProjectLookupFails() throws IOException {
-    when(gcpUtil.getProjectNumber("foo")).thenThrow(new 
IOException("badness"));
-
-    thrown.expect(RuntimeException.class);
-    thrown.expectMessage("Unable to verify project");
-    DefaultBucket.tryCreateDefaultBucket(options);
-  }
-
-  @Test
-  public void testCreateBucketCreateBucketFails() throws IOException {
-    doThrow(new IOException("badness")).when(
-      gcsUtil).createBucket(any(String.class), any(Bucket.class));
-
-    thrown.expect(RuntimeException.class);
-    thrown.expectMessage("Unable create default bucket");
-    DefaultBucket.tryCreateDefaultBucket(options);
-  }
-
-  @Test
-  public void testCannotGetBucketOwner() throws IOException {
-    when(gcsUtil.bucketOwner(any(GcsPath.class)))
-      .thenThrow(new IOException("badness"));
-
-    thrown.expect(RuntimeException.class);
-    thrown.expectMessage("Unable to determine the owner");
-    DefaultBucket.tryCreateDefaultBucket(options);
-  }
-
-  @Test
-  public void testProjectMismatch() throws IOException {
-    when(gcsUtil.bucketOwner(any(GcsPath.class))).thenReturn(5L);
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Bucket owner does not match the project");
-    DefaultBucket.tryCreateDefaultBucket(options);
-  }
-
-  @Test
-  public void regionFromZone() throws IOException {
-    assertEquals("us-central1", 
DefaultBucket.getRegionFromZone("us-central1-a"));
-    assertEquals("asia-east", DefaultBucket.getRegionFromZone("asia-east-a"));
-  }
-}

Reply via email to