http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/NoopCredentialFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/NoopCredentialFactory.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/NoopCredentialFactory.java new file mode 100644 index 0000000..4355a10 --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/NoopCredentialFactory.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.gcp.auth; + +import com.google.auth.Credentials; +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * Construct an oauth credential to be used by the SDK and the SDK workers. + * Always returns a null Credential object. 
+ */ +public class NoopCredentialFactory implements CredentialFactory { + private static final NoopCredentialFactory INSTANCE = new NoopCredentialFactory(); + private static final NoopCredentials NOOP_CREDENTIALS = new NoopCredentials(); + + public static NoopCredentialFactory fromOptions(PipelineOptions options) { + return INSTANCE; + } + + @Override + public Credentials getCredential() throws IOException { + return NOOP_CREDENTIALS; + } + + private static class NoopCredentials extends Credentials { + @Override + public String getAuthenticationType() { + return null; + } + + @Override + public Map<String, List<String>> getRequestMetadata(URI uri) throws IOException { + return null; + } + + @Override + public boolean hasRequestMetadata() { + return false; + } + + @Override + public boolean hasRequestMetadataOnly() { + return false; + } + + @Override + public void refresh() throws IOException {} + } +}
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/NullCredentialInitializer.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/NullCredentialInitializer.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/NullCredentialInitializer.java new file mode 100644 index 0000000..00306f2 --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/NullCredentialInitializer.java @@ -0,0 +1,62 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.beam.sdk.extensions.gcp.auth; + +import com.google.api.client.http.HttpRequest; +import com.google.api.client.http.HttpRequestInitializer; +import com.google.api.client.http.HttpResponse; +import com.google.api.client.http.HttpUnsuccessfulResponseHandler; +import java.io.IOException; + +/** + * A {@link HttpRequestInitializer} for requests that don't have credentials. 
+ * + * <p>When the access is denied, it throws {@link IOException} with a detailed error message. + */ +public class NullCredentialInitializer implements HttpRequestInitializer { + private static final int ACCESS_DENIED = 401; + private static final String NULL_CREDENTIAL_REASON = + "Unable to get application default credentials. Please see " + + "https://developers.google.com/accounts/docs/application-default-credentials " + + "for details on how to specify credentials. This version of the SDK is " + + "dependent on the gcloud core component version 2015.02.05 or newer to " + + "be able to get credentials from the currently authorized user via gcloud auth."; + + @Override + public void initialize(HttpRequest httpRequest) throws IOException { + httpRequest.setUnsuccessfulResponseHandler(new NullCredentialHttpUnsuccessfulResponseHandler()); + } + + private static class NullCredentialHttpUnsuccessfulResponseHandler + implements HttpUnsuccessfulResponseHandler { + + @Override + public boolean handleResponse( + HttpRequest httpRequest, + HttpResponse httpResponse, boolean supportsRetry) throws IOException { + if (!httpResponse.isSuccessStatusCode() && httpResponse.getStatusCode() == ACCESS_DENIED) { + throwNullCredentialException(); + } + return supportsRetry; + } + } + + public static void throwNullCredentialException() { + throw new RuntimeException(NULL_CREDENTIAL_REASON); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/package-info.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/package-info.java new file mode 100644 index 0000000..3d77bf2 --- /dev/null +++ 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Defines classes related to interacting with {@link com.google.auth.Credentials} for + * pipeline creation and execution containing Google Cloud Platform components. 
+ */ +package org.apache.beam.sdk.extensions.gcp.auth; http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/CloudResourceManagerOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/CloudResourceManagerOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/CloudResourceManagerOptions.java new file mode 100644 index 0000000..87557e5 --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/CloudResourceManagerOptions.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.gcp.options; + +import org.apache.beam.sdk.options.ApplicationNameOptions; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.StreamingOptions; + +/** + * Properties needed when using Google CloudResourceManager with the Apache Beam SDK. + */ +@Description("Options that are used to configure Google CloudResourceManager. See " + + "https://cloud.google.com/resource-manager/ for details on CloudResourceManager.") +public interface CloudResourceManagerOptions extends ApplicationNameOptions, GcpOptions, + PipelineOptions, StreamingOptions { +} http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java new file mode 100644 index 0000000..445f00f --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java @@ -0,0 +1,389 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.gcp.options; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Strings.isNullOrEmpty; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.api.client.http.HttpRequestInitializer; +import com.google.api.client.util.BackOff; +import com.google.api.client.util.Sleeper; +import com.google.api.services.cloudresourcemanager.CloudResourceManager; +import com.google.api.services.cloudresourcemanager.model.Project; +import com.google.api.services.storage.model.Bucket; +import com.google.auth.Credentials; +import com.google.auth.http.HttpCredentialsAdapter; +import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; +import com.google.cloud.hadoop.util.ResilientOperation; +import com.google.cloud.hadoop.util.RetryDeterminer; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.io.Files; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.FileAlreadyExistsException; +import java.security.GeneralSecurityException; +import java.util.Locale; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import javax.annotation.Nullable; +import org.apache.beam.sdk.extensions.gcp.auth.CredentialFactory; +import org.apache.beam.sdk.extensions.gcp.auth.GcpCredentialFactory; +import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer; +import 
org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.DefaultValueFactory; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.InstanceBuilder; +import org.apache.beam.sdk.util.PathValidator; +import org.apache.beam.sdk.util.RetryHttpRequestInitializer; +import org.apache.beam.sdk.util.Transport; +import org.apache.beam.sdk.util.gcsfs.GcsPath; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Options used to configure Google Cloud Platform specific options such as the project + * and credentials. + * + * <p>These options defer to the + * <a href="https://developers.google.com/accounts/docs/application-default-credentials"> + * application default credentials</a> for authentication. See the + * <a href="https://github.com/google/google-auth-library-java">Google Auth Library</a> for + * alternative mechanisms for creating credentials. + */ +@Description("Options used to configure Google Cloud Platform project and credentials.") +public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions { + /** + * Project id to use when launching jobs. + */ + @Description("Project id. Required when using Google Cloud Platform services. " + + "See https://cloud.google.com/storage/docs/projects for further details.") + @Default.InstanceFactory(DefaultProjectFactory.class) + String getProject(); + void setProject(String value); + + /** + * GCP <a href="https://developers.google.com/compute/docs/zones" + * >availability zone</a> for operations. + * + * <p>Default is set on a per-service basis. + */ + @Description("GCP availability zone for running GCP operations. " + + "Default is up to the individual service.") + String getZone(); + void setZone(String value); + + /** + * The class of the credential factory that should be created and used to create + * credentials. 
If gcpCredential has not been set explicitly, an instance of this class will + * be constructed and used as a credential factory. + */ + @Description("The class of the credential factory that should be created and used to create " + + "credentials. If gcpCredential has not been set explicitly, an instance of this class will " + + "be constructed and used as a credential factory.") + @Default.Class(GcpCredentialFactory.class) + Class<? extends CredentialFactory> getCredentialFactoryClass(); + void setCredentialFactoryClass( + Class<? extends CredentialFactory> credentialFactoryClass); + + /** + * The credential instance that should be used to authenticate against GCP services. + * If no credential has been set explicitly, the default is to use the instance factory + * that constructs a credential based upon the currently set credentialFactoryClass. + */ + @JsonIgnore + @Description("The credential instance that should be used to authenticate against GCP services. " + + "If no credential has been set explicitly, the default is to use the instance factory " + + "that constructs a credential based upon the currently set credentialFactoryClass.") + @Default.InstanceFactory(GcpUserCredentialsFactory.class) + Credentials getGcpCredential(); + void setGcpCredential(Credentials value); + + /** + * Attempts to infer the default project based upon the environment this application + * is executing within. Currently this only supports getting the default project from gcloud. 
+ */ + class DefaultProjectFactory implements DefaultValueFactory<String> { + private static final Logger LOG = LoggerFactory.getLogger(DefaultProjectFactory.class); + + @Override + public String create(PipelineOptions options) { + try { + File configFile; + if (getEnvironment().containsKey("CLOUDSDK_CONFIG")) { + configFile = new File(getEnvironment().get("CLOUDSDK_CONFIG"), "properties"); + } else if (isWindows() && getEnvironment().containsKey("APPDATA")) { + configFile = new File(getEnvironment().get("APPDATA"), "gcloud/properties"); + } else { + // New versions of gcloud use this file + configFile = new File( + System.getProperty("user.home"), + ".config/gcloud/configurations/config_default"); + if (!configFile.exists()) { + // Old versions of gcloud use this file + configFile = new File(System.getProperty("user.home"), ".config/gcloud/properties"); + } + } + String section = null; + Pattern projectPattern = Pattern.compile("^project\\s*=\\s*(.*)$"); + Pattern sectionPattern = Pattern.compile("^\\[(.*)\\]$"); + for (String line : Files.readLines(configFile, StandardCharsets.UTF_8)) { + line = line.trim(); + if (line.isEmpty() || line.startsWith(";")) { + continue; + } + Matcher matcher = sectionPattern.matcher(line); + if (matcher.matches()) { + section = matcher.group(1); + } else if (section == null || section.equals("core")) { + matcher = projectPattern.matcher(line); + if (matcher.matches()) { + String project = matcher.group(1).trim(); + LOG.info("Inferred default GCP project '{}' from gcloud. If this is the incorrect " + + "project, please cancel this Pipeline and specify the command-line " + + "argument --project.", project); + return project; + } + } + } + } catch (IOException expected) { + LOG.debug("Failed to find default project.", expected); + } + // return null if can't determine + return null; + } + + /** + * Returns true if running on the Windows OS. 
+ */ + private static boolean isWindows() { + return System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("windows"); + } + + /** + * Used to mock out getting environment variables. + */ + @VisibleForTesting + Map<String, String> getEnvironment() { + return System.getenv(); + } + } + + /** + * Attempts to load the GCP credentials. See + * {@link CredentialFactory#getCredential()} for more details. + */ + class GcpUserCredentialsFactory implements DefaultValueFactory<Credentials> { + @Override + public Credentials create(PipelineOptions options) { + GcpOptions gcpOptions = options.as(GcpOptions.class); + try { + CredentialFactory factory = InstanceBuilder.ofType(CredentialFactory.class) + .fromClass(gcpOptions.getCredentialFactoryClass()) + .fromFactoryMethod("fromOptions") + .withArg(PipelineOptions.class, options) + .build(); + return factory.getCredential(); + } catch (IOException | GeneralSecurityException e) { + throw new RuntimeException("Unable to obtain credential", e); + } + } + } + + /** + * A GCS path for storing temporary files in GCP. + * + * <p>Its default to {@link PipelineOptions#getTempLocation}. + */ + @Description("A GCS path for storing temporary files in GCP.") + @Default.InstanceFactory(GcpTempLocationFactory.class) + @Nullable String getGcpTempLocation(); + void setGcpTempLocation(String value); + + /** + * Returns {@link PipelineOptions#getTempLocation} as the default GCP temp location. 
+ */ + class GcpTempLocationFactory implements DefaultValueFactory<String> { + private static final FluentBackoff BACKOFF_FACTORY = + FluentBackoff.DEFAULT.withMaxRetries(3).withInitialBackoff(Duration.millis(200)); + static final String DEFAULT_REGION = "us-central1"; + static final Logger LOG = LoggerFactory.getLogger(GcpTempLocationFactory.class); + + @Override + @Nullable + public String create(PipelineOptions options) { + String tempLocation = options.getTempLocation(); + if (isNullOrEmpty(tempLocation)) { + tempLocation = tryCreateDefaultBucket(options, + newCloudResourceManagerClient(options.as(CloudResourceManagerOptions.class)).build()); + options.setTempLocation(tempLocation); + } else { + try { + PathValidator validator = options.as(GcsOptions.class).getPathValidator(); + validator.validateOutputFilePrefixSupported(tempLocation); + } catch (Exception e) { + throw new IllegalArgumentException(String.format( + "Error constructing default value for gcpTempLocation: tempLocation is not" + + " a valid GCS path, %s. ", tempLocation), e); + } + } + return tempLocation; + } + + /** + * Creates a default bucket or verifies the existence and proper access control + * of an existing default bucket. Returns the location if successful. + */ + @VisibleForTesting + static String tryCreateDefaultBucket( + PipelineOptions options, CloudResourceManager crmClient) { + GcsOptions gcpOptions = options.as(GcsOptions.class); + + final String projectId = gcpOptions.getProject(); + checkArgument(!isNullOrEmpty(projectId), + "--project is a required option."); + + // Look up the project number, to create a default bucket with a stable + // name with no special characters. 
+ long projectNumber = 0L; + try { + projectNumber = getProjectNumber(projectId, crmClient); + } catch (IOException e) { + throw new RuntimeException("Unable to verify project with ID " + projectId, e); + } + String region = DEFAULT_REGION; + if (!isNullOrEmpty(gcpOptions.getZone())) { + region = getRegionFromZone(gcpOptions.getZone()); + } + final String bucketName = + "dataflow-staging-" + region + "-" + projectNumber; + LOG.info("No staging location provided, attempting to use default bucket: {}", + bucketName); + Bucket bucket = new Bucket() + .setName(bucketName) + .setLocation(region); + // Always try to create the bucket before checking access, so that we do not + // race with other pipelines that may be attempting to do the same thing. + try { + gcpOptions.getGcsUtil().createBucket(projectId, bucket); + } catch (FileAlreadyExistsException e) { + LOG.debug("Bucket '{}'' already exists, verifying access.", bucketName); + } catch (IOException e) { + throw new RuntimeException("Unable create default bucket.", e); + } + + // Once the bucket is expected to exist, verify that it is correctly owned + // by the project executing the job. + try { + long owner = gcpOptions.getGcsUtil().bucketOwner( + GcsPath.fromComponents(bucketName, "")); + checkArgument( + owner == projectNumber, + "Bucket owner does not match the project from --project:" + + " %s vs. %s", owner, projectNumber); + } catch (IOException e) { + throw new RuntimeException( + "Unable to determine the owner of the default bucket at gs://" + bucketName, e); + } + return "gs://" + bucketName; + } + + /** + * Returns the project number or throws an exception if the project does not + * exist or has other access exceptions. 
+ */ + private static long getProjectNumber( + String projectId, + CloudResourceManager crmClient) throws IOException { + return getProjectNumber( + projectId, + crmClient, + BACKOFF_FACTORY.backoff(), + Sleeper.DEFAULT); + } + + /** + * Returns the project number or throws an error if the project does not + * exist or has other access errors. + */ + private static long getProjectNumber( + String projectId, + CloudResourceManager crmClient, + BackOff backoff, + Sleeper sleeper) throws IOException { + CloudResourceManager.Projects.Get getProject = + crmClient.projects().get(projectId); + try { + Project project = ResilientOperation.retry( + ResilientOperation.getGoogleRequestCallable(getProject), + backoff, + RetryDeterminer.SOCKET_ERRORS, + IOException.class, + sleeper); + return project.getProjectNumber(); + } catch (Exception e) { + throw new IOException("Unable to get project number", e); + } + } + + @VisibleForTesting + static String getRegionFromZone(String zone) { + String[] zoneParts = zone.split("-"); + checkArgument(zoneParts.length >= 2, "Invalid zone provided: %s", zone); + return zoneParts[0] + "-" + zoneParts[1]; + } + + /** + * Returns a CloudResourceManager client builder using the specified + * {@link CloudResourceManagerOptions}. + */ + @VisibleForTesting + static CloudResourceManager.Builder newCloudResourceManagerClient( + CloudResourceManagerOptions options) { + Credentials credentials = options.getGcpCredential(); + if (credentials == null) { + NullCredentialInitializer.throwNullCredentialException(); + } + return new CloudResourceManager.Builder(Transport.getTransport(), Transport.getJsonFactory(), + chainHttpRequestInitializer( + credentials, + // Do not log 404. It clutters the output and is possibly even required by the caller. 
+ new RetryHttpRequestInitializer(ImmutableList.of(404)))) + .setApplicationName(options.getAppName()) + .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); + } + + private static HttpRequestInitializer chainHttpRequestInitializer( + Credentials credential, HttpRequestInitializer httpRequestInitializer) { + if (credential == null) { + return new ChainingHttpRequestInitializer( + new NullCredentialInitializer(), httpRequestInitializer); + } else { + return new ChainingHttpRequestInitializer( + new HttpCredentialsAdapter(credential), + httpRequestInitializer); + } + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpPipelineOptionsRegistrar.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpPipelineOptionsRegistrar.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpPipelineOptionsRegistrar.java new file mode 100644 index 0000000..afc3416 --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpPipelineOptionsRegistrar.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.gcp.options; + +import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableList; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsRegistrar; + +/** + * A registrar containing the default GCP options. + */ +@AutoService(PipelineOptionsRegistrar.class) +public class GcpPipelineOptionsRegistrar implements PipelineOptionsRegistrar { + @Override + public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() { + return ImmutableList.<Class<? extends PipelineOptions>>builder() + .add(GcpOptions.class) + .add(GcsOptions.class) + .add(GoogleApiDebugOptions.class) + .build(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java new file mode 100644 index 0000000..954092c --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.gcp.options; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.apache.beam.sdk.options.ApplicationNameOptions; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.DefaultValueFactory; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.Hidden; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.GcsPathValidator; +import org.apache.beam.sdk.util.GcsUtil; +import org.apache.beam.sdk.util.InstanceBuilder; +import org.apache.beam.sdk.util.PathValidator; + +/** + * Options used to configure Google Cloud Storage. + */ +public interface GcsOptions extends + ApplicationNameOptions, GcpOptions, PipelineOptions { + /** + * The GcsUtil instance that should be used to communicate with Google Cloud Storage. 
+ */ + @JsonIgnore + @Description("The GcsUtil instance that should be used to communicate with Google Cloud Storage.") + @Default.InstanceFactory(GcsUtil.GcsUtilFactory.class) + @Hidden + GcsUtil getGcsUtil(); + void setGcsUtil(GcsUtil value); + + /** + * The ExecutorService instance to use to create threads, can be overridden to specify an + * ExecutorService that is compatible with the users environment. If unset, the + * default is to create an ExecutorService with an unbounded number of threads; this + * is compatible with Google AppEngine. + */ + @JsonIgnore + @Description("The ExecutorService instance to use to create multiple threads. Can be overridden " + + "to specify an ExecutorService that is compatible with the users environment. If unset, " + + "the default is to create an ExecutorService with an unbounded number of threads; this " + + "is compatible with Google AppEngine.") + @Default.InstanceFactory(ExecutorServiceFactory.class) + @Hidden + ExecutorService getExecutorService(); + void setExecutorService(ExecutorService value); + + /** + * GCS endpoint to use. If unspecified, uses the default endpoint. + */ + @JsonIgnore + @Hidden + @Description("The URL for the GCS API.") + String getGcsEndpoint(); + void setGcsEndpoint(String value); + + /** + * The buffer size (in bytes) to use when uploading files to GCS. Please see the documentation for + * {@link AbstractGoogleAsyncWriteChannel#setUploadBufferSize} for more information on the + * restrictions and performance implications of this value. + */ + @Description("The buffer size (in bytes) to use when uploading files to GCS. 
Please see the " + + "documentation for AbstractGoogleAsyncWriteChannel.setUploadBufferSize for more " + + "information on the restrictions and performance implications of this value.\n\n" + + "https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/util/src/main/java/" + + "com/google/cloud/hadoop/util/AbstractGoogleAsyncWriteChannel.java") + @Nullable + Integer getGcsUploadBufferSizeBytes(); + void setGcsUploadBufferSizeBytes(@Nullable Integer bytes); + + /** + * The class of the validator that should be created and used to validate paths. + * If pathValidator has not been set explicitly, an instance of this class will be + * constructed and used as the path validator. + */ + @Description("The class of the validator that should be created and used to validate paths. " + + "If pathValidator has not been set explicitly, an instance of this class will be " + + "constructed and used as the path validator.") + @Default.Class(GcsPathValidator.class) + Class<? extends PathValidator> getPathValidatorClass(); + void setPathValidatorClass(Class<? extends PathValidator> validatorClass); + + /** + * The path validator instance that should be used to validate paths. + * If no path validator has been set explicitly, the default is to use the instance factory that + * constructs a path validator based upon the currently set pathValidatorClass. + */ + @JsonIgnore + @Description("The path validator instance that should be used to validate paths. " + + "If no path validator has been set explicitly, the default is to use the instance factory " + + "that constructs a path validator based upon the currently set pathValidatorClass.") + @Default.InstanceFactory(PathValidatorFactory.class) + PathValidator getPathValidator(); + void setPathValidator(PathValidator validator); + + /** + * Returns the default {@link ExecutorService} to use within the Apache Beam SDK. The + * {@link ExecutorService} is compatible with AppEngine. 
+ */ + class ExecutorServiceFactory implements DefaultValueFactory<ExecutorService> { + @SuppressWarnings("deprecation") // IS_APP_ENGINE is deprecated for internal use only. + @Override + public ExecutorService create(PipelineOptions options) { + ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder(); + threadFactoryBuilder.setThreadFactory(MoreExecutors.platformThreadFactory()); + threadFactoryBuilder.setDaemon(true); + /* The SDK requires an unbounded thread pool because a step may create X writers + * each requiring their own thread to perform the writes otherwise a writer may + * block causing deadlock for the step because the writers buffer is full. + * Also, the MapTaskExecutor launches the steps in reverse order and completes + * them in forward order thus requiring enough threads so that each step's writers + * can be active. + */ + return new ThreadPoolExecutor( + 0, Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads. + Long.MAX_VALUE, TimeUnit.NANOSECONDS, // Keep non-core threads alive forever. + new SynchronousQueue<Runnable>(), + threadFactoryBuilder.build()); + } + } + + /** + * Creates a {@link PathValidator} object using the class specified in + * {@link #getPathValidatorClass()}. 
+ */ + class PathValidatorFactory implements DefaultValueFactory<PathValidator> { + @Override + public PathValidator create(PipelineOptions options) { + GcsOptions gcsOptions = options.as(GcsOptions.class); + return InstanceBuilder.ofType(PathValidator.class) + .fromClass(gcsOptions.getPathValidatorClass()) + .fromFactoryMethod("fromOptions") + .withArg(PipelineOptions.class, options) + .build(); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptions.java new file mode 100644 index 0000000..01144c4 --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptions.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.gcp.options; + +import com.google.api.client.googleapis.services.AbstractGoogleClient; +import com.google.api.client.googleapis.services.AbstractGoogleClientRequest; +import com.google.api.client.googleapis.services.GoogleClientRequestInitializer; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * These options configure debug settings for Google API clients created within the Apache Beam SDK. + */ +public interface GoogleApiDebugOptions extends PipelineOptions { + /** + * This option enables tracing of API calls to Google services used within the Apache + * Beam SDK. Values are expected in JSON format <code>{"ApiName":"TraceDestination",...} + * </code> where the {@code ApiName} represents the request classes canonical name. The + * {@code TraceDestination} is a logical trace consumer to whom the trace will be reported. + * Typically, "producer" is the right destination to use: this makes API traces available to the + * team offering the API. Note that by enabling this option, the contents of the requests to and + * from Google Cloud services will be made available to Google. For example, by specifying + * <code>{"Dataflow":"producer"}</code>, all calls to the Dataflow service will be made available + * to Google, specifically to the Google Cloud Dataflow team. + */ + @Description("This option enables tracing of API calls to Google services used within the Apache " + + "Beam SDK. Values are expected in JSON format {\"ApiName\":\"TraceDestination\",...} " + + "where the ApiName represents the request classes canonical name. The TraceDestination is " + + "a logical trace consumer to whom the trace will be reported. Typically, \"producer\" is " + + "the right destination to use: this makes API traces available to the team offering the " + + "API. 
Note that by enabling this option, the contents of the requests to and from " + + "Google Cloud services will be made available to Google. For example, by specifying " + + "{\"Dataflow\":\"producer\"}, all calls to the Dataflow service will be made available to " + + "Google, specifically to the Google Cloud Dataflow team.") + GoogleApiTracer getGoogleApiTrace(); + void setGoogleApiTrace(GoogleApiTracer commands); + + /** + * A {@link GoogleClientRequestInitializer} that adds the trace destination to Google API calls. + */ + class GoogleApiTracer extends HashMap<String, String> + implements GoogleClientRequestInitializer { + /** + * Creates a {@link GoogleApiTracer} that sets the trace destination on all + * calls that match the given client type. + */ + public GoogleApiTracer addTraceFor(AbstractGoogleClient client, String traceDestination) { + put(client.getClass().getCanonicalName(), traceDestination); + return this; + } + + /** + * Creates a {@link GoogleApiTracer} that sets the trace {@code traceDestination} on all + * calls that match for the given request type. 
+ */ + public GoogleApiTracer addTraceFor( + AbstractGoogleClientRequest<?> request, String traceDestination) { + put(request.getClass().getCanonicalName(), traceDestination); + return this; + } + + @Override + public void initialize(AbstractGoogleClientRequest<?> request) throws IOException { + for (Map.Entry<String, String> entry : this.entrySet()) { + if (request.getClass().getCanonicalName().contains(entry.getKey())) { + request.set("$trace", entry.getValue()); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/package-info.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/package-info.java new file mode 100644 index 0000000..bc9646c --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Defines {@link org.apache.beam.sdk.options.PipelineOptions} for + * configuring pipeline execution for Google Cloud Platform components. + */ +package org.apache.beam.sdk.extensions.gcp.options; http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java new file mode 100644 index 0000000..3a12620 --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.util; + +import java.io.IOException; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.nio.file.Path; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.gcsfs.GcsPath; + +/** + * Implements IOChannelFactory for GCS. + */ +public class GcsIOChannelFactory implements IOChannelFactory { + + /** + * Create a {@link GcsIOChannelFactory} with the given {@link PipelineOptions}. + */ + public static GcsIOChannelFactory fromOptions(PipelineOptions options) { + return new GcsIOChannelFactory(options.as(GcsOptions.class)); + } + + private final GcsOptions options; + + private GcsIOChannelFactory(GcsOptions options) { + this.options = options; + } + + @Override + public Collection<String> match(String spec) throws IOException { + GcsPath path = GcsPath.fromUri(spec); + GcsUtil util = options.getGcsUtil(); + List<GcsPath> matched = util.expand(path); + + List<String> specs = new LinkedList<>(); + for (GcsPath match : matched) { + specs.add(match.toString()); + } + + return specs; + } + + @Override + public ReadableByteChannel open(String spec) throws IOException { + GcsPath path = GcsPath.fromUri(spec); + GcsUtil util = options.getGcsUtil(); + return util.open(path); + } + + @Override + public WritableByteChannel create(String spec, String mimeType) + throws IOException { + GcsPath path = GcsPath.fromUri(spec); + GcsUtil util = options.getGcsUtil(); + return util.create(path, mimeType); + } + + @Override + public long getSizeBytes(String spec) throws IOException { + GcsPath path = GcsPath.fromUri(spec); + GcsUtil util = options.getGcsUtil(); + return util.fileSize(path); + } + + @Override + public boolean isReadSeekEfficient(String spec) throws IOException { + // TODO It is incorrect to return true here for 
files with content encoding set to gzip. + return true; + } + + @Override + public String resolve(String path, String other) throws IOException { + return toPath(path).resolve(other).toString(); + } + + @Override + public Path toPath(String path) { + return GcsPath.fromUri(path); + } + + @Override + public void copy(Iterable<String> srcFilenames, Iterable<String> destFilenames) + throws IOException { + options.getGcsUtil().copy(srcFilenames, destFilenames); + } + + @Override + public void remove(Collection<String> filesOrDirs) throws IOException { + options.getGcsUtil().remove(filesOrDirs); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java new file mode 100644 index 0000000..b4c457f --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * {@link AutoService} registrar for the {@link GcsIOChannelFactory}. + */ +@AutoService(IOChannelFactoryRegistrar.class) +public class GcsIOChannelFactoryRegistrar implements IOChannelFactoryRegistrar { + + @Override + public GcsIOChannelFactory fromOptions(PipelineOptions options) { + return GcsIOChannelFactory.fromOptions(options); + } + + @Override + public String getScheme() { + return "gs"; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java new file mode 100644 index 0000000..4d58424 --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.io.IOException; +import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.gcsfs.GcsPath; + +/** + * GCP implementation of {@link PathValidator}. Only GCS paths are allowed. + */ +public class GcsPathValidator implements PathValidator { + + private GcsOptions gcpOptions; + + private GcsPathValidator(GcsOptions options) { + this.gcpOptions = options; + } + + public static GcsPathValidator fromOptions(PipelineOptions options) { + return new GcsPathValidator(options.as(GcsOptions.class)); + } + + /** + * Validates that the input GCS path is accessible and that the path + * is well formed. + */ + @Override + public String validateInputFilePatternSupported(String filepattern) { + GcsPath gcsPath = getGcsPath(filepattern); + checkArgument(gcpOptions.getGcsUtil().isGcsPatternSupported(gcsPath.getObject())); + String returnValue = verifyPath(filepattern); + verifyPathIsAccessible(filepattern, "Could not find file %s"); + return returnValue; + } + + /** + * Validates that the output GCS path is accessible and that the path + * is well formed. 
+ */ + @Override + public String validateOutputFilePrefixSupported(String filePrefix) { + String returnValue = verifyPath(filePrefix); + verifyPathIsAccessible(filePrefix, "Output path does not exist or is not writeable: %s"); + return returnValue; + } + + @Override + public String verifyPath(String path) { + GcsPath gcsPath = getGcsPath(path); + checkArgument(gcsPath.isAbsolute(), "Must provide absolute paths for Dataflow"); + checkArgument(!gcsPath.getObject().isEmpty(), + "Missing object or bucket in path: '%s', did you mean: 'gs://some-bucket/%s'?", + gcsPath, gcsPath.getBucket()); + checkArgument(!gcsPath.getObject().contains("//"), + "Dataflow Service does not allow objects with consecutive slashes"); + return gcsPath.toResourceName(); + } + + private void verifyPathIsAccessible(String path, String errorMessage) { + GcsPath gcsPath = getGcsPath(path); + try { + checkArgument(gcpOptions.getGcsUtil().bucketAccessible(gcsPath), + errorMessage, path); + } catch (IOException e) { + throw new RuntimeException( + String.format("Unable to verify that GCS bucket gs://%s exists.", gcsPath.getBucket()), + e); + } + } + + private GcsPath getGcsPath(String path) { + try { + return GcsPath.fromUri(path); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException(String.format( + "Expected a valid 'gs://' path but was given '%s'", path), e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java new file mode 100644 index 0000000..c8e6839 --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java @@ 
-0,0 +1,796 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.api.client.googleapis.batch.BatchRequest; +import com.google.api.client.googleapis.batch.json.JsonBatchCallback; +import com.google.api.client.googleapis.json.GoogleJsonError; +import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.client.http.HttpHeaders; +import com.google.api.client.http.HttpRequestInitializer; +import com.google.api.client.util.BackOff; +import com.google.api.client.util.Sleeper; +import com.google.api.services.storage.Storage; +import com.google.api.services.storage.model.Bucket; +import com.google.api.services.storage.model.Objects; +import com.google.api.services.storage.model.StorageObject; +import com.google.auto.value.AutoValue; +import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel; +import com.google.cloud.hadoop.gcsio.GoogleCloudStorageWriteChannel; +import com.google.cloud.hadoop.gcsio.ObjectWriteConditions; +import com.google.cloud.hadoop.util.ApiErrorExtractor; +import 
com.google.cloud.hadoop.util.AsyncWriteChannelOptions; +import com.google.cloud.hadoop.util.ClientRequestHelper; +import com.google.cloud.hadoop.util.ResilientOperation; +import com.google.cloud.hadoop.util.RetryDeterminer; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.channels.SeekableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.nio.file.AccessDeniedException; +import java.nio.file.FileAlreadyExistsException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import javax.annotation.Nullable; +import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; +import org.apache.beam.sdk.options.DefaultValueFactory; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.gcsfs.GcsPath; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Provides operations on GCS. + */ +public class GcsUtil { + /** + * This is a {@link DefaultValueFactory} able to create a {@link GcsUtil} using + * any transport flags specified on the {@link PipelineOptions}. 
+ */ + public static class GcsUtilFactory implements DefaultValueFactory<GcsUtil> { + /** + * Returns an instance of {@link GcsUtil} based on the + * {@link PipelineOptions}. + * + * <p>If no instance has previously been created, one is created and the value + * stored in {@code options}. + */ + @Override + public GcsUtil create(PipelineOptions options) { + LOG.debug("Creating new GcsUtil"); + GcsOptions gcsOptions = options.as(GcsOptions.class); + Storage.Builder storageBuilder = Transport.newStorageClient(gcsOptions); + return new GcsUtil( + storageBuilder.build(), + storageBuilder.getHttpRequestInitializer(), + gcsOptions.getExecutorService(), + gcsOptions.getGcsUploadBufferSizeBytes()); + } + + /** + * Returns an instance of {@link GcsUtil} based on the given parameters. + */ + public static GcsUtil create( + Storage storageClient, + HttpRequestInitializer httpRequestInitializer, + ExecutorService executorService, + @Nullable Integer uploadBufferSizeBytes) { + return new GcsUtil( + storageClient, httpRequestInitializer, executorService, uploadBufferSizeBytes); + } + } + + private static final Logger LOG = LoggerFactory.getLogger(GcsUtil.class); + + /** Maximum number of items to retrieve per Objects.List request. */ + private static final long MAX_LIST_ITEMS_PER_CALL = 1024; + + /** Matches a glob containing a wildcard, capturing the portion before the first wildcard. */ + private static final Pattern GLOB_PREFIX = Pattern.compile("(?<PREFIX>[^\\[*?]*)[\\[*?].*"); + + private static final String RECURSIVE_WILDCARD = "[*]{2}"; + + /** + * A {@link Pattern} for globs with a recursive wildcard. + */ + private static final Pattern RECURSIVE_GCS_PATTERN = + Pattern.compile(".*" + RECURSIVE_WILDCARD + ".*"); + + /** + * Maximum number of requests permitted in a GCS batch request. + */ + private static final int MAX_REQUESTS_PER_BATCH = 100; + /** + * Maximum number of concurrent batches of requests executing on GCS. 
+ */ + private static final int MAX_CONCURRENT_BATCHES = 256; + + private static final FluentBackoff BACKOFF_FACTORY = + FluentBackoff.DEFAULT.withMaxRetries(3).withInitialBackoff(Duration.millis(200)); + + ///////////////////////////////////////////////////////////////////////////// + + /** Client for the GCS API. */ + private Storage storageClient; + private final HttpRequestInitializer httpRequestInitializer; + /** Buffer size for GCS uploads (in bytes). */ + @Nullable private final Integer uploadBufferSizeBytes; + + // Helper delegate for turning IOExceptions from API calls into higher-level semantics. + private final ApiErrorExtractor errorExtractor = new ApiErrorExtractor(); + + // Exposed for testing. + final ExecutorService executorService; + + /** + * Returns true if the given GCS pattern is supported otherwise fails with an + * exception. + */ + public static boolean isGcsPatternSupported(String gcsPattern) { + if (RECURSIVE_GCS_PATTERN.matcher(gcsPattern).matches()) { + throw new IllegalArgumentException("Unsupported wildcard usage in \"" + gcsPattern + "\": " + + " recursive wildcards are not supported."); + } + return true; + } + + /** + * Returns the prefix portion of the glob that doesn't contain wildcards. + */ + public static String getGlobPrefix(String globExp) { + checkArgument(isGcsPatternSupported(globExp)); + Matcher m = GLOB_PREFIX.matcher(globExp); + checkArgument( + m.matches(), + String.format("Glob expression: [%s] is not expandable.", globExp)); + return m.group("PREFIX"); + } + + /** + * Expands glob expressions to regular expressions. 
+ * + * @param globExp the glob expression to expand + * @return a string with the regular expression this glob expands to + */ + public static String globToRegexp(String globExp) { + StringBuilder dst = new StringBuilder(); + char[] src = globExp.toCharArray(); + int i = 0; + while (i < src.length) { + char c = src[i++]; + switch (c) { + case '*': + dst.append("[^/]*"); + break; + case '?': + dst.append("[^/]"); + break; + case '.': + case '+': + case '{': + case '}': + case '(': + case ')': + case '|': + case '^': + case '$': + // These need to be escaped in regular expressions + dst.append('\\').append(c); + break; + case '\\': + i = doubleSlashes(dst, src, i); + break; + default: + dst.append(c); + break; + } + } + return dst.toString(); + } + + /** + * Returns true if the given {@code spec} contains glob. + */ + public static boolean isGlob(GcsPath spec) { + return GLOB_PREFIX.matcher(spec.getObject()).matches(); + } + + private GcsUtil( + Storage storageClient, + HttpRequestInitializer httpRequestInitializer, + ExecutorService executorService, + @Nullable Integer uploadBufferSizeBytes) { + this.storageClient = storageClient; + this.httpRequestInitializer = httpRequestInitializer; + this.uploadBufferSizeBytes = uploadBufferSizeBytes; + this.executorService = executorService; + } + + // Use this only for testing purposes. + protected void setStorageClient(Storage storageClient) { + this.storageClient = storageClient; + } + + /** + * Expands a pattern into matched paths. The pattern path may contain globs, which are expanded + * in the result. For patterns that only match a single object, we ensure that the object + * exists. + */ + public List<GcsPath> expand(GcsPath gcsPattern) throws IOException { + checkArgument(isGcsPatternSupported(gcsPattern.getObject())); + Pattern p = null; + String prefix = null; + if (!isGlob(gcsPattern)) { + // Not a glob. + try { + // Use a get request to fetch the metadata of the object, and ignore the return value. 
+ // The request has strong global consistency. + getObject(gcsPattern); + return ImmutableList.of(gcsPattern); + } catch (FileNotFoundException e) { + // If the path was not found, return an empty list. + return ImmutableList.of(); + } + } else { + // Part before the first wildcard character. + prefix = getGlobPrefix(gcsPattern.getObject()); + p = Pattern.compile(globToRegexp(gcsPattern.getObject())); + } + + LOG.debug("matching files in bucket {}, prefix {} against pattern {}", gcsPattern.getBucket(), + prefix, p.toString()); + + String pageToken = null; + List<GcsPath> results = new LinkedList<>(); + do { + Objects objects = listObjects(gcsPattern.getBucket(), prefix, pageToken); + if (objects.getItems() == null) { + break; + } + + // Filter objects based on the regex. + for (StorageObject o : objects.getItems()) { + String name = o.getName(); + // Skip directories, which end with a slash. + if (p.matcher(name).matches() && !name.endsWith("/")) { + LOG.debug("Matched object: {}", name); + results.add(GcsPath.fromObject(o)); + } + } + pageToken = objects.getNextPageToken(); + } while (pageToken != null); + + return results; + } + + @VisibleForTesting + @Nullable + Integer getUploadBufferSizeBytes() { + return uploadBufferSizeBytes; + } + + /** + * Returns the file size from GCS or throws {@link FileNotFoundException} + * if the resource does not exist. + */ + public long fileSize(GcsPath path) throws IOException { + return getObject(path).getSize().longValue(); + } + + /** + * Returns the {@link StorageObject} for the given {@link GcsPath}. 
+ */ + public StorageObject getObject(GcsPath gcsPath) throws IOException { + return getObject(gcsPath, BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT); + } + + @VisibleForTesting + StorageObject getObject(GcsPath gcsPath, BackOff backoff, Sleeper sleeper) throws IOException { + Storage.Objects.Get getObject = + storageClient.objects().get(gcsPath.getBucket(), gcsPath.getObject()); + try { + return ResilientOperation.retry( + ResilientOperation.getGoogleRequestCallable(getObject), + backoff, + RetryDeterminer.SOCKET_ERRORS, + IOException.class, + sleeper); + } catch (IOException | InterruptedException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + if (e instanceof IOException && errorExtractor.itemNotFound((IOException) e)) { + throw new FileNotFoundException(gcsPath.toString()); + } + throw new IOException( + String.format("Unable to get the file object for path %s.", gcsPath), + e); + } + } + + /** + * Returns {@link StorageObjectOrIOException StorageObjectOrIOExceptions} for the given + * {@link GcsPath GcsPaths}. + */ + public List<StorageObjectOrIOException> getObjects(List<GcsPath> gcsPaths) + throws IOException { + List<StorageObjectOrIOException[]> results = new ArrayList<>(); + executeBatches(makeGetBatches(gcsPaths, results)); + ImmutableList.Builder<StorageObjectOrIOException> ret = ImmutableList.builder(); + for (StorageObjectOrIOException[] result : results) { + ret.add(result[0]); + } + return ret.build(); + } + + /** + * Lists {@link Objects} given the {@code bucket}, {@code prefix}, {@code pageToken}. + */ + public Objects listObjects(String bucket, String prefix, @Nullable String pageToken) + throws IOException { + // List all objects that start with the prefix (including objects in sub-directories). 
+ Storage.Objects.List listObject = storageClient.objects().list(bucket); + listObject.setMaxResults(MAX_LIST_ITEMS_PER_CALL); + listObject.setPrefix(prefix); + + if (pageToken != null) { + listObject.setPageToken(pageToken); + } + + try { + return ResilientOperation.retry( + ResilientOperation.getGoogleRequestCallable(listObject), + BACKOFF_FACTORY.backoff(), + RetryDeterminer.SOCKET_ERRORS, + IOException.class); + } catch (Exception e) { + throw new IOException( + String.format("Unable to match files in bucket %s, prefix %s.", bucket, prefix), + e); + } + } + + /** + * Returns the file size from GCS or throws {@link FileNotFoundException} + * if the resource does not exist. + */ + @VisibleForTesting + List<Long> fileSizes(List<GcsPath> paths) throws IOException { + List<StorageObjectOrIOException> results = getObjects(paths); + + ImmutableList.Builder<Long> ret = ImmutableList.builder(); + for (StorageObjectOrIOException result : results) { + ret.add(toFileSize(result)); + } + return ret.build(); + } + + private Long toFileSize(StorageObjectOrIOException storageObjectOrIOException) + throws IOException { + if (storageObjectOrIOException.ioException() != null) { + throw storageObjectOrIOException.ioException(); + } else { + return storageObjectOrIOException.storageObject().getSize().longValue(); + } + } + + /** + * Opens an object in GCS. + * + * <p>Returns a SeekableByteChannel that provides access to data in the bucket. + * + * @param path the GCS filename to read from + * @return a SeekableByteChannel that can read the object data + */ + public SeekableByteChannel open(GcsPath path) + throws IOException { + return new GoogleCloudStorageReadChannel(storageClient, path.getBucket(), + path.getObject(), errorExtractor, + new ClientRequestHelper<StorageObject>()); + } + + /** + * Creates an object in GCS. + * + * <p>Returns a WritableByteChannel that can be used to write data to the + * object. 
+ * + * @param path the GCS file to write to + * @param type the type of object, eg "text/plain". + * @return a Callable object that encloses the operation. + */ + public WritableByteChannel create(GcsPath path, + String type) throws IOException { + GoogleCloudStorageWriteChannel channel = new GoogleCloudStorageWriteChannel( + executorService, + storageClient, + new ClientRequestHelper<StorageObject>(), + path.getBucket(), + path.getObject(), + AsyncWriteChannelOptions.newBuilder().build(), + new ObjectWriteConditions(), + Collections.<String, String>emptyMap(), + type); + if (uploadBufferSizeBytes != null) { + channel.setUploadBufferSize(uploadBufferSizeBytes); + } + channel.initialize(); + return channel; + } + + /** + * Returns whether the GCS bucket exists and is accessible. + */ + public boolean bucketAccessible(GcsPath path) throws IOException { + return bucketAccessible( + path, + BACKOFF_FACTORY.backoff(), + Sleeper.DEFAULT); + } + + /** + * Returns the project number of the project which owns this bucket. + * If the bucket exists, it must be accessible otherwise the permissions + * exception will be propagated. If the bucket does not exist, an exception + * will be thrown. + */ + public long bucketOwner(GcsPath path) throws IOException { + return getBucket( + path, + BACKOFF_FACTORY.backoff(), + Sleeper.DEFAULT).getProjectNumber().longValue(); + } + + /** + * Creates a {@link Bucket} under the specified project in Cloud Storage or + * propagates an exception. + */ + public void createBucket(String projectId, Bucket bucket) throws IOException { + createBucket( + projectId, bucket, BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT); + } + + /** + * Returns whether the GCS bucket exists. This will return false if the bucket + * is inaccessible due to permissions. 
+ */ + @VisibleForTesting + boolean bucketAccessible(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException { + try { + return getBucket(path, backoff, sleeper) != null; + } catch (AccessDeniedException | FileNotFoundException e) { + return false; + } + } + + @VisibleForTesting + @Nullable + Bucket getBucket(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException { + Storage.Buckets.Get getBucket = + storageClient.buckets().get(path.getBucket()); + + try { + Bucket bucket = ResilientOperation.retry( + ResilientOperation.getGoogleRequestCallable(getBucket), + backoff, + new RetryDeterminer<IOException>() { + @Override + public boolean shouldRetry(IOException e) { + if (errorExtractor.itemNotFound(e) || errorExtractor.accessDenied(e)) { + return false; + } + return RetryDeterminer.SOCKET_ERRORS.shouldRetry(e); + } + }, + IOException.class, + sleeper); + + return bucket; + } catch (GoogleJsonResponseException e) { + if (errorExtractor.accessDenied(e)) { + throw new AccessDeniedException(path.toString(), null, e.getMessage()); + } + if (errorExtractor.itemNotFound(e)) { + throw new FileNotFoundException(e.getMessage()); + } + throw e; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException( + String.format("Error while attempting to verify existence of bucket gs://%s", + path.getBucket()), e); + } + } + + @VisibleForTesting + void createBucket(String projectId, Bucket bucket, BackOff backoff, Sleeper sleeper) + throws IOException { + Storage.Buckets.Insert insertBucket = + storageClient.buckets().insert(projectId, bucket); + insertBucket.setPredefinedAcl("projectPrivate"); + insertBucket.setPredefinedDefaultObjectAcl("projectPrivate"); + + try { + ResilientOperation.retry( + ResilientOperation.getGoogleRequestCallable(insertBucket), + backoff, + new RetryDeterminer<IOException>() { + @Override + public boolean shouldRetry(IOException e) { + if (errorExtractor.itemAlreadyExists(e) || 
errorExtractor.accessDenied(e)) { + return false; + } + return RetryDeterminer.SOCKET_ERRORS.shouldRetry(e); + } + }, + IOException.class, + sleeper); + return; + } catch (GoogleJsonResponseException e) { + if (errorExtractor.accessDenied(e)) { + throw new AccessDeniedException(bucket.getName(), null, e.getMessage()); + } + if (errorExtractor.itemAlreadyExists(e)) { + throw new FileAlreadyExistsException(bucket.getName(), null, e.getMessage()); + } + throw e; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException( + String.format("Error while attempting to create bucket gs://%s for rproject %s", + bucket.getName(), projectId), e); + } + } + + private static void executeBatches(List<BatchRequest> batches) throws IOException { + ListeningExecutorService executor = MoreExecutors.listeningDecorator( + MoreExecutors.getExitingExecutorService( + new ThreadPoolExecutor(MAX_CONCURRENT_BATCHES, MAX_CONCURRENT_BATCHES, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>()))); + + List<ListenableFuture<Void>> futures = new LinkedList<>(); + for (final BatchRequest batch : batches) { + futures.add(executor.submit(new Callable<Void>() { + public Void call() throws IOException { + batch.execute(); + return null; + } + })); + } + + try { + Futures.allAsList(futures).get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while executing batch GCS request", e); + } catch (ExecutionException e) { + if (e.getCause() instanceof FileNotFoundException) { + throw (FileNotFoundException) e.getCause(); + } + throw new IOException("Error executing batch GCS request", e); + } finally { + executor.shutdown(); + } + } + + /** + * Makes get {@link BatchRequest BatchRequests}. + * + * @param paths {@link GcsPath GcsPaths}. + * @param results mutable {@link List} for return values. + * @return {@link BatchRequest BatchRequests} to execute. 
+ * @throws IOException + */ + @VisibleForTesting + List<BatchRequest> makeGetBatches( + Collection<GcsPath> paths, + List<StorageObjectOrIOException[]> results) throws IOException { + List<BatchRequest> batches = new LinkedList<>(); + for (List<GcsPath> filesToGet : + Lists.partition(Lists.newArrayList(paths), MAX_REQUESTS_PER_BATCH)) { + BatchRequest batch = createBatchRequest(); + for (GcsPath path : filesToGet) { + results.add(enqueueGetFileSize(path, batch)); + } + batches.add(batch); + } + return batches; + } + + public void copy(Iterable<String> srcFilenames, Iterable<String> destFilenames) + throws IOException { + executeBatches(makeCopyBatches(srcFilenames, destFilenames)); + } + + List<BatchRequest> makeCopyBatches(Iterable<String> srcFilenames, Iterable<String> destFilenames) + throws IOException { + List<String> srcList = Lists.newArrayList(srcFilenames); + List<String> destList = Lists.newArrayList(destFilenames); + checkArgument( + srcList.size() == destList.size(), + "Number of source files %s must equal number of destination files %s", + srcList.size(), + destList.size()); + + List<BatchRequest> batches = new LinkedList<>(); + BatchRequest batch = createBatchRequest(); + for (int i = 0; i < srcList.size(); i++) { + final GcsPath sourcePath = GcsPath.fromUri(srcList.get(i)); + final GcsPath destPath = GcsPath.fromUri(destList.get(i)); + enqueueCopy(sourcePath, destPath, batch); + if (batch.size() >= MAX_REQUESTS_PER_BATCH) { + batches.add(batch); + batch = createBatchRequest(); + } + } + if (batch.size() > 0) { + batches.add(batch); + } + return batches; + } + + List<BatchRequest> makeRemoveBatches(Collection<String> filenames) throws IOException { + List<BatchRequest> batches = new LinkedList<>(); + for (List<String> filesToDelete : + Lists.partition(Lists.newArrayList(filenames), MAX_REQUESTS_PER_BATCH)) { + BatchRequest batch = createBatchRequest(); + for (String file : filesToDelete) { + enqueueDelete(GcsPath.fromUri(file), batch); + } + 
batches.add(batch); + } + return batches; + } + + public void remove(Collection<String> filenames) throws IOException { + executeBatches(makeRemoveBatches(filenames)); + } + + private StorageObjectOrIOException[] enqueueGetFileSize(final GcsPath path, BatchRequest batch) + throws IOException { + final StorageObjectOrIOException[] ret = new StorageObjectOrIOException[1]; + + Storage.Objects.Get getRequest = storageClient.objects() + .get(path.getBucket(), path.getObject()); + getRequest.queue(batch, new JsonBatchCallback<StorageObject>() { + @Override + public void onSuccess(StorageObject response, HttpHeaders httpHeaders) throws IOException { + ret[0] = StorageObjectOrIOException.create(response); + } + + @Override + public void onFailure(GoogleJsonError e, HttpHeaders httpHeaders) throws IOException { + IOException ioException; + if (errorExtractor.itemNotFound(e)) { + ioException = new FileNotFoundException(path.toString()); + } else { + ioException = new IOException(String.format("Error trying to get %s: %s", path, e)); + } + ret[0] = StorageObjectOrIOException.create(ioException); + } + }); + return ret; + } + + /** + * A class that holds either a {@link StorageObject} or an {@link IOException}. + */ + @AutoValue + public abstract static class StorageObjectOrIOException { + + /** + * Returns the {@link StorageObject}. + */ + @Nullable + public abstract StorageObject storageObject(); + + /** + * Returns the {@link IOException}. 
+ */ + @Nullable + public abstract IOException ioException(); + + @VisibleForTesting + public static StorageObjectOrIOException create(StorageObject storageObject) { + return new AutoValue_GcsUtil_StorageObjectOrIOException( + checkNotNull(storageObject, "storageObject"), + null /* ioException */); + } + + @VisibleForTesting + public static StorageObjectOrIOException create(IOException ioException) { + return new AutoValue_GcsUtil_StorageObjectOrIOException( + null /* storageObject */, + checkNotNull(ioException, "ioException")); + } + } + + private void enqueueCopy(final GcsPath from, final GcsPath to, BatchRequest batch) + throws IOException { + Storage.Objects.Copy copyRequest = storageClient.objects() + .copy(from.getBucket(), from.getObject(), to.getBucket(), to.getObject(), null); + copyRequest.queue(batch, new JsonBatchCallback<StorageObject>() { + @Override + public void onSuccess(StorageObject obj, HttpHeaders responseHeaders) { + LOG.debug("Successfully copied {} to {}", from, to); + } + + @Override + public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException { + throw new IOException( + String.format("Error trying to copy %s to %s: %s", from, to, e)); + } + }); + } + + private void enqueueDelete(final GcsPath file, BatchRequest batch) throws IOException { + Storage.Objects.Delete deleteRequest = storageClient.objects() + .delete(file.getBucket(), file.getObject()); + deleteRequest.queue(batch, new JsonBatchCallback<Void>() { + @Override + public void onSuccess(Void obj, HttpHeaders responseHeaders) { + LOG.debug("Successfully deleted {}", file); + } + + @Override + public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException { + throw new IOException(String.format("Error trying to delete %s: %s", file, e)); + } + }); + } + + private BatchRequest createBatchRequest() { + return storageClient.batch(httpRequestInitializer); + } + + private static int doubleSlashes(StringBuilder dst, char[] src, int 
i) { + // Emit the next character without special interpretation + dst.append('\\'); + if ((i - 1) != src.length) { + dst.append(src[i]); + i++; + } else { + // A backslash at the very end is treated like an escaped backslash + dst.append('\\'); + } + return i; + } +}
