// Archived from: http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
// diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
// (deleted file mode 100644, index 1c853bb..0000000)
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.util;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.api.client.googleapis.batch.BatchRequest;
import com.google.api.client.googleapis.batch.json.JsonBatchCallback;
import com.google.api.client.googleapis.json.GoogleJsonError;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.HttpHeaders;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.model.Bucket;
import com.google.api.services.storage.model.Objects;
import com.google.api.services.storage.model.StorageObject;
import com.google.auto.value.AutoValue;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageWriteChannel;
import com.google.cloud.hadoop.gcsio.ObjectWriteConditions;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions;
import com.google.cloud.hadoop.util.ClientRequestHelper;
import com.google.cloud.hadoop.util.ResilientOperation;
import com.google.cloud.hadoop.util.RetryDeterminer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.channels.SeekableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.AccessDeniedException;
import java.nio.file.FileAlreadyExistsException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;

import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Provides operations on GCS: glob expansion, object and bucket metadata lookups, opening read
 * and write channels, and batched copy/delete requests.
 */
public class GcsUtil {
  /**
   * This is a {@link DefaultValueFactory} able to create a {@link GcsUtil} using
   * any transport flags specified on the {@link PipelineOptions}.
   */
  public static class GcsUtilFactory implements DefaultValueFactory<GcsUtil> {
    /**
     * Returns a new instance of {@link GcsUtil} configured from the {@link PipelineOptions}.
     *
     * <p>The options are viewed as {@link GcsOptions}; the storage client, its request
     * initializer, the executor service and the upload buffer size are all taken from there.
     * NOTE(review): caching of the created value in {@code options} is presumably done by the
     * options framework that invokes this factory, not by this method — confirm.
     */
    @Override
    public GcsUtil create(PipelineOptions options) {
      LOG.debug("Creating new GcsUtil");
      GcsOptions gcsOptions = options.as(GcsOptions.class);
      Storage.Builder storageBuilder = Transport.newStorageClient(gcsOptions);
      return new GcsUtil(
          storageBuilder.build(),
          storageBuilder.getHttpRequestInitializer(),
          gcsOptions.getExecutorService(),
          gcsOptions.getGcsUploadBufferSizeBytes());
    }

    /**
     * Returns an instance of {@link GcsUtil} based on the given parameters.
     */
    public static GcsUtil create(
        Storage storageClient,
        HttpRequestInitializer httpRequestInitializer,
        ExecutorService executorService,
        @Nullable Integer uploadBufferSizeBytes) {
      return new GcsUtil(
          storageClient, httpRequestInitializer, executorService, uploadBufferSizeBytes);
    }
  }

  private static final Logger LOG = LoggerFactory.getLogger(GcsUtil.class);

  /** Maximum number of items to retrieve per Objects.List request. */
  private static final long MAX_LIST_ITEMS_PER_CALL = 1024;

  /** Matches a glob containing a wildcard, capturing the portion before the first wildcard. */
  private static final Pattern GLOB_PREFIX = Pattern.compile("(?<PREFIX>[^\\[*?]*)[\\[*?].*");

  private static final String RECURSIVE_WILDCARD = "[*]{2}";

  /**
   * A {@link Pattern} for globs with a recursive wildcard.
   */
  private static final Pattern RECURSIVE_GCS_PATTERN =
      Pattern.compile(".*" + RECURSIVE_WILDCARD + ".*");

  /**
   * Maximum number of requests permitted in a GCS batch request.
   */
  private static final int MAX_REQUESTS_PER_BATCH = 100;
  /**
   * Maximum number of concurrent batches of requests executing on GCS.
   */
  private static final int MAX_CONCURRENT_BATCHES = 256;

  private static final FluentBackoff BACKOFF_FACTORY =
      FluentBackoff.DEFAULT.withMaxRetries(3).withInitialBackoff(Duration.millis(200));

  /////////////////////////////////////////////////////////////////////////////

  /** Client for the GCS API. Non-final only so that tests can swap it. */
  private Storage storageClient;
  private final HttpRequestInitializer httpRequestInitializer;
  /** Buffer size for GCS uploads (in bytes). */
  @Nullable private final Integer uploadBufferSizeBytes;

  // Helper delegate for turning IOExceptions from API calls into higher-level semantics.
  private final ApiErrorExtractor errorExtractor = new ApiErrorExtractor();

  // Exposed for testing.
  final ExecutorService executorService;

  /**
   * Returns true if the given GCS pattern is supported otherwise fails with an
   * exception.
   *
   * @throws IllegalArgumentException if the pattern contains a recursive wildcard ({@code **})
   */
  public static boolean isGcsPatternSupported(String gcsPattern) {
    if (RECURSIVE_GCS_PATTERN.matcher(gcsPattern).matches()) {
      throw new IllegalArgumentException("Unsupported wildcard usage in \"" + gcsPattern + "\": "
          + " recursive wildcards are not supported.");
    }
    return true;
  }

  /**
   * Returns the prefix portion of the glob that doesn't contain wildcards.
   *
   * @throws IllegalArgumentException if the glob is unsupported or contains no wildcard
   */
  public static String getGlobPrefix(String globExp) {
    checkArgument(isGcsPatternSupported(globExp));
    Matcher m = GLOB_PREFIX.matcher(globExp);
    // Template form defers message formatting to the failure path; the message is unchanged.
    checkArgument(m.matches(), "Glob expression: [%s] is not expandable.", globExp);
    return m.group("PREFIX");
  }

  /**
   * Expands glob expressions to regular expressions.
   *
   * <p>{@code *} and {@code ?} are translated to patterns that do not cross a {@code /};
   * regex metacharacters other than {@code [} and {@code ]} are escaped; a backslash causes the
   * following character to be emitted escaped.
   *
   * @param globExp the glob expression to expand
   * @return a string with the regular expression this glob expands to
   */
  public static String globToRegexp(String globExp) {
    StringBuilder dst = new StringBuilder();
    char[] src = globExp.toCharArray();
    int i = 0;
    while (i < src.length) {
      char c = src[i++];
      switch (c) {
        case '*':
          dst.append("[^/]*");
          break;
        case '?':
          dst.append("[^/]");
          break;
        case '.':
        case '+':
        case '{':
        case '}':
        case '(':
        case ')':
        case '|':
        case '^':
        case '$':
          // These need to be escaped in regular expressions
          dst.append('\\').append(c);
          break;
        case '\\':
          i = doubleSlashes(dst, src, i);
          break;
        default:
          dst.append(c);
          break;
      }
    }
    return dst.toString();
  }

  /**
   * Returns true if the object portion of the given {@code spec} contains a glob wildcard.
   */
  public static boolean isGlob(GcsPath spec) {
    return GLOB_PREFIX.matcher(spec.getObject()).matches();
  }

  private GcsUtil(
      Storage storageClient,
      HttpRequestInitializer httpRequestInitializer,
      ExecutorService executorService,
      @Nullable Integer uploadBufferSizeBytes) {
    this.storageClient = storageClient;
    this.httpRequestInitializer = httpRequestInitializer;
    this.uploadBufferSizeBytes = uploadBufferSizeBytes;
    this.executorService = executorService;
  }

  // Use this only for testing purposes.
  protected void setStorageClient(Storage storageClient) {
    this.storageClient = storageClient;
  }

  /**
   * Expands a pattern into matched paths. The pattern path may contain globs, which are expanded
   * in the result. For patterns that only match a single object, we ensure that the object
   * exists.
   *
   * @return the matching paths; empty if a non-glob path does not exist or nothing matches
   */
  public List<GcsPath> expand(GcsPath gcsPattern) throws IOException {
    checkArgument(isGcsPatternSupported(gcsPattern.getObject()));
    if (!isGlob(gcsPattern)) {
      // Not a glob.
      try {
        // Use a get request to fetch the metadata of the object, and ignore the return value.
        // The request has strong global consistency.
        getObject(gcsPattern);
        return ImmutableList.of(gcsPattern);
      } catch (FileNotFoundException e) {
        // If the path was not found, return an empty list.
        return ImmutableList.of();
      }
    }

    // Part before the first wildcard character.
    String prefix = getGlobPrefix(gcsPattern.getObject());
    Pattern p = Pattern.compile(globToRegexp(gcsPattern.getObject()));

    LOG.debug("matching files in bucket {}, prefix {} against pattern {}", gcsPattern.getBucket(),
        prefix, p.toString());

    String pageToken = null;
    List<GcsPath> results = new LinkedList<>();
    do {
      Objects objects = listObjects(gcsPattern.getBucket(), prefix, pageToken);
      if (objects.getItems() == null) {
        break;
      }

      // Filter objects based on the regex.
      for (StorageObject o : objects.getItems()) {
        String name = o.getName();
        // Skip directories, which end with a slash.
        if (p.matcher(name).matches() && !name.endsWith("/")) {
          LOG.debug("Matched object: {}", name);
          results.add(GcsPath.fromObject(o));
        }
      }
      pageToken = objects.getNextPageToken();
    } while (pageToken != null);

    return results;
  }

