http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java
new file mode 100644
index 0000000..2b7135e
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.google.api.client.http.HttpBackOffIOExceptionHandler;
+import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler;
+import com.google.api.client.http.HttpRequest;
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpResponseInterceptor;
+import com.google.api.client.http.HttpUnsuccessfulResponseHandler;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.ExponentialBackOff;
+import com.google.api.client.util.NanoClock;
+import com.google.api.client.util.Sleeper;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements a request initializer that adds retry handlers to all
+ * HttpRequests.
+ *
+ * <p>Also can take a HttpResponseInterceptor to be applied to the responses.
+ */
+public class RetryHttpRequestInitializer implements HttpRequestInitializer {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RetryHttpRequestInitializer.class);
+
+  /**
+   * Http response codes that should be silently ignored.
+   */
+  private static final Set<Integer> DEFAULT_IGNORED_RESPONSE_CODES = new HashSet<>(
+      Arrays.asList(307 /* Redirect, handled by the client library */,
+                    308 /* Resume Incomplete, handled by the client library */));
+
+  /**
+   * Http response timeout to use for hanging gets.
+   */
+  private static final int HANGING_GET_TIMEOUT_SEC = 80;
+
+  private static class LoggingHttpBackOffIOExceptionHandler
+      extends HttpBackOffIOExceptionHandler {
+    public LoggingHttpBackOffIOExceptionHandler(BackOff backOff) {
+      super(backOff);
+    }
+
+    @Override
+    public boolean handleIOException(HttpRequest request, boolean supportsRetry)
+        throws IOException {
+      boolean willRetry = super.handleIOException(request, supportsRetry);
+      if (willRetry) {
+        LOG.debug("Request failed with IOException, will retry: {}", request.getUrl());
+      } else {
+        LOG.warn("Request failed with IOException, will NOT retry: {}", request.getUrl());
+      }
+      return willRetry;
+    }
+  }
+
+  private static class LoggingHttpBackoffUnsuccessfulResponseHandler
+      implements HttpUnsuccessfulResponseHandler {
+    private final HttpBackOffUnsuccessfulResponseHandler handler;
+    private final Set<Integer> ignoredResponseCodes;
+
+    public LoggingHttpBackoffUnsuccessfulResponseHandler(BackOff backoff,
+        Sleeper sleeper, Set<Integer> ignoredResponseCodes) {
+      this.ignoredResponseCodes = ignoredResponseCodes;
+      handler = new HttpBackOffUnsuccessfulResponseHandler(backoff);
+      handler.setSleeper(sleeper);
+      handler.setBackOffRequired(
+          new HttpBackOffUnsuccessfulResponseHandler.BackOffRequired() {
+            @Override
+            public boolean isRequired(HttpResponse response) {
+              int statusCode = response.getStatusCode();
+              return (statusCode / 100 == 5) ||  // 5xx: server error
+                  statusCode == 429;             // 429: Too many requests
+            }
+          });
+    }
+
+    @Override
+    public boolean handleResponse(HttpRequest request, HttpResponse response,
+        boolean supportsRetry) throws IOException {
+      boolean retry = handler.handleResponse(request, response, supportsRetry);
+      if (retry) {
+        LOG.debug("Request failed with code {} will retry: {}",
+            response.getStatusCode(), request.getUrl());
+
+      } else if (!ignoredResponseCodes.contains(response.getStatusCode())) {
+        LOG.warn("Request failed with code {}, will NOT retry: {}",
+            response.getStatusCode(), request.getUrl());
+      }
+
+      return retry;
+    }
+  }
+
+  private final HttpResponseInterceptor responseInterceptor;  // response Interceptor to use
+
+  private final NanoClock nanoClock;  // used for testing
+
+  private final Sleeper sleeper;  // used for testing
+
+  private final Set<Integer> ignoredResponseCodes = new HashSet<>(DEFAULT_IGNORED_RESPONSE_CODES);
+
+  public RetryHttpRequestInitializer() {
+    this(Collections.<Integer>emptyList());
+  }
+
+  /**
+   * @param additionalIgnoredResponseCodes a list of HTTP status codes that should not be logged.
+   */
+  public RetryHttpRequestInitializer(Collection<Integer> additionalIgnoredResponseCodes) {
+    this(additionalIgnoredResponseCodes, null);
+  }
+
+  /**
+   * @param additionalIgnoredResponseCodes a list of HTTP status codes that should not be logged.
+   * @param responseInterceptor HttpResponseInterceptor to be applied on all requests. May be null.
+   */
+  public RetryHttpRequestInitializer(
+      Collection<Integer> additionalIgnoredResponseCodes,
+      @Nullable HttpResponseInterceptor responseInterceptor) {
+    this(NanoClock.SYSTEM, Sleeper.DEFAULT, additionalIgnoredResponseCodes,
+        responseInterceptor);
+  }
+
+  /**
+   * Visible for testing.
+   *
+   * @param nanoClock used as a timing source for knowing how much time has elapsed.
+   * @param sleeper used to sleep between retries.
+   * @param additionalIgnoredResponseCodes a list of HTTP status codes that should not be logged.
+   * @param responseInterceptor HttpResponseInterceptor to be applied on all requests. May be null.
+   */
+  RetryHttpRequestInitializer(
+      NanoClock nanoClock, Sleeper sleeper, Collection<Integer> additionalIgnoredResponseCodes,
+      @Nullable HttpResponseInterceptor responseInterceptor) {
+    this.nanoClock = nanoClock;
+    this.sleeper = sleeper;
+    this.ignoredResponseCodes.addAll(additionalIgnoredResponseCodes);
+    this.responseInterceptor = responseInterceptor;
+  }
+
+  @Override
+  public void initialize(HttpRequest request) throws IOException {
+    // Set a timeout for hanging-gets.
+    // TODO: Do this exclusively for work requests.
+    request.setReadTimeout(HANGING_GET_TIMEOUT_SEC * 1000);
+
+    // Back off on retryable http errors.
+    request.setUnsuccessfulResponseHandler(
+        // A back-off multiplier of 2 raises the maximum request retrying time
+        // to approximately 5 minutes (keeping other back-off parameters to
+        // their default values).
+        new LoggingHttpBackoffUnsuccessfulResponseHandler(
+            new ExponentialBackOff.Builder().setNanoClock(nanoClock)
+                                            .setMultiplier(2).build(),
+            sleeper, ignoredResponseCodes));
+
+    // Retry immediately on IOExceptions.
+    LoggingHttpBackOffIOExceptionHandler loggingBackoffHandler =
+        new LoggingHttpBackOffIOExceptionHandler(BackOff.ZERO_BACKOFF);
+    request.setIOExceptionHandler(loggingBackoffHandler);
+
+    // Set the response interceptor, if one was supplied.
+    if (responseInterceptor != null) {
+      request.setResponseInterceptor(responseInterceptor);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/Transport.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/Transport.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/Transport.java
new file mode 100644
index 0000000..b8474bb
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/Transport.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.api.services.storage.Storage;
+import com.google.auth.Credentials;
+import com.google.auth.http.HttpCredentialsAdapter;
+import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.security.GeneralSecurityException;
+import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+
+/**
+ * Helpers for cloud communication.
+ */
+public class Transport {
+
+  private static class SingletonHelper {
+    /** Global instance of the JSON factory. */
+    private static final JsonFactory JSON_FACTORY;
+
+    /** Global instance of the HTTP transport. */
+    private static final HttpTransport HTTP_TRANSPORT;
+
+    static {
+      try {
+        JSON_FACTORY = JacksonFactory.getDefaultInstance();
+        HTTP_TRANSPORT = GoogleNetHttpTransport.newTrustedTransport();
+      } catch (GeneralSecurityException | IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  public static HttpTransport getTransport() {
+    return SingletonHelper.HTTP_TRANSPORT;
+  }
+
+  public static JsonFactory getJsonFactory() {
+    return SingletonHelper.JSON_FACTORY;
+  }
+
+  private static class ApiComponents {
+    public final String rootUrl;
+    public final String servicePath;
+
+    public ApiComponents(String root, String path) {
+      this.rootUrl = root;
+      this.servicePath = path;
+    }
+  }
+
+  private static ApiComponents apiComponentsFromUrl(String urlString) {
+    try {
+      URL url = new URL(urlString);
+      String rootUrl = url.getProtocol() + "://" + url.getHost()
+          + (url.getPort() > 0 ? ":" + url.getPort() : "");
+      return new ApiComponents(rootUrl, url.getPath());
+    } catch (MalformedURLException e) {
+      throw new RuntimeException("Invalid URL: " + urlString, e);
+    }
+  }
+
+  /**
+   * Returns a Cloud Storage client builder using the specified {@link GcsOptions}.
+   */
+  public static Storage.Builder
+      newStorageClient(GcsOptions options) {
+    String servicePath = options.getGcsEndpoint();
+    Storage.Builder storageBuilder = new Storage.Builder(getTransport(), getJsonFactory(),
+        chainHttpRequestInitializer(
+            options.getGcpCredential(),
+            // Do not log the code 404. Code up the stack will deal with 404's if needed, and
+            // logging it by default clutters the output during file staging.
+            new RetryHttpRequestInitializer(
+                ImmutableList.of(404), new UploadIdResponseInterceptor())))
+        .setApplicationName(options.getAppName())
+        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+    if (servicePath != null) {
+      ApiComponents components = apiComponentsFromUrl(servicePath);
+      storageBuilder.setRootUrl(components.rootUrl);
+      storageBuilder.setServicePath(components.servicePath);
+    }
+    return storageBuilder;
+  }
+
+  private static HttpRequestInitializer chainHttpRequestInitializer(
+      Credentials credential, HttpRequestInitializer httpRequestInitializer) {
+    if (credential == null) {
+      return new ChainingHttpRequestInitializer(
+          new NullCredentialInitializer(), httpRequestInitializer);
+    } else {
+      return new ChainingHttpRequestInitializer(
+          new HttpCredentialsAdapter(credential),
+          httpRequestInitializer);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java
new file mode 100644
index 0000000..6a71bdc
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java
@@ -0,0 +1,627 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.gcsfs;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+
+import com.google.api.services.storage.model.StorageObject;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.FileSystem;
+import java.nio.file.LinkOption;
+import java.nio.file.Path;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.Iterator;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/**
+ * Implements the Java NIO {@link Path} API for Google Cloud Storage paths.
+ *
+ * <p>GcsPath uses a slash ('/') as a directory separator. Below is
+ * a summary of how slashes are treated:
+ * <ul>
+ *   <li> A GCS bucket may not contain a slash.  An object may contain zero or
+ *        more slashes.
+ *   <li> A trailing slash always indicates a directory, which is compliant
+ *        with POSIX.1-2008.
+ *   <li> Slashes separate components of a path.  Empty components are allowed,
+ *        these are represented as repeated slashes.  An empty component always
+ *        refers to a directory, and always ends in a slash.
+ *   <li> {@link #getParent()} always returns a path ending in a slash, as the
+ *        parent of a GcsPath is always a directory.
+ *   <li> Use {@link #resolve(String)} to append elements to a GcsPath -- this
+ *        applies the rules consistently and is highly recommended over any
+ *        custom string concatenation.
+ * </ul>
+ *
+ * <p>GcsPath treats all GCS objects and buckets as belonging to the same
+ * filesystem, so the root of a GcsPath is the GcsPath bucket="", object="".
+ *
+ * <p>Relative paths are not associated with any bucket.  This matches common
+ * treatment of Path in which relative paths can be constructed from one
+ * filesystem and appended to another filesystem.
+ *
+ * @see <a href=
+ * "http://docs.oracle.com/javase/tutorial/essential/io/pathOps.html"
+ * >Java Tutorials: Path Operations</a>
+ */
+public class GcsPath implements Path, Serializable {
+
+  public static final String SCHEME = "gs";
+
+  /**
+   * Creates a GcsPath from a URI.
+   *
+   * <p>The URI must be in the form {@code gs://[bucket]/[path]}, and may not
+   * contain a port, user info, a query, or a fragment.
+   */
+  public static GcsPath fromUri(URI uri) {
+    checkArgument(uri.getScheme().equalsIgnoreCase(SCHEME), "URI: %s is not a GCS URI", uri);
+    checkArgument(uri.getPort() == -1,
+        "GCS URI may not specify port: %s (%s)", uri, uri.getPort());
+    checkArgument(
+        isNullOrEmpty(uri.getUserInfo()),
+        "GCS URI may not specify userInfo: %s (%s)", uri, uri.getUserInfo());
+    checkArgument(
+        isNullOrEmpty(uri.getQuery()),
+        "GCS URI may not specify query: %s (%s)", uri, uri.getQuery());
+    checkArgument(
+        isNullOrEmpty(uri.getFragment()),
+        "GCS URI may not specify fragment: %s (%s)", uri, uri.getFragment());
+
+    return fromUri(uri.toString());
+  }
+
+  /**
+   * Pattern that is used to parse a GCS URL.
+   *
+   * <p>This is used to separate the components.  Verification is handled
+   * separately.
+   */
+  public static final Pattern GCS_URI =
+      Pattern.compile("(?<SCHEME>[^:]+)://(?<BUCKET>[^/]+)(/(?<OBJECT>.*))?");
+
+  /**
+   * Creates a GcsPath from a URI in string form.
+   *
+   * <p>This does not use URI parsing, which means it may accept patterns that
+   * the URI parser would not accept.
+   */
+  public static GcsPath fromUri(String uri) {
+    Matcher m = GCS_URI.matcher(uri);
+    checkArgument(m.matches(), "Invalid GCS URI: %s", uri);
+
+    checkArgument(m.group("SCHEME").equalsIgnoreCase(SCHEME),
+        "URI: %s is not a GCS URI", uri);
+    return new GcsPath(null, m.group("BUCKET"), m.group("OBJECT"));
+  }
+
+  /**
+   * Pattern that is used to parse a GCS resource name.
+   */
+  private static final Pattern GCS_RESOURCE_NAME =
+      Pattern.compile("storage.googleapis.com/(?<BUCKET>[^/]+)(/(?<OBJECT>.*))?");
+
+  /**
+   * Creates a GcsPath from a OnePlatform resource name in string form.
+   */
+  public static GcsPath fromResourceName(String name) {
+    Matcher m = GCS_RESOURCE_NAME.matcher(name);
+    checkArgument(m.matches(), "Invalid GCS resource name: %s", name);
+
+    return new GcsPath(null, m.group("BUCKET"), m.group("OBJECT"));
+  }
+
+  /**
+   * Creates a GcsPath from a {@linkplain StorageObject}.
+   */
+  public static GcsPath fromObject(StorageObject object) {
+    return new GcsPath(null, object.getBucket(), object.getName());
+  }
+
+  /**
+   * Creates a GcsPath from bucket and object components.
+   *
+   * <p>A GcsPath without a bucket name is treated as a relative path, which
+   * is a path component with no linkage to the root element.  This is similar
+   * to a Unix path that does not begin with the root marker (a slash).
+   * GCS has different naming constraints and APIs for working with buckets and
+   * objects, so these two concepts are kept separate to avoid accidental
+   * attempts to treat objects as buckets, or vice versa, as much as possible.
+   *
+   * <p>A GcsPath without an object name is a bucket reference.
+   * A bucket is always a directory, which could be used to lookup or add
+   * files to a bucket, but could not be opened as a file.
+   *
+   * <p>A GcsPath containing neither bucket nor object names is treated as
+   * the root of the GCS filesystem.  A listing on the root element would return
+   * the buckets available to the user.
+   *
+   * <p>If {@code null} is passed as either parameter, it is converted to an
+   * empty string internally for consistency.  There is no distinction between
+   * an empty string and a {@code null}, as neither are allowed by GCS.
+   *
+   * @param bucket a GCS bucket name, or none ({@code null} or an empty string)
+   *               if the object is not associated with a bucket
+   *               (e.g. relative paths or the root node).
+   * @param object a GCS object path, or none ({@code null} or an empty string)
+   *               for no object.
+   */
+  public static GcsPath fromComponents(@Nullable String bucket,
+                                       @Nullable String object) {
+    return new GcsPath(null, bucket, object);
+  }
+
+  @Nullable
+  private transient FileSystem fs;
+  @Nonnull
+  private final String bucket;
+  @Nonnull
+  private final String object;
+
+  /**
+   * Constructs a GcsPath.
+   *
+   * @param fs the associated FileSystem, if any
+   * @param bucket the associated bucket, or none ({@code null} or an empty
+   *               string) for a relative path component
+   * @param object the object, which is a fully-qualified object name if bucket
+   *               was also provided, or none ({@code null} or an empty string)
+   *               for no object
+   * @throws java.lang.IllegalArgumentException if the bucket or object names
+   *         are invalid.
+   */
+  public GcsPath(@Nullable FileSystem fs,
+                 @Nullable String bucket,
+                 @Nullable String object) {
+    if (bucket == null) {
+      bucket = "";
+    }
+    checkArgument(!bucket.contains("/"),
+        "GCS bucket may not contain a slash");
+    checkArgument(bucket.isEmpty()
+                || bucket.matches("[a-z0-9][-_a-z0-9.]+[a-z0-9]"),
+            "GCS bucket names must contain only lowercase letters, numbers, "
+                + "dashes (-), underscores (_), and dots (.). Bucket names "
+                + "must start and end with a number or letter. "
+                + "See https://developers.google.com/storage/docs/bucketnaming "
+                + "for more details. Bucket name: " + bucket);
+
+    if (object == null) {
+      object = "";
+    }
+    checkArgument(
+        object.indexOf('\n') < 0 && object.indexOf('\r') < 0,
+        "GCS object names must not contain Carriage Return or "
+            + "Line Feed characters.");
+
+    this.fs = fs;
+    this.bucket = bucket;
+    this.object = object;
+  }
+
+  /**
+   * Returns the bucket name associated with this GCS path, or an empty string
+   * if this is a relative path component.
+   */
+  public String getBucket() {
+    return bucket;
+  }
+
+  /**
+   * Returns the object name associated with this GCS path, or an empty string
+   * if no object is specified.
+   */
+  public String getObject() {
+    return object;
+  }
+
+  public void setFileSystem(FileSystem fs) {
+    this.fs = fs;
+  }
+
+  @Override
+  public FileSystem getFileSystem() {
+    return fs;
+  }
+
+  // Absolute paths are those that have a bucket and the root path.
+  @Override
+  public boolean isAbsolute() {
+    return !bucket.isEmpty() || object.isEmpty();
+  }
+
+  @Override
+  public GcsPath getRoot() {
+    return new GcsPath(fs, "", "");
+  }
+
+  @Override
+  public GcsPath getFileName() {
+    int nameCount = getNameCount();
+    if (nameCount < 2) {
+      throw new UnsupportedOperationException(
+          "Can't get filename from root path in the bucket: " + this);
+    }
+    return getName(nameCount - 1);
+  }
+
+  /**
+   * Returns the <em>parent path</em>, or {@code null} if this path does not
+   * have a parent.
+   *
+   * <p>Returns a path that ends in '/', as the parent path always refers to
+   * a directory.
+   */
+  @Override
+  public GcsPath getParent() {
+    if (bucket.isEmpty() && object.isEmpty()) {
+      // The root path has no parent, by definition.
+      return null;
+    }
+
+    if (object.isEmpty()) {
+      // A GCS bucket. All buckets come from a common root.
+      return getRoot();
+    }
+
+    // Skip last character, in case it is a trailing slash.
+    int i = object.lastIndexOf('/', object.length() - 2);
+    if (i <= 0) {
+      if (bucket.isEmpty()) {
+        // Relative paths are not attached to the root node.
+        return null;
+      }
+      return new GcsPath(fs, bucket, "");
+    }
+
+    // Retain trailing slash.
+    return new GcsPath(fs, bucket, object.substring(0, i + 1));
+  }
+
+  @Override
+  public int getNameCount() {
+    int count = bucket.isEmpty() ? 0 : 1;
+    if (object.isEmpty()) {
+      return count;
+    }
+
+    // Add another for each separator found.
+    int index = -1;
+    while ((index = object.indexOf('/', index + 1)) != -1) {
+      count++;
+    }
+
+    return object.endsWith("/") ? count : count + 1;
+  }
+
+  @Override
+  public GcsPath getName(int count) {
+    checkArgument(count >= 0);
+
+    Iterator<Path> iterator = iterator();
+    for (int i = 0; i < count; ++i) {
+      checkArgument(iterator.hasNext());
+      iterator.next();
+    }
+
+    checkArgument(iterator.hasNext());
+    return (GcsPath) iterator.next();
+  }
+
+  @Override
+  public GcsPath subpath(int beginIndex, int endIndex) {
+    checkArgument(beginIndex >= 0);
+    checkArgument(endIndex > beginIndex);
+
+    Iterator<Path> iterator = iterator();
+    for (int i = 0; i < beginIndex; ++i) {
+      checkArgument(iterator.hasNext());
+      iterator.next();
+    }
+
+    GcsPath path = null;
+    while (beginIndex < endIndex) {
+      checkArgument(iterator.hasNext());
+      if (path == null) {
+        path = (GcsPath) iterator.next();
+      } else {
+        path = path.resolve(iterator.next());
+      }
+      ++beginIndex;
+    }
+
+    return path;
+  }
+
+  @Override
+  public boolean startsWith(Path other) {
+    if (other instanceof GcsPath) {
+      GcsPath gcsPath = (GcsPath) other;
+      return startsWith(gcsPath.bucketAndObject());
+    } else {
+      return startsWith(other.toString());
+    }
+  }
+
+  @Override
+  public boolean startsWith(String prefix) {
+    return bucketAndObject().startsWith(prefix);
+  }
+
+  @Override
+  public boolean endsWith(Path other) {
+    if (other instanceof GcsPath) {
+      GcsPath gcsPath = (GcsPath) other;
+      return endsWith(gcsPath.bucketAndObject());
+    } else {
+      return endsWith(other.toString());
+    }
+  }
+
+  @Override
+  public boolean endsWith(String suffix) {
+    return bucketAndObject().endsWith(suffix);
+  }
+
+  // TODO: support "." and ".." path components?
+  @Override
+  public GcsPath normalize() {
+    return this;
+  }
+
+  @Override
+  public GcsPath resolve(Path other) {
+    if (other instanceof GcsPath) {
+      GcsPath path = (GcsPath) other;
+      if (path.isAbsolute()) {
+        return path;
+      } else {
+        return resolve(path.getObject());
+      }
+    } else {
+      return resolve(other.toString());
+    }
+  }
+
+  @Override
+  public GcsPath resolve(String other) {
+    if (bucket.isEmpty() && object.isEmpty()) {
+      // Resolve on a root path is equivalent to looking up a bucket and object.
+      other = SCHEME + "://" + other;
+    }
+
+    if (other.startsWith(SCHEME + "://")) {
+      GcsPath path = GcsPath.fromUri(other);
+      path.setFileSystem(getFileSystem());
+      return path;
+    }
+
+    if (other.isEmpty()) {
+      // An empty component MUST refer to a directory.
+      other = "/";
+    }
+
+    if (object.isEmpty()) {
+      return new GcsPath(fs, bucket, other);
+    } else if (object.endsWith("/")) {
+      return new GcsPath(fs, bucket, object + other);
+    } else {
+      return new GcsPath(fs, bucket, object + "/" + other);
+    }
+  }
+
+  @Override
+  public Path resolveSibling(Path other) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Path resolveSibling(String other) {
+    if (getNameCount() < 2) {
+      throw new UnsupportedOperationException("Can't resolve the sibling of a root path: " + this);
+    }
+    GcsPath parent = getParent();
+    return (parent == null) ? fromUri(other) : parent.resolve(other);
+  }
+
+  @Override
+  public Path relativize(Path other) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public GcsPath toAbsolutePath() {
+    return this;
+  }
+
+  @Override
+  public GcsPath toRealPath(LinkOption... options) throws IOException {
+    return this;
+  }
+
+  @Override
+  public File toFile() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public WatchKey register(WatchService watcher, WatchEvent.Kind<?>[] events,
+      WatchEvent.Modifier... modifiers) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public WatchKey register(WatchService watcher, WatchEvent.Kind<?>... events)
+      throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Iterator<Path> iterator() {
+    return new NameIterator(fs, !bucket.isEmpty(), bucketAndObject());
+  }
+
+  private static class NameIterator implements Iterator<Path> {
+    private final FileSystem fs;
+    private boolean fullPath;
+    private String name;
+
+    NameIterator(FileSystem fs, boolean fullPath, String name) {
+      this.fs = fs;
+      this.fullPath = fullPath;
+      this.name = name;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return !isNullOrEmpty(name);
+    }
+
+    @Override
+    public GcsPath next() {
+      int i = name.indexOf('/');
+      String component;
+      if (i >= 0) {
+        component = name.substring(0, i);
+        name = name.substring(i + 1);
+      } else {
+        component = name;
+        name = null;
+      }
+      if (fullPath) {
+        fullPath = false;
+        return new GcsPath(fs, component, "");
+      } else {
+        // Relative paths have no bucket.
+        return new GcsPath(fs, "", component);
+      }
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  @Override
+  public int compareTo(Path other) {
+    if (!(other instanceof GcsPath)) {
+      throw new ClassCastException();
+    }
+
+    GcsPath path = (GcsPath) other;
+    int b = bucket.compareTo(path.bucket);
+    if (b != 0) {
+      return b;
+    }
+
+    // Compare a component at a time, so that the separator char doesn't
+    // get compared against component contents.  Eg, "a/b" < "a-1/b".
+    Iterator<Path> left = iterator();
+    Iterator<Path> right = path.iterator();
+
+    while (left.hasNext() && right.hasNext()) {
+      String leftStr = left.next().toString();
+      String rightStr = right.next().toString();
+      int c = leftStr.compareTo(rightStr);
+      if (c != 0) {
+        return c;
+      }
+    }
+
+    if (!left.hasNext() && !right.hasNext()) {
+      return 0;
+    } else {
+      return left.hasNext() ? 1 : -1;
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    GcsPath paths = (GcsPath) o;
+    return bucket.equals(paths.bucket) && object.equals(paths.object);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = bucket.hashCode();
+    result = 31 * result + object.hashCode();
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    if (!isAbsolute()) {
+      return object;
+    }
+    StringBuilder sb = new StringBuilder();
+    sb.append(SCHEME)
+        .append("://");
+    if (!bucket.isEmpty()) {
+      sb.append(bucket)
+          .append('/');
+    }
+    sb.append(object);
+    return sb.toString();
+  }
+
+  // TODO: Consider using resource names for all GCS paths used by the SDK.
+  public String toResourceName() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("storage.googleapis.com/");
+    if (!bucket.isEmpty()) {
+      sb.append(bucket).append('/');
+    }
+    sb.append(object);
+    return sb.toString();
+  }
+
+  @Override
+  public URI toUri() {
+    try {
+      return new URI(SCHEME, "//" + bucketAndObject(), null);
+    } catch (URISyntaxException e) {
+      throw new RuntimeException("Unable to create URI for GCS path " + this, e);
+    }
+  }
+
+  private String bucketAndObject() {
+    if (bucket.isEmpty()) {
+      return object;
+    } else {
+      return bucket + "/" + object;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/gcsfs/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/gcsfs/package-info.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/gcsfs/package-info.java
new file mode 100644
index 0000000..4d49f8c
--- /dev/null
+++
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/gcsfs/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Defines utilities used to interact with Google Cloud Storage. */
+package org.apache.beam.sdk.util.gcsfs;
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/package-info.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/package-info.java
new file mode 100644
index 0000000..f8135e7
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Defines Google Cloud Platform component utilities that can be used by Beam runners. */
+package org.apache.beam.sdk.util;
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
new file mode 100644
index 0000000..a8772c3
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp;
+
+import static org.apache.beam.sdk.util.ApiSurface.containsOnlyPackages;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+import org.apache.beam.sdk.util.ApiSurface;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** API surface verification for Google Cloud Platform core components. */
+@RunWith(JUnit4.class)
+public class GcpCoreApiSurfaceTest {
+
+  @Test
+  public void testApiSurface() throws Exception {
+
+    @SuppressWarnings("unchecked")
+    final Set<String> allowed =
+        ImmutableSet.of(
+            "org.apache.beam",
+            "com.google.api.client",
+            "com.google.api.services.storage",
+            "com.google.auth",
+            "com.fasterxml.jackson.annotation",
+            "com.fasterxml.jackson.core",
+            "com.fasterxml.jackson.databind",
+            "org.apache.avro",
+            "org.hamcrest",
+            // via DataflowMatchers
+            "org.codehaus.jackson",
+            // via Avro
+            "org.joda.time",
+            "org.junit");
+
+    assertThat(
+        ApiSurface.getSdkApiSurface(getClass().getClassLoader()), containsOnlyPackages(allowed));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/auth/TestCredential.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/auth/TestCredential.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/auth/TestCredential.java
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/auth/TestCredential.java new file mode 100644 index 0000000..6f0846e --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/auth/TestCredential.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.gcp.auth; + +import com.google.auth.Credentials; +import java.io.IOException; +import java.net.URI; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * Fake credential, for use in testing. 
+ */ +public class TestCredential extends Credentials { + @Override + public String getAuthenticationType() { + return "Test"; + } + + @Override + public Map<String, List<String>> getRequestMetadata() throws IOException { + return Collections.emptyMap(); + } + + @Override + public Map<String, List<String>> getRequestMetadata(URI uri) throws IOException { + return Collections.emptyMap(); + } + + @Override + public boolean hasRequestMetadata() { + return false; + } + + @Override + public boolean hasRequestMetadataOnly() { + return true; + } + + @Override + public void refresh() throws IOException { + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java new file mode 100644 index 0000000..68b3818 --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.gcp.options; + +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import com.google.api.services.cloudresourcemanager.CloudResourceManager; +import com.google.api.services.cloudresourcemanager.CloudResourceManager.Projects; +import com.google.api.services.cloudresourcemanager.CloudResourceManager.Projects.Get; +import com.google.api.services.cloudresourcemanager.model.Project; +import com.google.api.services.storage.model.Bucket; +import com.google.common.collect.ImmutableMap; +import com.google.common.io.Files; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions.DefaultProjectFactory; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions.GcpTempLocationFactory; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.RestoreSystemProperties; +import org.apache.beam.sdk.util.GcsUtil; +import org.apache.beam.sdk.util.NoopPathValidator; +import 
org.apache.beam.sdk.util.gcsfs.GcsPath; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestRule; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** Tests for {@link GcpOptions}. */ +@RunWith(Enclosed.class) +public class GcpOptionsTest { + + /** Tests for the majority of methods. */ + @RunWith(JUnit4.class) + public static class Common { + @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties(); + @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testGetProjectFromCloudSdkConfigEnv() throws Exception { + Map<String, String> environment = + ImmutableMap.of("CLOUDSDK_CONFIG", tmpFolder.getRoot().getAbsolutePath()); + assertEquals("test-project", + runGetProjectTest(tmpFolder.newFile("properties"), environment)); + } + + @Test + public void testGetProjectFromAppDataEnv() throws Exception { + Map<String, String> environment = + ImmutableMap.of("APPDATA", tmpFolder.getRoot().getAbsolutePath()); + System.setProperty("os.name", "windows"); + assertEquals("test-project", + runGetProjectTest(new File(tmpFolder.newFolder("gcloud"), "properties"), + environment)); + } + + @Test + public void testGetProjectFromUserHomeEnvOld() throws Exception { + Map<String, String> environment = ImmutableMap.of(); + System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath()); + assertEquals("test-project", + runGetProjectTest( + new File(tmpFolder.newFolder(".config", "gcloud"), "properties"), + environment)); + } + + @Test + public void testGetProjectFromUserHomeEnv() throws Exception { + Map<String, String> environment = ImmutableMap.of(); + System.setProperty("user.home", 
tmpFolder.getRoot().getAbsolutePath()); + assertEquals("test-project", runGetProjectTest( + new File(tmpFolder.newFolder(".config", "gcloud", "configurations"), "config_default"), + environment)); + } + + @Test + public void testGetProjectFromUserHomeOldAndNewPrefersNew() throws Exception { + Map<String, String> environment = ImmutableMap.of(); + System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath()); + makePropertiesFileWithProject( + new File(tmpFolder.newFolder(".config", "gcloud"), "properties"), "old-project"); + assertEquals("test-project", runGetProjectTest( + new File(tmpFolder.newFolder(".config", "gcloud", "configurations"), "config_default"), + environment)); + } + + @Test + public void testUnableToGetDefaultProject() throws Exception { + System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath()); + DefaultProjectFactory projectFactory = spy(new DefaultProjectFactory()); + when(projectFactory.getEnvironment()).thenReturn(ImmutableMap.<String, String>of()); + assertNull(projectFactory.create(PipelineOptionsFactory.create())); + } + + @Test + public void testEmptyGcpTempLocation() throws Exception { + GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); + options.setGcpCredential(new TestCredential()); + options.setProject(""); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("--project is a required option"); + options.getGcpTempLocation(); + } + + @Test + public void testDefaultGcpTempLocation() throws Exception { + GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); + String tempLocation = "gs://bucket"; + options.setTempLocation(tempLocation); + options.as(GcsOptions.class).setPathValidatorClass(NoopPathValidator.class); + assertEquals(tempLocation, options.getGcpTempLocation()); + } + + @Test + public void testDefaultGcpTempLocationInvalid() throws Exception { + GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); + options.setTempLocation("file://"); + 
thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "Error constructing default value for gcpTempLocation: tempLocation is not" + + " a valid GCS path"); + options.getGcpTempLocation(); + } + + @Test + public void testDefaultGcpTempLocationDoesNotExist() { + GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); + String tempLocation = "gs://does/not/exist"; + options.setTempLocation(tempLocation); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "Error constructing default value for gcpTempLocation: tempLocation is not" + + " a valid GCS path"); + thrown.expectCause( + hasMessage(containsString("Output path does not exist or is not writeable"))); + + options.getGcpTempLocation(); + } + + private static void makePropertiesFileWithProject(File path, String projectId) + throws IOException { + String properties = String.format("[core]%n" + + "account = [email protected]%n" + + "project = %s%n" + + "%n" + + "[dataflow]%n" + + "magic = true%n", projectId); + Files.write(properties, path, StandardCharsets.UTF_8); + } + + private static String runGetProjectTest(File path, Map<String, String> environment) + throws Exception { + makePropertiesFileWithProject(path, "test-project"); + DefaultProjectFactory projectFactory = spy(new DefaultProjectFactory()); + when(projectFactory.getEnvironment()).thenReturn(environment); + return projectFactory.create(PipelineOptionsFactory.create()); + } + } + + /** Tests related to determining the GCP temp location. 
*/ + @RunWith(JUnit4.class) + public static class GcpTempLocation { + @Rule public ExpectedException thrown = ExpectedException.none(); + @Mock private GcsUtil mockGcsUtil; + @Mock private CloudResourceManager mockCrmClient; + @Mock private Projects mockProjects; + @Mock private Get mockGet; + private Project fakeProject; + private PipelineOptions options; + + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + options = PipelineOptionsFactory.create(); + options.as(GcsOptions.class).setGcsUtil(mockGcsUtil); + options.as(GcpOptions.class).setProject("foo"); + options.as(GcpOptions.class).setZone("us-north1-a"); + when(mockCrmClient.projects()).thenReturn(mockProjects); + when(mockProjects.get(any(String.class))).thenReturn(mockGet); + fakeProject = new Project().setProjectNumber(1L); + } + + @Test + public void testCreateBucket() throws Exception { + doReturn(fakeProject).when(mockGet).execute(); + when(mockGcsUtil.bucketOwner(any(GcsPath.class))).thenReturn(1L); + + String bucket = GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient); + assertEquals("gs://dataflow-staging-us-north1-1", bucket); + } + + @Test + public void testCreateBucketProjectLookupFails() throws Exception { + doThrow(new IOException("badness")).when(mockGet).execute(); + + thrown.expect(RuntimeException.class); + thrown.expectMessage("Unable to verify project"); + GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient); + } + + @Test + public void testCreateBucketCreateBucketFails() throws Exception { + doReturn(fakeProject).when(mockGet).execute(); + doThrow(new IOException("badness")).when( + mockGcsUtil).createBucket(any(String.class), any(Bucket.class)); + + thrown.expect(RuntimeException.class); + thrown.expectMessage("Unable create default bucket"); + GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient); + } + + @Test + public void testCannotGetBucketOwner() throws Exception { + 
doReturn(fakeProject).when(mockGet).execute(); + when(mockGcsUtil.bucketOwner(any(GcsPath.class))) + .thenThrow(new IOException("badness")); + + thrown.expect(RuntimeException.class); + thrown.expectMessage("Unable to determine the owner"); + GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient); + } + + @Test + public void testProjectMismatch() throws Exception { + doReturn(fakeProject).when(mockGet).execute(); + when(mockGcsUtil.bucketOwner(any(GcsPath.class))).thenReturn(5L); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Bucket owner does not match the project"); + GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient); + } + + @Test + public void regionFromZone() throws Exception { + assertEquals("us-central1", GcpTempLocationFactory.getRegionFromZone("us-central1-a")); + assertEquals("asia-east", GcpTempLocationFactory.getRegionFromZone("asia-east-a")); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptionsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptionsTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptionsTest.java new file mode 100644 index 0000000..67d5880 --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptionsTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.gcp.options; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.services.cloudresourcemanager.CloudResourceManager.Projects.Delete; +import com.google.api.services.storage.Storage; +import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; +import org.apache.beam.sdk.extensions.gcp.options.GoogleApiDebugOptions.GoogleApiTracer; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.util.Transport; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link GoogleApiDebugOptions}. 
*/ +@RunWith(JUnit4.class) +public class GoogleApiDebugOptionsTest { + private static final String STORAGE_GET_TRACE = + "--googleApiTrace={\"Objects.Get\":\"GetTraceDestination\"}"; + private static final String STORAGE_GET_AND_LIST_TRACE = + "--googleApiTrace={\"Objects.Get\":\"GetTraceDestination\"," + + "\"Objects.List\":\"ListTraceDestination\"}"; + private static final String STORAGE_TRACE = "--googleApiTrace={\"Storage\":\"TraceDestination\"}"; + + @Test + public void testWhenTracingMatches() throws Exception { + String[] args = new String[] {STORAGE_GET_TRACE}; + GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class); + options.setGcpCredential(new TestCredential()); + assertNotNull(options.getGoogleApiTrace()); + + Storage.Objects.Get request = + Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId"); + assertEquals("GetTraceDestination", request.get("$trace")); + } + + @Test + public void testWhenTracingDoesNotMatch() throws Exception { + String[] args = new String[] {STORAGE_GET_TRACE}; + GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class); + options.setGcpCredential(new TestCredential()); + + assertNotNull(options.getGoogleApiTrace()); + + Storage.Objects.List request = + Transport.newStorageClient(options).build().objects().list("testProjectId"); + assertNull(request.get("$trace")); + } + + @Test + public void testWithMultipleTraces() throws Exception { + String[] args = new String[] {STORAGE_GET_AND_LIST_TRACE}; + GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class); + options.setGcpCredential(new TestCredential()); + + assertNotNull(options.getGoogleApiTrace()); + + Storage.Objects.Get getRequest = + Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId"); + assertEquals("GetTraceDestination", getRequest.get("$trace")); + + Storage.Objects.List listRequest = + 
Transport.newStorageClient(options).build().objects().list("testProjectId"); + assertEquals("ListTraceDestination", listRequest.get("$trace")); + } + + @Test + public void testMatchingAllCalls() throws Exception { + String[] args = new String[] {STORAGE_TRACE}; + GcsOptions options = + PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class); + options.setGcpCredential(new TestCredential()); + + assertNotNull(options.getGoogleApiTrace()); + + Storage.Objects.Get getRequest = + Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId"); + assertEquals("TraceDestination", getRequest.get("$trace")); + + Storage.Objects.List listRequest = + Transport.newStorageClient(options).build().objects().list("testProjectId"); + assertEquals("TraceDestination", listRequest.get("$trace")); + } + + @Test + public void testMatchingAgainstClient() throws Exception { + GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class); + options.setGcpCredential(new TestCredential()); + options.setGoogleApiTrace(new GoogleApiTracer().addTraceFor( + Transport.newStorageClient(options).build(), "TraceDestination")); + + Storage.Objects.Get getRequest = + Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId"); + assertEquals("TraceDestination", getRequest.get("$trace")); + + Delete deleteRequest = GcpOptions.GcpTempLocationFactory.newCloudResourceManagerClient( + options.as(CloudResourceManagerOptions.class)) + .build().projects().delete("testProjectId"); + assertNull(deleteRequest.get("$trace")); + } + + @Test + public void testMatchingAgainstRequestType() throws Exception { + GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class); + options.setGcpCredential(new TestCredential()); + options.setGoogleApiTrace(new GoogleApiTracer().addTraceFor( + Transport.newStorageClient(options).build().objects() + .get("aProjectId", "aObjectId"), "TraceDestination")); + + Storage.Objects.Get getRequest = + 
Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId"); + assertEquals("TraceDestination", getRequest.get("$trace")); + + Storage.Objects.List listRequest = + Transport.newStorageClient(options).build().objects().list("testProjectId"); + assertNull(listRequest.get("$trace")); + } + + @Test + public void testDeserializationAndSerializationOfGoogleApiTracer() throws Exception { + String serializedValue = "{\"Api\":\"Token\"}"; + ObjectMapper objectMapper = new ObjectMapper(); + assertEquals(serializedValue, + objectMapper.writeValueAsString( + objectMapper.readValue(serializedValue, GoogleApiTracer.class))); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java new file mode 100644 index 0000000..a29dd45 --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.junit.Assert.fail; + +import com.google.common.collect.Lists; +import java.util.ServiceLoader; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link GcsIOChannelFactoryRegistrar}. + */ +@RunWith(JUnit4.class) +public class GcsIOChannelFactoryRegistrarTest { + + @Test + public void testServiceLoader() { + for (IOChannelFactoryRegistrar registrar + : Lists.newArrayList(ServiceLoader.load(IOChannelFactoryRegistrar.class).iterator())) { + if (registrar instanceof GcsIOChannelFactoryRegistrar) { + return; + } + } + fail("Expected to find " + GcsIOChannelFactoryRegistrar.class); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java new file mode 100644 index 0000000..f53490a --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.junit.Assert.assertEquals; + +import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link GcsIOChannelFactory}.
*/ +@RunWith(JUnit4.class) +public class GcsIOChannelFactoryTest { + private GcsIOChannelFactory factory; + + @Before + public void setUp() { + factory = GcsIOChannelFactory.fromOptions(PipelineOptionsFactory.as(GcsOptions.class)); + } + + @Test + public void testResolve() throws Exception { + assertEquals("gs://bucket/object", factory.resolve("gs://bucket", "object")); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java new file mode 100644 index 0000000..65fb228 --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.util; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.when; + +import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; +import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.util.gcsfs.GcsPath; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** Tests for {@link GcsPathValidator}. */ +@RunWith(JUnit4.class) +public class GcsPathValidatorTest { + @Rule public ExpectedException expectedException = ExpectedException.none(); + + @Mock private GcsUtil mockGcsUtil; + private GcsPathValidator validator; + + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true); + GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class); + options.setGcpCredential(new TestCredential()); + options.setGcsUtil(mockGcsUtil); + validator = GcsPathValidator.fromOptions(options); + } + + @Test + public void testValidFilePattern() { + validator.validateInputFilePatternSupported("gs://bucket/path"); + } + + @Test + public void testInvalidFilePattern() { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage( + "Expected a valid 'gs://' path but was given '/local/path'"); + validator.validateInputFilePatternSupported("/local/path"); + } + + @Test + public void testFilePatternMissingBucket() { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage( + "Missing object or bucket in path: 'gs://input/', " + + "did you mean: 'gs://some-bucket/input'?"); + validator.validateInputFilePatternSupported("gs://input"); + } + + @Test + public void 
testWhenBucketDoesNotExist() throws Exception { + when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(false); + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage( + "Could not find file gs://non-existent-bucket/location"); + validator.validateInputFilePatternSupported("gs://non-existent-bucket/location"); + } + + @Test + public void testValidOutputPrefix() { + validator.validateOutputFilePrefixSupported("gs://bucket/path"); + } + + @Test + public void testInvalidOutputPrefix() { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage( + "Expected a valid 'gs://' path but was given '/local/path'"); + validator.validateOutputFilePrefixSupported("/local/path"); + } + + @Test + public void testOutputPrefixMissingBucket() { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage( + "Missing object or bucket in path: 'gs://output/', " + + "did you mean: 'gs://some-bucket/output'?"); + validator.validateOutputFilePrefixSupported("gs://output"); + } +}