  @VisibleForTesting
  @Nullable
  Integer getUploadBufferSizeBytes() {
    return uploadBufferSizeBytes;
  }

  /**
   * Returns the file size from GCS or throws {@link FileNotFoundException}
   * if the resource does not exist.
   */
  public long fileSize(GcsPath path) throws IOException {
    return getObject(path).getSize().longValue();
  }

  /**
   * Returns the {@link StorageObject} for the given {@link GcsPath}.
   *
   * @throws FileNotFoundException if the object does not exist
   */
  public StorageObject getObject(GcsPath gcsPath) throws IOException {
    return getObject(gcsPath, BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
  }

  @VisibleForTesting
  StorageObject getObject(GcsPath gcsPath, BackOff backoff, Sleeper sleeper) throws IOException {
    Storage.Objects.Get getObject =
        storageClient.objects().get(gcsPath.getBucket(), gcsPath.getObject());
    try {
      return ResilientOperation.retry(
          ResilientOperation.getGoogleRequestCallable(getObject),
          backoff,
          RetryDeterminer.SOCKET_ERRORS,
          IOException.class,
          sleeper);
    } catch (InterruptedException e) {
      // Preserve the interrupt for callers further up the stack.
      Thread.currentThread().interrupt();
      throw new IOException(
          String.format("Unable to get the file object for path %s.", gcsPath),
          e);
    } catch (IOException e) {
      if (errorExtractor.itemNotFound(e)) {
        throw new FileNotFoundException(gcsPath.toString());
      }
      throw new IOException(
          String.format("Unable to get the file object for path %s.", gcsPath),
          e);
    }
  }

  /**
   * Returns {@link StorageObjectOrIOException StorageObjectOrIOExceptions} for the given
   * {@link GcsPath GcsPaths}, in the same order as {@code gcsPaths}.
   */
  public List<StorageObjectOrIOException> getObjects(List<GcsPath> gcsPaths)
      throws IOException {
    List<StorageObjectOrIOException[]> results = new ArrayList<>();
    executeBatches(makeGetBatches(gcsPaths, results));
    ImmutableList.Builder<StorageObjectOrIOException> ret = ImmutableList.builder();
    for (StorageObjectOrIOException[] result : results) {
      ret.add(result[0]);
    }
    return ret.build();
  }

  /**
   * Lists {@link Objects} given the {@code bucket}, {@code prefix}, {@code pageToken}.
   *
   * @throws IOException if the list request fails or the calling thread is interrupted
   */
  public Objects listObjects(String bucket, String prefix, @Nullable String pageToken)
      throws IOException {
    // List all objects that start with the prefix (including objects in sub-directories).
    Storage.Objects.List listObject = storageClient.objects().list(bucket);
    listObject.setMaxResults(MAX_LIST_ITEMS_PER_CALL);
    listObject.setPrefix(prefix);

    if (pageToken != null) {
      listObject.setPageToken(pageToken);
    }

    try {
      return ResilientOperation.retry(
          ResilientOperation.getGoogleRequestCallable(listObject),
          BACKOFF_FACTORY.backoff(),
          RetryDeterminer.SOCKET_ERRORS,
          IOException.class);
    } catch (InterruptedException e) {
      // Restore the interrupt flag, consistent with the other retrying calls in this class.
      Thread.currentThread().interrupt();
      throw new IOException(
          String.format("Unable to match files in bucket %s, prefix %s.", bucket, prefix),
          e);
    } catch (Exception e) {
      throw new IOException(
          String.format("Unable to match files in bucket %s, prefix %s.", bucket, prefix),
          e);
    }
  }

  /**
   * Returns the file sizes from GCS, in the order of {@code paths}, or throws the
   * {@link IOException} recorded for the first path whose lookup failed (a
   * {@link FileNotFoundException} if the resource does not exist).
   */
  @VisibleForTesting
  List<Long> fileSizes(List<GcsPath> paths) throws IOException {
    List<StorageObjectOrIOException> results = getObjects(paths);

    ImmutableList.Builder<Long> ret = ImmutableList.builder();
    for (StorageObjectOrIOException result : results) {
      ret.add(toFileSize(result));
    }
    return ret.build();
  }

  /** Unwraps a lookup result into its size, rethrowing the recorded exception if there is one. */
  private Long toFileSize(StorageObjectOrIOException storageObjectOrIOException)
      throws IOException {
    if (storageObjectOrIOException.ioException() != null) {
      throw storageObjectOrIOException.ioException();
    } else {
      return storageObjectOrIOException.storageObject().getSize().longValue();
    }
  }

  /**
   * Opens an object in GCS.
   *
   * <p>Returns a SeekableByteChannel that provides access to data in the bucket.
   *
   * @param path the GCS filename to read from
   * @return a SeekableByteChannel that can read the object data
   */
  public SeekableByteChannel open(GcsPath path)
      throws IOException {
    return new GoogleCloudStorageReadChannel(storageClient, path.getBucket(),
        path.getObject(), errorExtractor,
        new ClientRequestHelper<StorageObject>());
  }

  /**
   * Creates an object in GCS.
   *
   * <p>Returns a WritableByteChannel that can be used to write data to the
   * object.
   *
   * @param path the GCS file to write to
   * @param type the type of object, eg "text/plain".
   * @return an initialized WritableByteChannel that writes to the object
   */
  public WritableByteChannel create(GcsPath path,
      String type) throws IOException {
    GoogleCloudStorageWriteChannel channel = new GoogleCloudStorageWriteChannel(
        executorService,
        storageClient,
        new ClientRequestHelper<StorageObject>(),
        path.getBucket(),
        path.getObject(),
        AsyncWriteChannelOptions.newBuilder().build(),
        new ObjectWriteConditions(),
        Collections.<String, String>emptyMap(),
        type);
    if (uploadBufferSizeBytes != null) {
      channel.setUploadBufferSize(uploadBufferSizeBytes);
    }
    channel.initialize();
    return channel;
  }

  /**
   * Returns whether the GCS bucket exists and is accessible.
   */
  public boolean bucketAccessible(GcsPath path) throws IOException {
    return bucketAccessible(
        path,
        BACKOFF_FACTORY.backoff(),
        Sleeper.DEFAULT);
  }

  /**
   * Returns the project number of the project which owns this bucket.
   * If the bucket exists, it must be accessible otherwise the permissions
   * exception will be propagated. If the bucket does not exist, an exception
   * will be thrown.
   */
  public long bucketOwner(GcsPath path) throws IOException {
    return getBucket(
        path,
        BACKOFF_FACTORY.backoff(),
        Sleeper.DEFAULT).getProjectNumber().longValue();
  }

  /**
   * Creates a {@link Bucket} under the specified project in Cloud Storage or
   * propagates an exception.
   */
  public void createBucket(String projectId, Bucket bucket) throws IOException {
    createBucket(
        projectId, bucket, BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
  }

  /**
   * Returns whether the GCS bucket exists. This will return false if the bucket
   * is inaccessible due to permissions.
   */
  @VisibleForTesting
  boolean bucketAccessible(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {
    try {
      return getBucket(path, backoff, sleeper) != null;
    } catch (AccessDeniedException | FileNotFoundException e) {
      return false;
    }
  }

  /**
   * Fetches the {@link Bucket} metadata for the bucket of {@code path}, retrying socket errors.
   *
   * @throws AccessDeniedException if access to the bucket is denied
   * @throws FileNotFoundException if the bucket does not exist
   */
  @VisibleForTesting
  @Nullable
  Bucket getBucket(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {
    Storage.Buckets.Get getBucket =
        storageClient.buckets().get(path.getBucket());

    try {
      return ResilientOperation.retry(
          ResilientOperation.getGoogleRequestCallable(getBucket),
          backoff,
          new RetryDeterminer<IOException>() {
            @Override
            public boolean shouldRetry(IOException e) {
              // Missing or forbidden buckets will not change on retry.
              if (errorExtractor.itemNotFound(e) || errorExtractor.accessDenied(e)) {
                return false;
              }
              return RetryDeterminer.SOCKET_ERRORS.shouldRetry(e);
            }
          },
          IOException.class,
          sleeper);
    } catch (GoogleJsonResponseException e) {
      if (errorExtractor.accessDenied(e)) {
        throw new AccessDeniedException(path.toString(), null, e.getMessage());
      }
      if (errorExtractor.itemNotFound(e)) {
        throw new FileNotFoundException(e.getMessage());
      }
      throw e;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException(
          String.format("Error while attempting to verify existence of bucket gs://%s",
              path.getBucket()), e);
    }
  }

  /**
   * Inserts {@code bucket} into {@code projectId} with "projectPrivate" bucket and default
   * object ACLs, retrying socket errors.
   *
   * @throws AccessDeniedException if the caller may not create the bucket
   * @throws FileAlreadyExistsException if the bucket already exists
   */
  @VisibleForTesting
  void createBucket(String projectId, Bucket bucket, BackOff backoff, Sleeper sleeper)
      throws IOException {
    Storage.Buckets.Insert insertBucket =
        storageClient.buckets().insert(projectId, bucket);
    insertBucket.setPredefinedAcl("projectPrivate");
    insertBucket.setPredefinedDefaultObjectAcl("projectPrivate");

    try {
      ResilientOperation.retry(
          ResilientOperation.getGoogleRequestCallable(insertBucket),
          backoff,
          new RetryDeterminer<IOException>() {
            @Override
            public boolean shouldRetry(IOException e) {
              // An existing or forbidden bucket will not change on retry.
              if (errorExtractor.itemAlreadyExists(e) || errorExtractor.accessDenied(e)) {
                return false;
              }
              return RetryDeterminer.SOCKET_ERRORS.shouldRetry(e);
            }
          },
          IOException.class,
          sleeper);
    } catch (GoogleJsonResponseException e) {
      if (errorExtractor.accessDenied(e)) {
        throw new AccessDeniedException(bucket.getName(), null, e.getMessage());
      }
      if (errorExtractor.itemAlreadyExists(e)) {
        throw new FileAlreadyExistsException(bucket.getName(), null, e.getMessage());
      }
      throw e;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException(
          String.format("Error while attempting to create bucket gs://%s for project %s",
              bucket.getName(), projectId), e);
    }
  }

  /**
   * Executes all {@code batches} on a freshly created pool of {@link #MAX_CONCURRENT_BATCHES}
   * threads and waits for every batch to finish. The pool is shut down before returning.
   *
   * @throws FileNotFoundException if a batch failed with that exception
   * @throws IOException if a batch failed otherwise or the calling thread was interrupted
   */
  private static void executeBatches(List<BatchRequest> batches) throws IOException {
    ListeningExecutorService executor = MoreExecutors.listeningDecorator(
        MoreExecutors.getExitingExecutorService(
            new ThreadPoolExecutor(MAX_CONCURRENT_BATCHES, MAX_CONCURRENT_BATCHES,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>())));

    List<ListenableFuture<Void>> futures = new LinkedList<>();
    for (final BatchRequest batch : batches) {
      futures.add(executor.submit(new Callable<Void>() {
        @Override
        public Void call() throws IOException {
          batch.execute();
          return null;
        }
      }));
    }

    try {
      Futures.allAsList(futures).get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while executing batch GCS request", e);
    } catch (ExecutionException e) {
      if (e.getCause() instanceof FileNotFoundException) {
        throw (FileNotFoundException) e.getCause();
      }
      throw new IOException("Error executing batch GCS request", e);
    } finally {
      executor.shutdown();
    }
  }

  /**
   * Makes get {@link BatchRequest BatchRequests}.
   *
   * @param paths {@link GcsPath GcsPaths}.
   * @param results mutable {@link List} for return values; one single-element holder is appended
   *     per path, in path order, and is filled in when its batch is executed.
   * @return {@link BatchRequest BatchRequests} to execute.
   * @throws IOException if a request cannot be queued
   */
  @VisibleForTesting
  List<BatchRequest> makeGetBatches(
      Collection<GcsPath> paths,
      List<StorageObjectOrIOException[]> results) throws IOException {
    List<BatchRequest> batches = new LinkedList<>();
    for (List<GcsPath> filesToGet :
        Lists.partition(Lists.newArrayList(paths), MAX_REQUESTS_PER_BATCH)) {
      BatchRequest batch = createBatchRequest();
      for (GcsPath path : filesToGet) {
        results.add(enqueueGetFileSize(path, batch));
      }
      batches.add(batch);
    }
    return batches;
  }

  /**
   * Copies each file in {@code srcFilenames} to the file at the same position in
   * {@code destFilenames}, using batched requests.
   *
   * @throws IllegalArgumentException if the two sequences differ in length
   */
  public void copy(Iterable<String> srcFilenames,
      Iterable<String> destFilenames) throws
      IOException {
    executeBatches(makeCopyBatches(srcFilenames, destFilenames));
  }

  /** Builds the copy batches, each holding at most {@link #MAX_REQUESTS_PER_BATCH} requests. */
  List<BatchRequest> makeCopyBatches(Iterable<String> srcFilenames, Iterable<String> destFilenames)
      throws IOException {
    List<String> srcList = Lists.newArrayList(srcFilenames);
    List<String> destList = Lists.newArrayList(destFilenames);
    checkArgument(
        srcList.size() == destList.size(),
        "Number of source files %s must equal number of destination files %s",
        srcList.size(),
        destList.size());

    List<BatchRequest> batches = new LinkedList<>();
    BatchRequest batch = createBatchRequest();
    for (int i = 0; i < srcList.size(); i++) {
      final GcsPath sourcePath = GcsPath.fromUri(srcList.get(i));
      final GcsPath destPath = GcsPath.fromUri(destList.get(i));
      enqueueCopy(sourcePath, destPath, batch);
      if (batch.size() >= MAX_REQUESTS_PER_BATCH) {
        batches.add(batch);
        batch = createBatchRequest();
      }
    }
    // Flush the final, partially filled batch.
    if (batch.size() > 0) {
      batches.add(batch);
    }
    return batches;
  }

  /** Builds the delete batches, each holding at most {@link #MAX_REQUESTS_PER_BATCH} requests. */
  List<BatchRequest> makeRemoveBatches(Collection<String> filenames) throws IOException {
    List<BatchRequest> batches = new LinkedList<>();
    for (List<String> filesToDelete :
        Lists.partition(Lists.newArrayList(filenames), MAX_REQUESTS_PER_BATCH)) {
      BatchRequest batch = createBatchRequest();
      for (String file : filesToDelete) {
        enqueueDelete(GcsPath.fromUri(file), batch);
      }
      batches.add(batch);
    }
    return batches;
  }

  /** Deletes the given files using batched requests. */
  public void remove(Collection<String> filenames) throws IOException {
    executeBatches(makeRemoveBatches(filenames));
  }

  /**
   * Queues a metadata get for {@code path} on {@code batch}.
   *
   * @return a single-element holder that the batch callback fills with either the object or the
   *     failure once the batch has been executed
   */
  private StorageObjectOrIOException[] enqueueGetFileSize(final GcsPath path, BatchRequest batch)
      throws IOException {
    final StorageObjectOrIOException[] ret = new StorageObjectOrIOException[1];

    Storage.Objects.Get getRequest = storageClient.objects()
        .get(path.getBucket(), path.getObject());
    getRequest.queue(batch, new JsonBatchCallback<StorageObject>() {
      @Override
      public void onSuccess(StorageObject response, HttpHeaders httpHeaders) throws IOException {
        ret[0] = StorageObjectOrIOException.create(response);
      }

      @Override
      public void onFailure(GoogleJsonError e, HttpHeaders httpHeaders) throws IOException {
        IOException ioException;
        if (errorExtractor.itemNotFound(e)) {
          ioException = new FileNotFoundException(path.toString());
        } else {
          ioException = new IOException(String.format("Error trying to get %s: %s", path, e));
        }
        ret[0] = StorageObjectOrIOException.create(ioException);
      }
    });
    return ret;
  }

  /**
   * A class that holds either a {@link StorageObject} or an {@link IOException}.
   */
  @AutoValue
  public abstract static class StorageObjectOrIOException {

    /**
     * Returns the {@link StorageObject}, or null if this instance holds an exception.
     */
    @Nullable
    public abstract StorageObject storageObject();

    /**
     * Returns the {@link IOException}, or null if this instance holds an object.
     */
    @Nullable
    public abstract IOException ioException();

    @VisibleForTesting
    public static StorageObjectOrIOException create(StorageObject storageObject) {
      return new AutoValue_GcsUtil_StorageObjectOrIOException(
          checkNotNull(storageObject, "storageObject"),
          null /* ioException */);
    }

    @VisibleForTesting
    public static StorageObjectOrIOException create(IOException ioException) {
      return new AutoValue_GcsUtil_StorageObjectOrIOException(
          null /* storageObject */,
          checkNotNull(ioException, "ioException"));
    }
  }

  /** Queues a copy of {@code from} to {@code to} on {@code batch}; a failed copy throws. */
  private void enqueueCopy(final GcsPath from, final GcsPath to, BatchRequest batch)
      throws IOException {
    Storage.Objects.Copy copyRequest = storageClient.objects()
        .copy(from.getBucket(), from.getObject(), to.getBucket(), to.getObject(), null);
    copyRequest.queue(batch, new JsonBatchCallback<StorageObject>() {
      @Override
      public void onSuccess(StorageObject obj, HttpHeaders responseHeaders) {
        LOG.debug("Successfully copied {} to {}", from, to);
      }

      @Override
      public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
        throw new IOException(
            String.format("Error trying to copy %s to %s: %s", from, to, e));
      }
    });
  }

  /** Queues a delete of {@code file} on {@code batch}; a failed delete throws. */
  private void enqueueDelete(final GcsPath file, BatchRequest batch) throws IOException {
    Storage.Objects.Delete deleteRequest = storageClient.objects()
        .delete(file.getBucket(), file.getObject());
    deleteRequest.queue(batch, new JsonBatchCallback<Void>() {
      @Override
      public void onSuccess(Void obj, HttpHeaders responseHeaders) {
        LOG.debug("Successfully deleted {}", file);
      }

      @Override
      public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
        throw new IOException(String.format("Error trying to delete %s: %s", file, e));
      }
    });
  }

  private BatchRequest createBatchRequest() {
    return storageClient.batch(httpRequestInitializer);
  }

  /**
   * Handles a backslash found in a glob by emitting it, followed by the character it escapes.
   *
   * @param dst the regular expression being built
   * @param src the glob characters
   * @param i index of the character following the backslash (may equal {@code src.length})
   * @return the index of the next unread character in {@code src}
   */
  private static int doubleSlashes(StringBuilder dst, char[] src, int i) {
    // Emit the next character without special interpretation
    dst.append('\\');
    // The previous check, (i - 1) != src.length, was always true here, so a glob ending in a
    // backslash read past the end of src instead of reaching the branch below.
    if (i < src.length) {
      dst.append(src[i]);
      i++;
    } else {
      // A backslash at the very end is treated like an escaped backslash
      dst.append('\\');
    }
    return i;
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java deleted file mode 100644 index 6fac6dc..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.api.client.util.BackOff; - - -/** - * Implementation of {@link BackOff} that increases the back off period for each retry attempt - * using a randomization function that grows exponentially. - * - * <p>Example: The initial interval is .5 seconds and the maximum interval is 60 secs. 
- * For 14 tries the sequence will be (values in seconds): - * - * <pre> - * retry# retry_interval randomized_interval - * 1 0.5 [0.25, 0.75] - * 2 0.75 [0.375, 1.125] - * 3 1.125 [0.562, 1.687] - * 4 1.687 [0.8435, 2.53] - * 5 2.53 [1.265, 3.795] - * 6 3.795 [1.897, 5.692] - * 7 5.692 [2.846, 8.538] - * 8 8.538 [4.269, 12.807] - * 9 12.807 [6.403, 19.210] - * 10 28.832 [14.416, 43.248] - * 11 43.248 [21.624, 64.873] - * 12 60.0 [30.0, 90.0] - * 13 60.0 [30.0, 90.0] - * 14 60.0 [30.0, 90.0] - * </pre> - * - * <p>Implementation is not thread-safe. - */ -@Deprecated -public class IntervalBoundedExponentialBackOff implements BackOff { - public static final double DEFAULT_MULTIPLIER = 1.5; - public static final double DEFAULT_RANDOMIZATION_FACTOR = 0.5; - private final long maximumIntervalMillis; - private final long initialIntervalMillis; - private int currentAttempt; - - public IntervalBoundedExponentialBackOff(long maximumIntervalMillis, long initialIntervalMillis) { - checkArgument(maximumIntervalMillis > 0, "Maximum interval must be greater than zero."); - checkArgument(initialIntervalMillis > 0, "Initial interval must be greater than zero."); - this.maximumIntervalMillis = maximumIntervalMillis; - this.initialIntervalMillis = initialIntervalMillis; - reset(); - } - - @Override - public void reset() { - currentAttempt = 1; - } - - @Override - public long nextBackOffMillis() { - double currentIntervalMillis = - Math.min( - initialIntervalMillis * Math.pow(DEFAULT_MULTIPLIER, currentAttempt - 1), - maximumIntervalMillis); - double randomOffset = - (Math.random() * 2 - 1) * DEFAULT_RANDOMIZATION_FACTOR * currentIntervalMillis; - currentAttempt += 1; - return Math.round(currentIntervalMillis + randomOffset); - } - - public boolean atMaxInterval() { - return initialIntervalMillis * Math.pow(DEFAULT_MULTIPLIER, currentAttempt - 1) - >= maximumIntervalMillis; - } -} 
// Archived from: http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopCredentialFactory.java
// diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopCredentialFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopCredentialFactory.java
// (deleted file mode 100644, index f703e4c..0000000)
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.util;

import com.google.auth.Credentials;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.options.PipelineOptions;

/**
 * A {@link CredentialFactory} for use by the SDK and the SDK workers that hands out a shared
 * placeholder {@link Credentials} object instead of a real OAuth credential.
 *
 * <p>The returned object is never {@code null}; it reports that it has no request metadata and
 * its {@code refresh()} does nothing.
 */
public class NoopCredentialFactory implements CredentialFactory {
  private static final NoopCredentials NOOP_CREDENTIALS = new NoopCredentials();
  private static final NoopCredentialFactory INSTANCE = new NoopCredentialFactory();

  /**
   * Returns the shared factory instance; {@code options} is not consulted.
   */
  public static NoopCredentialFactory fromOptions(PipelineOptions options) {
    return INSTANCE;
  }

  /** Returns the shared no-op {@link Credentials}. */
  @Override
  public Credentials getCredential() throws IOException {
    return NOOP_CREDENTIALS;
  }

  /** {@link Credentials} that carry no authentication type and no request metadata. */
  private static final class NoopCredentials extends Credentials {
    @Override
    public String getAuthenticationType() {
      return null;
    }

    @Override
    public Map<String, List<String>> getRequestMetadata(URI uri) throws IOException {
      return null;
    }

    @Override
    public boolean hasRequestMetadata() {
      return false;
    }

    @Override
    public boolean hasRequestMetadataOnly() {
      return false;
    }

    @Override
    public void refresh() throws IOException {}
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NullCredentialInitializer.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NullCredentialInitializer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NullCredentialInitializer.java deleted file mode 100644 index 4ed35c6..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NullCredentialInitializer.java +++ /dev/null @@ -1,62 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. 
You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.beam.sdk.util; - -import com.google.api.client.http.HttpRequest; -import com.google.api.client.http.HttpRequestInitializer; -import com.google.api.client.http.HttpResponse; -import com.google.api.client.http.HttpUnsuccessfulResponseHandler; -import java.io.IOException; - -/** - * A {@link HttpRequestInitializer} for requests that don't have credentials. - * - * <p>When the access is denied, it throws {@link IOException} with a detailed error message. - */ -public class NullCredentialInitializer implements HttpRequestInitializer { - private static final int ACCESS_DENIED = 401; - private static final String NULL_CREDENTIAL_REASON = - "Unable to get application default credentials. Please see " - + "https://developers.google.com/accounts/docs/application-default-credentials " - + "for details on how to specify credentials. 
This version of the SDK is " - + "dependent on the gcloud core component version 2015.02.05 or newer to " - + "be able to get credentials from the currently authorized user via gcloud auth."; - - @Override - public void initialize(HttpRequest httpRequest) throws IOException { - httpRequest.setUnsuccessfulResponseHandler(new NullCredentialHttpUnsuccessfulResponseHandler()); - } - - private static class NullCredentialHttpUnsuccessfulResponseHandler - implements HttpUnsuccessfulResponseHandler { - - @Override - public boolean handleResponse( - HttpRequest httpRequest, - HttpResponse httpResponse, boolean supportsRetry) throws IOException { - if (!httpResponse.isSuccessStatusCode() && httpResponse.getStatusCode() == ACCESS_DENIED) { - throwNullCredentialException(); - } - return supportsRetry; - } - } - - public static void throwNullCredentialException() { - throw new RuntimeException(NULL_CREDENTIAL_REASON); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java deleted file mode 100644 index 80c093b..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport; -import com.google.api.client.http.HttpRequestInitializer; -import com.google.api.client.http.HttpTransport; -import com.google.api.client.json.JsonFactory; -import com.google.api.client.json.jackson2.JacksonFactory; -import com.google.api.services.bigquery.Bigquery; -import com.google.api.services.cloudresourcemanager.CloudResourceManager; -import com.google.api.services.pubsub.Pubsub; -import com.google.api.services.storage.Storage; -import com.google.auth.Credentials; -import com.google.auth.http.HttpCredentialsAdapter; -import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; -import com.google.common.collect.ImmutableList; -import java.io.IOException; -import java.net.MalformedURLException; -import java.net.URL; -import java.security.GeneralSecurityException; -import org.apache.beam.sdk.options.BigQueryOptions; -import org.apache.beam.sdk.options.CloudResourceManagerOptions; -import org.apache.beam.sdk.options.GcsOptions; -import org.apache.beam.sdk.options.PubsubOptions; - -/** - * Helpers for cloud communication. - */ -public class Transport { - - private static class SingletonHelper { - /** Global instance of the JSON factory. */ - private static final JsonFactory JSON_FACTORY; - - /** Global instance of the HTTP transport. 
*/ - private static final HttpTransport HTTP_TRANSPORT; - - static { - try { - JSON_FACTORY = JacksonFactory.getDefaultInstance(); - HTTP_TRANSPORT = GoogleNetHttpTransport.newTrustedTransport(); - } catch (GeneralSecurityException | IOException e) { - throw new RuntimeException(e); - } - } - } - - public static HttpTransport getTransport() { - return SingletonHelper.HTTP_TRANSPORT; - } - - public static JsonFactory getJsonFactory() { - return SingletonHelper.JSON_FACTORY; - } - - private static class ApiComponents { - public String rootUrl; - public String servicePath; - - public ApiComponents(String root, String path) { - this.rootUrl = root; - this.servicePath = path; - } - } - - private static ApiComponents apiComponentsFromUrl(String urlString) { - try { - URL url = new URL(urlString); - String rootUrl = url.getProtocol() + "://" + url.getHost() - + (url.getPort() > 0 ? ":" + url.getPort() : ""); - return new ApiComponents(rootUrl, url.getPath()); - } catch (MalformedURLException e) { - throw new RuntimeException("Invalid URL: " + urlString); - } - } - - /** - * Returns a BigQuery client builder using the specified {@link BigQueryOptions}. - */ - public static Bigquery.Builder - newBigQueryClient(BigQueryOptions options) { - return new Bigquery.Builder(getTransport(), getJsonFactory(), - chainHttpRequestInitializer( - options.getGcpCredential(), - // Do not log 404. It clutters the output and is possibly even required by the caller. - new RetryHttpRequestInitializer(ImmutableList.of(404)))) - .setApplicationName(options.getAppName()) - .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); - } - - /** - * Returns a Pubsub client builder using the specified {@link PubsubOptions}. 
- * - * @deprecated Use an appropriate org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory - */ - @Deprecated - public static Pubsub.Builder - newPubsubClient(PubsubOptions options) { - return new Pubsub.Builder(getTransport(), getJsonFactory(), - chainHttpRequestInitializer( - options.getGcpCredential(), - // Do not log 404. It clutters the output and is possibly even required by the caller. - new RetryHttpRequestInitializer(ImmutableList.of(404)))) - .setRootUrl(options.getPubsubRootUrl()) - .setApplicationName(options.getAppName()) - .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); - } - - /** - * Returns a CloudResourceManager client builder using the specified - * {@link CloudResourceManagerOptions}. - */ - public static CloudResourceManager.Builder - newCloudResourceManagerClient(CloudResourceManagerOptions options) { - Credentials credentials = options.getGcpCredential(); - if (credentials == null) { - NullCredentialInitializer.throwNullCredentialException(); - } - return new CloudResourceManager.Builder(getTransport(), getJsonFactory(), - chainHttpRequestInitializer( - credentials, - // Do not log 404. It clutters the output and is possibly even required by the caller. - new RetryHttpRequestInitializer(ImmutableList.of(404)))) - .setApplicationName(options.getAppName()) - .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); - } - - /** - * Returns a Cloud Storage client builder using the specified {@link GcsOptions}. - */ - public static Storage.Builder - newStorageClient(GcsOptions options) { - String servicePath = options.getGcsEndpoint(); - Storage.Builder storageBuilder = new Storage.Builder(getTransport(), getJsonFactory(), - chainHttpRequestInitializer( - options.getGcpCredential(), - // Do not log the code 404. Code up the stack will deal with 404's if needed, and - // logging it by default clutters the output during file staging. 
- new RetryHttpRequestInitializer( - ImmutableList.of(404), new UploadIdResponseInterceptor()))) - .setApplicationName(options.getAppName()) - .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); - if (servicePath != null) { - ApiComponents components = apiComponentsFromUrl(servicePath); - storageBuilder.setRootUrl(components.rootUrl); - storageBuilder.setServicePath(components.servicePath); - } - return storageBuilder; - } - - private static HttpRequestInitializer chainHttpRequestInitializer( - Credentials credential, HttpRequestInitializer httpRequestInitializer) { - if (credential == null) { - return new ChainingHttpRequestInitializer( - new NullCredentialInitializer(), httpRequestInitializer); - } else { - return new ChainingHttpRequestInitializer( - new HttpCredentialsAdapter(credential), - httpRequestInitializer); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java b/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java index c4b3a9f..66c1393 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java @@ -40,8 +40,6 @@ public class SdkCoreApiSurfaceTest { "org.apache.beam", "com.google.api.client", "com.google.api.services.bigquery", - "com.google.api.services.cloudresourcemanager", - "com.google.api.services.pubsub", "com.google.api.services.storage", "com.google.auth", "com.google.protobuf", http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java index 3b6992a..a8655a6 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java @@ -48,7 +48,6 @@ import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; - import java.io.BufferedReader; import java.io.File; import java.io.FileOutputStream; @@ -56,14 +55,11 @@ import java.io.FileReader; import java.io.IOException; import java.io.OutputStream; import java.io.PrintStream; -import java.nio.channels.FileChannel; -import java.nio.channels.SeekableByteChannel; import java.nio.charset.StandardCharsets; import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.SimpleFileVisitor; -import java.nio.file.StandardOpenOption; import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; import java.util.Arrays; @@ -72,17 +68,13 @@ import java.util.Set; import java.util.zip.GZIPOutputStream; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; - import javax.annotation.Nullable; - -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.BoundedSource.BoundedReader; import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory; import org.apache.beam.sdk.io.TextIO.CompressionType; import org.apache.beam.sdk.io.TextIO.TextSource; -import org.apache.beam.sdk.options.GcsOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.ValueProvider; @@ -96,9 +88,7 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.display.DisplayData; import 
org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.GcsUtil; import org.apache.beam.sdk.util.IOChannelUtils; -import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.apache.beam.sdk.values.PCollection; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream; @@ -111,9 +101,6 @@ import org.junit.experimental.categories.Category; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; /** * Tests for TextIO Read and Write transforms. @@ -416,7 +403,7 @@ public class TextIOTest { } private static Predicate<List<String>> haveProperHeaderAndFooter(final String header, - final String footer) { + final String footer) { return new Predicate<List<String>>() { @Override public boolean apply(List<String> fileLines) { @@ -577,21 +564,6 @@ public class TextIOTest { input.apply(TextIO.Write.to(filename)); } - /** - * Recursive wildcards are not supported. - * This tests "**". - */ - @Test - public void testBadWildcardRecursive() throws Exception { - p.enableAbandonedNodeEnforcement(false); - - // Check that applying does fail. - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage("wildcard"); - - p.apply(TextIO.Read.from("gs://bucket/foo**/baz")); - } - /** Options for testing. 
*/ public interface RuntimeTestOptions extends PipelineOptions { ValueProvider<String> getInput(); @@ -1186,70 +1158,5 @@ public class TextIOTest { assertThat(splits, hasSize(equalTo(1))); SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options); } - - /////////////////////////////////////////////////////////////////////////////////////////////// - // Test "gs://" paths - - private GcsUtil buildMockGcsUtil() throws IOException { - GcsUtil mockGcsUtil = Mockito.mock(GcsUtil.class); - - // Any request to open gets a new bogus channel - Mockito - .when(mockGcsUtil.open(Mockito.any(GcsPath.class))) - .then(new Answer<SeekableByteChannel>() { - @Override - public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable { - return FileChannel.open( - Files.createTempFile("channel-", ".tmp"), - StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE); - } - }); - - // Any request for expansion returns a list containing the original GcsPath - // This is required to pass validation that occurs in TextIO during apply() - Mockito - .when(mockGcsUtil.expand(Mockito.any(GcsPath.class))) - .then(new Answer<List<GcsPath>>() { - @Override - public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable { - return ImmutableList.of((GcsPath) invocation.getArguments()[0]); - } - }); - - return mockGcsUtil; - } - - /** - * This tests a few corner cases that should not crash. 
- */ - @Test - @Category(NeedsRunner.class) - public void testGoodWildcards() throws Exception { - GcsOptions options = TestPipeline.testingPipelineOptions().as(GcsOptions.class); - options.setGcsUtil(buildMockGcsUtil()); - - Pipeline pipeline = Pipeline.create(options); - - applyRead(pipeline, "gs://bucket/foo"); - applyRead(pipeline, "gs://bucket/foo/"); - applyRead(pipeline, "gs://bucket/foo/*"); - applyRead(pipeline, "gs://bucket/foo/?"); - applyRead(pipeline, "gs://bucket/foo/[0-9]"); - applyRead(pipeline, "gs://bucket/foo/*baz*"); - applyRead(pipeline, "gs://bucket/foo/*baz?"); - applyRead(pipeline, "gs://bucket/foo/[0-9]baz?"); - applyRead(pipeline, "gs://bucket/foo/baz/*"); - applyRead(pipeline, "gs://bucket/foo/baz/*wonka*"); - applyRead(pipeline, "gs://bucket/foo/*baz/wonka*"); - applyRead(pipeline, "gs://bucket/foo*/baz"); - applyRead(pipeline, "gs://bucket/foo?/baz"); - applyRead(pipeline, "gs://bucket/foo[0-9]/baz"); - - // Check that running doesn't fail. - pipeline.run(); - } - - private void applyRead(Pipeline pipeline, String path) { - pipeline.apply("Read(" + path + ")", TextIO.Read.from(path)); - } } + http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java deleted file mode 100644 index 288383e..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.options; - -import static org.hamcrest.Matchers.containsString; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; - -import com.google.common.collect.ImmutableMap; -import com.google.common.io.Files; -import java.io.File; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.Map; -import org.apache.beam.sdk.options.GcpOptions.DefaultProjectFactory; -import org.apache.beam.sdk.testing.RestoreSystemProperties; -import org.apache.beam.sdk.util.NoopPathValidator; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.rules.TemporaryFolder; -import org.junit.rules.TestRule; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Tests for {@link GcpOptions}. 
*/ -@RunWith(JUnit4.class) -public class GcpOptionsTest { - @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties(); - @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); - @Rule public ExpectedException thrown = ExpectedException.none(); - - @Test - public void testGetProjectFromCloudSdkConfigEnv() throws Exception { - Map<String, String> environment = - ImmutableMap.of("CLOUDSDK_CONFIG", tmpFolder.getRoot().getAbsolutePath()); - assertEquals("test-project", - runGetProjectTest(tmpFolder.newFile("properties"), environment)); - } - - @Test - public void testGetProjectFromAppDataEnv() throws Exception { - Map<String, String> environment = - ImmutableMap.of("APPDATA", tmpFolder.getRoot().getAbsolutePath()); - System.setProperty("os.name", "windows"); - assertEquals("test-project", - runGetProjectTest(new File(tmpFolder.newFolder("gcloud"), "properties"), - environment)); - } - - @Test - public void testGetProjectFromUserHomeEnvOld() throws Exception { - Map<String, String> environment = ImmutableMap.of(); - System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath()); - assertEquals("test-project", - runGetProjectTest( - new File(tmpFolder.newFolder(".config", "gcloud"), "properties"), - environment)); - } - - @Test - public void testGetProjectFromUserHomeEnv() throws Exception { - Map<String, String> environment = ImmutableMap.of(); - System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath()); - assertEquals("test-project", - runGetProjectTest( - new File(tmpFolder.newFolder(".config", "gcloud", "configurations"), "config_default"), - environment)); - } - - @Test - public void testGetProjectFromUserHomeOldAndNewPrefersNew() throws Exception { - Map<String, String> environment = ImmutableMap.of(); - System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath()); - makePropertiesFileWithProject(new File(tmpFolder.newFolder(".config", "gcloud"), "properties"), - "old-project"); - 
assertEquals("test-project", - runGetProjectTest( - new File(tmpFolder.newFolder(".config", "gcloud", "configurations"), "config_default"), - environment)); - } - - @Test - public void testUnableToGetDefaultProject() throws Exception { - System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath()); - DefaultProjectFactory projectFactory = spy(new DefaultProjectFactory()); - when(projectFactory.getEnvironment()).thenReturn(ImmutableMap.<String, String>of()); - assertNull(projectFactory.create(PipelineOptionsFactory.create())); - } - - @Test - public void testEmptyGcpTempLocation() throws Exception { - GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); - options.setProject(""); - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("--project is a required option"); - options.getGcpTempLocation(); - } - - @Test - public void testDefaultGcpTempLocation() throws Exception { - GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); - String tempLocation = "gs://bucket"; - options.setTempLocation(tempLocation); - options.as(GcsOptions.class).setPathValidatorClass(NoopPathValidator.class); - assertEquals(tempLocation, options.getGcpTempLocation()); - } - - @Test - public void testDefaultGcpTempLocationInvalid() throws Exception { - GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); - options.setTempLocation("file://"); - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage( - "Error constructing default value for gcpTempLocation: tempLocation is not" - + " a valid GCS path"); - options.getGcpTempLocation(); - } - - @Test - public void testDefaultGcpTempLocationDoesNotExist() { - GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); - String tempLocation = "gs://does/not/exist"; - options.setTempLocation(tempLocation); - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage( - "Error constructing default value for gcpTempLocation: tempLocation is not" - + " a 
valid GCS path"); - thrown.expectCause( - hasMessage(containsString("Output path does not exist or is not writeable"))); - - options.getGcpTempLocation(); - } - - private static void makePropertiesFileWithProject(File path, String projectId) - throws IOException { - String properties = String.format("[core]%n" - + "account = [email protected]%n" - + "project = %s%n" - + "%n" - + "[dataflow]%n" - + "magic = true%n", projectId); - Files.write(properties, path, StandardCharsets.UTF_8); - } - - private static String runGetProjectTest(File path, Map<String, String> environment) - throws Exception { - makePropertiesFileWithProject(path, "test-project"); - DefaultProjectFactory projectFactory = spy(new DefaultProjectFactory()); - when(projectFactory.getEnvironment()).thenReturn(environment); - return projectFactory.create(PipelineOptionsFactory.create()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java deleted file mode 100644 index dae7208..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.options; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.api.services.bigquery.Bigquery.Datasets.Delete; -import com.google.api.services.storage.Storage; -import org.apache.beam.sdk.options.GoogleApiDebugOptions.GoogleApiTracer; -import org.apache.beam.sdk.util.TestCredential; -import org.apache.beam.sdk.util.Transport; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Tests for {@link GoogleApiDebugOptions}. 
*/ -@RunWith(JUnit4.class) -public class GoogleApiDebugOptionsTest { - private static final String STORAGE_GET_TRACE = - "--googleApiTrace={\"Objects.Get\":\"GetTraceDestination\"}"; - private static final String STORAGE_GET_AND_LIST_TRACE = - "--googleApiTrace={\"Objects.Get\":\"GetTraceDestination\"," - + "\"Objects.List\":\"ListTraceDestination\"}"; - private static final String STORAGE_TRACE = "--googleApiTrace={\"Storage\":\"TraceDestination\"}"; - - @Test - public void testWhenTracingMatches() throws Exception { - String[] args = new String[] {STORAGE_GET_TRACE}; - GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class); - options.setGcpCredential(new TestCredential()); - assertNotNull(options.getGoogleApiTrace()); - - Storage.Objects.Get request = - Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId"); - assertEquals("GetTraceDestination", request.get("$trace")); - } - - @Test - public void testWhenTracingDoesNotMatch() throws Exception { - String[] args = new String[] {STORAGE_GET_TRACE}; - GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class); - options.setGcpCredential(new TestCredential()); - - assertNotNull(options.getGoogleApiTrace()); - - Storage.Objects.List request = - Transport.newStorageClient(options).build().objects().list("testProjectId"); - assertNull(request.get("$trace")); - } - - @Test - public void testWithMultipleTraces() throws Exception { - String[] args = new String[] {STORAGE_GET_AND_LIST_TRACE}; - GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class); - options.setGcpCredential(new TestCredential()); - - assertNotNull(options.getGoogleApiTrace()); - - Storage.Objects.Get getRequest = - Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId"); - assertEquals("GetTraceDestination", getRequest.get("$trace")); - - Storage.Objects.List listRequest = - 
Transport.newStorageClient(options).build().objects().list("testProjectId"); - assertEquals("ListTraceDestination", listRequest.get("$trace")); - } - - @Test - public void testMatchingAllCalls() throws Exception { - String[] args = new String[] {STORAGE_TRACE}; - GcsOptions options = - PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class); - options.setGcpCredential(new TestCredential()); - - assertNotNull(options.getGoogleApiTrace()); - - Storage.Objects.Get getRequest = - Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId"); - assertEquals("TraceDestination", getRequest.get("$trace")); - - Storage.Objects.List listRequest = - Transport.newStorageClient(options).build().objects().list("testProjectId"); - assertEquals("TraceDestination", listRequest.get("$trace")); - } - - @Test - public void testMatchingAgainstClient() throws Exception { - GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class); - options.setGcpCredential(new TestCredential()); - options.setGoogleApiTrace(new GoogleApiTracer().addTraceFor( - Transport.newStorageClient(options).build(), "TraceDestination")); - - Storage.Objects.Get getRequest = - Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId"); - assertEquals("TraceDestination", getRequest.get("$trace")); - - Delete deleteRequest = Transport.newBigQueryClient(options.as(BigQueryOptions.class)) - .build().datasets().delete("testProjectId", "testDatasetId"); - assertNull(deleteRequest.get("$trace")); - } - - @Test - public void testMatchingAgainstRequestType() throws Exception { - GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class); - options.setGcpCredential(new TestCredential()); - options.setGoogleApiTrace(new GoogleApiTracer().addTraceFor( - Transport.newStorageClient(options).build().objects() - .get("aProjectId", "aObjectId"), "TraceDestination")); - - Storage.Objects.Get getRequest = - 
Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId"); - assertEquals("TraceDestination", getRequest.get("$trace")); - - Storage.Objects.List listRequest = - Transport.newStorageClient(options).build().objects().list("testProjectId"); - assertNull(listRequest.get("$trace")); - } - - @Test - public void testDeserializationAndSerializationOfGoogleApiTracer() throws Exception { - String serializedValue = "{\"Api\":\"Token\"}"; - ObjectMapper objectMapper = new ObjectMapper(); - assertEquals(serializedValue, - objectMapper.writeValueAsString( - objectMapper.readValue(serializedValue, GoogleApiTracer.class))); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java index 7d941bf..769a30a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java @@ -691,9 +691,9 @@ public class PipelineOptionsFactoryTest { @Test public void testPropertyIsSetOnRegisteredPipelineOptionNotPartOfOriginalInterface() { PipelineOptions options = PipelineOptionsFactory - .fromArgs("--project=testProject") + .fromArgs("--streaming") .create(); - assertEquals("testProject", options.as(GcpOptions.class).getProject()); + assertTrue(options.as(StreamingOptions.class).isStreaming()); } /** A test interface containing all the primitives. 
*/ http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java index 71ef311..76d8627 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java @@ -19,65 +19,23 @@ package org.apache.beam.sdk.runners; import static org.junit.Assert.assertTrue; -import org.apache.beam.sdk.options.ApplicationNameOptions; -import org.apache.beam.sdk.options.GcpOptions; -import org.apache.beam.sdk.options.GcsOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.CrashingRunner; -import org.apache.beam.sdk.util.GcsUtil; -import org.apache.beam.sdk.util.TestCredential; -import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; /** - * Tests for PipelineRunner. + * Tests for {@link PipelineRunner}. */ @RunWith(JUnit4.class) public class PipelineRunnerTest { - - @Mock private GcsUtil mockGcsUtil; - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - } - - @Test - public void testLongName() { - // Check we can create a pipeline runner using the full class name. 
- PipelineOptions options = PipelineOptionsFactory.create(); - options.as(ApplicationNameOptions.class).setAppName("test"); - options.as(GcpOptions.class).setProject("test"); - options.as(GcsOptions.class).setGcsUtil(mockGcsUtil); - options.setRunner(CrashingRunner.class); - options.as(GcpOptions.class).setGcpCredential(new TestCredential()); - PipelineRunner<?> runner = PipelineRunner.fromOptions(options); - assertTrue(runner instanceof CrashingRunner); - } - @Test - public void testShortName() { - // Check we can create a pipeline runner using the short class name. + public void testInstantiation() { PipelineOptions options = PipelineOptionsFactory.create(); - options.as(ApplicationNameOptions.class).setAppName("test"); - options.as(GcpOptions.class).setProject("test"); - options.as(GcsOptions.class).setGcsUtil(mockGcsUtil); options.setRunner(CrashingRunner.class); - options.as(GcpOptions.class).setGcpCredential(new TestCredential()); PipelineRunner<?> runner = PipelineRunner.fromOptions(options); assertTrue(runner instanceof CrashingRunner); } - - @Test - public void testAppNameDefault() { - ApplicationNameOptions options = PipelineOptionsFactory.as(ApplicationNameOptions.class); - Assert.assertEquals(PipelineRunnerTest.class.getSimpleName(), - options.getAppName()); - } } http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java deleted file mode 100644 index 3b35856..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.testing; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.atLeast; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import com.google.api.services.bigquery.Bigquery; -import com.google.api.services.bigquery.model.QueryRequest; -import com.google.api.services.bigquery.model.QueryResponse; -import com.google.api.services.bigquery.model.TableCell; -import com.google.api.services.bigquery.model.TableRow; -import com.google.common.collect.Lists; -import java.io.IOException; -import java.math.BigInteger; -import org.apache.beam.sdk.PipelineResult; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -/** - * Tests for {@link BigqueryMatcher}. 
- */ -@RunWith(JUnit4.class) -public class BigqueryMatcherTest { - private final String appName = "test-app"; - private final String projectId = "test-project"; - private final String query = "test-query"; - - @Rule public ExpectedException thrown = ExpectedException.none(); - @Rule public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper(); - @Mock private Bigquery mockBigqueryClient; - @Mock private Bigquery.Jobs mockJobs; - @Mock private Bigquery.Jobs.Query mockQuery; - @Mock private PipelineResult mockResult; - - @Before - public void setUp() throws IOException { - MockitoAnnotations.initMocks(this); - when(mockBigqueryClient.jobs()).thenReturn(mockJobs); - when(mockJobs.query(anyString(), any(QueryRequest.class))).thenReturn(mockQuery); - } - - @Test - public void testBigqueryMatcherThatSucceeds() throws Exception { - BigqueryMatcher matcher = spy( - new BigqueryMatcher( - appName, projectId, query, "9bb47f5c90d2a99cad526453dff5ed5ec74650dc")); - doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString()); - when(mockQuery.execute()).thenReturn(createResponseContainingTestData()); - - assertThat(mockResult, matcher); - verify(matcher).newBigqueryClient(eq(appName)); - verify(mockJobs).query(eq(projectId), eq(new QueryRequest().setQuery(query))); - } - - @Test - public void testBigqueryMatcherFailsForChecksumMismatch() throws IOException { - BigqueryMatcher matcher = spy( - new BigqueryMatcher(appName, projectId, query, "incorrect-checksum")); - doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString()); - when(mockQuery.execute()).thenReturn(createResponseContainingTestData()); - - thrown.expect(AssertionError.class); - thrown.expectMessage("Total number of rows are: 1"); - thrown.expectMessage("abc"); - try { - assertThat(mockResult, matcher); - } finally { - verify(matcher).newBigqueryClient(eq(appName)); - verify(mockJobs).query(eq(projectId), eq(new QueryRequest().setQuery(query))); - } - } - - @Test - public void 
testBigqueryMatcherFailsWhenQueryJobNotComplete() throws Exception { - BigqueryMatcher matcher = spy( - new BigqueryMatcher(appName, projectId, query, "some-checksum")); - doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString()); - when(mockQuery.execute()).thenReturn(new QueryResponse().setJobComplete(false)); - - thrown.expect(AssertionError.class); - thrown.expectMessage("The query job hasn't completed."); - thrown.expectMessage("jobComplete=false"); - try { - assertThat(mockResult, matcher); - } finally { - verify(matcher).newBigqueryClient(eq(appName)); - verify(mockJobs).query(eq(projectId), eq(new QueryRequest().setQuery(query))); - } - } - - @Test - public void testQueryWithRetriesWhenServiceFails() throws Exception { - BigqueryMatcher matcher = spy( - new BigqueryMatcher(appName, projectId, query, "some-checksum")); - when(mockQuery.execute()).thenThrow(new IOException()); - - thrown.expect(RuntimeException.class); - thrown.expectMessage("Unable to get BigQuery response after retrying"); - try { - matcher.queryWithRetries( - mockBigqueryClient, - new QueryRequest(), - fastClock, - BigqueryMatcher.BACKOFF_FACTORY.backoff()); - } finally { - verify(mockJobs, atLeast(BigqueryMatcher.MAX_QUERY_RETRIES)) - .query(eq(projectId), eq(new QueryRequest())); - } - } - - @Test - public void testQueryWithRetriesWhenQueryResponseNull() throws Exception { - BigqueryMatcher matcher = spy( - new BigqueryMatcher(appName, projectId, query, "some-checksum")); - when(mockQuery.execute()).thenReturn(null); - - thrown.expect(RuntimeException.class); - thrown.expectMessage("Unable to get BigQuery response after retrying"); - try { - matcher.queryWithRetries( - mockBigqueryClient, - new QueryRequest(), - fastClock, - BigqueryMatcher.BACKOFF_FACTORY.backoff()); - } finally { - verify(mockJobs, atLeast(BigqueryMatcher.MAX_QUERY_RETRIES)) - .query(eq(projectId), eq(new QueryRequest())); - } - } - - private QueryResponse createResponseContainingTestData() { - 
TableCell field1 = new TableCell(); - field1.setV("abc"); - TableCell field2 = new TableCell(); - field2.setV("2"); - TableCell field3 = new TableCell(); - field3.setV("testing BigQuery matcher."); - TableRow row = new TableRow(); - row.setF(Lists.newArrayList(field1, field2, field3)); - - QueryResponse response = new QueryResponse(); - response.setJobComplete(true); - response.setRows(Lists.newArrayList(row)); - response.setTotalRows(BigInteger.ONE); - return response; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java index 2ddead7..04005c5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java @@ -35,7 +35,6 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.ApplicationNameOptions; -import org.apache.beam.sdk.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; @@ -90,12 +89,11 @@ public class TestPipelineTest implements Serializable { String stringOptions = mapper.writeValueAsString( new String[] { - "--runner=org.apache.beam.sdk.testing.CrashingRunner", "--project=testProject" + "--runner=org.apache.beam.sdk.testing.CrashingRunner" }); System.getProperties().put("beamTestPipelineOptions", stringOptions); - GcpOptions options = TestPipeline.testingPipelineOptions().as(GcpOptions.class); + PipelineOptions options = TestPipeline.testingPipelineOptions(); 
assertEquals(CrashingRunner.class, options.getRunner()); - assertEquals(options.getProject(), "testProject"); } @Test http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java deleted file mode 100644 index 395e1f3..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.util; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.when; - -import com.google.api.services.storage.model.Bucket; -import java.io.IOException; -import org.apache.beam.sdk.options.CloudResourceManagerOptions; -import org.apache.beam.sdk.options.GcpOptions; -import org.apache.beam.sdk.options.GcsOptions; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.util.gcsfs.GcsPath; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.MockitoAnnotations; -import org.mockito.MockitoAnnotations.Mock; - -/** Tests for DefaultBucket. */ -@RunWith(JUnit4.class) -public class DefaultBucketTest { - @Rule public ExpectedException thrown = ExpectedException.none(); - - private PipelineOptions options; - @Mock - private GcsUtil gcsUtil; - @Mock - private GcpProjectUtil gcpUtil; - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - options = PipelineOptionsFactory.create(); - options.as(GcsOptions.class).setGcsUtil(gcsUtil); - options.as(CloudResourceManagerOptions.class).setGcpProjectUtil(gcpUtil); - options.as(GcpOptions.class).setProject("foo"); - options.as(GcpOptions.class).setZone("us-north1-a"); - } - - @Test - public void testCreateBucket() { - String bucket = DefaultBucket.tryCreateDefaultBucket(options); - assertEquals("gs://dataflow-staging-us-north1-0", bucket); - } - - @Test - public void testCreateBucketProjectLookupFails() throws IOException { - when(gcpUtil.getProjectNumber("foo")).thenThrow(new IOException("badness")); - - thrown.expect(RuntimeException.class); - thrown.expectMessage("Unable to verify project"); - 
DefaultBucket.tryCreateDefaultBucket(options); - } - - @Test - public void testCreateBucketCreateBucketFails() throws IOException { - doThrow(new IOException("badness")).when( - gcsUtil).createBucket(any(String.class), any(Bucket.class)); - - thrown.expect(RuntimeException.class); - thrown.expectMessage("Unable create default bucket"); - DefaultBucket.tryCreateDefaultBucket(options); - } - - @Test - public void testCannotGetBucketOwner() throws IOException { - when(gcsUtil.bucketOwner(any(GcsPath.class))) - .thenThrow(new IOException("badness")); - - thrown.expect(RuntimeException.class); - thrown.expectMessage("Unable to determine the owner"); - DefaultBucket.tryCreateDefaultBucket(options); - } - - @Test - public void testProjectMismatch() throws IOException { - when(gcsUtil.bucketOwner(any(GcsPath.class))).thenReturn(5L); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Bucket owner does not match the project"); - DefaultBucket.tryCreateDefaultBucket(options); - } - - @Test - public void regionFromZone() throws IOException { - assertEquals("us-central1", DefaultBucket.getRegionFromZone("us-central1-a")); - assertEquals("asia-east", DefaultBucket.getRegionFromZone("asia-east-a")); - } -}
