http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java new file mode 100644 index 0000000..2187e7d --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */
package org.apache.beam.sdk.options;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.beam.sdk.util.AppEngineEnvironment;
import org.apache.beam.sdk.util.GcsPathValidator;
import org.apache.beam.sdk.util.GcsUtil;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.util.PathValidator;

/**
 * Options used to configure Google Cloud Storage.
 *
 * <p>NOTE(review): options appear to be resolved reflectively from these
 * getter/setter pairs and their annotations (see {@link Default.InstanceFactory},
 * {@link Description}), so method names and annotation text are part of the
 * runtime contract — confirm before renaming anything here.
 */
public interface GcsOptions extends
    ApplicationNameOptions, GcpOptions, PipelineOptions {
  /**
   * The GcsUtil instance that should be used to communicate with Google Cloud Storage.
   */
  @JsonIgnore
  @Description("The GcsUtil instance that should be used to communicate with Google Cloud Storage.")
  @Default.InstanceFactory(GcsUtil.GcsUtilFactory.class)
  @Hidden
  GcsUtil getGcsUtil();

  void setGcsUtil(GcsUtil value);

  /**
   * The ExecutorService instance to use to create threads, can be overridden to specify an
   * ExecutorService that is compatible with the users environment. If unset, the
   * default is to create an ExecutorService with an unbounded number of threads; this
   * is compatible with Google AppEngine.
   */
  @JsonIgnore
  @Description("The ExecutorService instance to use to create multiple threads. Can be overridden "
      + "to specify an ExecutorService that is compatible with the users environment. If unset, "
      + "the default is to create an ExecutorService with an unbounded number of threads; this "
      + "is compatible with Google AppEngine.")
  @Default.InstanceFactory(ExecutorServiceFactory.class)
  @Hidden
  ExecutorService getExecutorService();

  void setExecutorService(ExecutorService value);

  /**
   * GCS endpoint to use. If unspecified, uses the default endpoint.
   */
  @JsonIgnore
  @Hidden
  @Description("The URL for the GCS API.")
  String getGcsEndpoint();

  void setGcsEndpoint(String value);

  /**
   * The buffer size (in bytes) to use when uploading files to GCS. Please see the documentation for
   * {@link AbstractGoogleAsyncWriteChannel#setUploadBufferSize} for more information on the
   * restrictions and performance implications of this value.
   */
  @Description("The buffer size (in bytes) to use when uploading files to GCS. Please see the "
      + "documentation for AbstractGoogleAsyncWriteChannel.setUploadBufferSize for more "
      + "information on the restrictions and performance implications of this value.\n\n"
      + "https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/util/src/main/java/"
      + "com/google/cloud/hadoop/util/AbstractGoogleAsyncWriteChannel.java")
  @Nullable
  Integer getGcsUploadBufferSizeBytes();

  void setGcsUploadBufferSizeBytes(@Nullable Integer bytes);

  /**
   * The class of the validator that should be created and used to validate paths.
   * If pathValidator has not been set explicitly, an instance of this class will be
   * constructed and used as the path validator.
   */
  @Description("The class of the validator that should be created and used to validate paths. "
      + "If pathValidator has not been set explicitly, an instance of this class will be "
      + "constructed and used as the path validator.")
  @Default.Class(GcsPathValidator.class)
  Class<? extends PathValidator> getPathValidatorClass();

  void setPathValidatorClass(Class<? extends PathValidator> validatorClass);

  /**
   * The path validator instance that should be used to validate paths.
   * If no path validator has been set explicitly, the default is to use the instance factory that
   * constructs a path validator based upon the currently set pathValidatorClass.
   */
  @JsonIgnore
  @Description("The path validator instance that should be used to validate paths. "
      + "If no path validator has been set explicitly, the default is to use the instance factory "
      + "that constructs a path validator based upon the currently set pathValidatorClass.")
  @Default.InstanceFactory(PathValidatorFactory.class)
  PathValidator getPathValidator();

  void setPathValidator(PathValidator validator);

  /**
   * Returns the default {@link ExecutorService} to use within the Apache Beam SDK. The
   * {@link ExecutorService} is compatible with AppEngine.
   */
  class ExecutorServiceFactory implements DefaultValueFactory<ExecutorService> {
    @SuppressWarnings("deprecation")  // IS_APP_ENGINE is deprecated for internal use only.
    @Override
    public ExecutorService create(PipelineOptions options) {
      ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
      threadFactoryBuilder.setThreadFactory(MoreExecutors.platformThreadFactory());
      if (!AppEngineEnvironment.IS_APP_ENGINE) {
        // AppEngine doesn't allow modification of threads to be daemon threads.
        threadFactoryBuilder.setDaemon(true);
      }
      /* The SDK requires an unbounded thread pool because a step may create X writers
       * each requiring their own thread to perform the writes otherwise a writer may
       * block causing deadlock for the step because the writers buffer is full.
       * Also, the MapTaskExecutor launches the steps in reverse order and completes
       * them in forward order thus requiring enough threads so that each step's writers
       * can be active.
       */
      return new ThreadPoolExecutor(
          0, Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads.
          Long.MAX_VALUE, TimeUnit.NANOSECONDS, // Keep non-core threads alive forever.
          // A SynchronousQueue has no capacity: each submitted task is handed
          // directly to an idle thread, or a new thread is created for it.
          new SynchronousQueue<Runnable>(),
          threadFactoryBuilder.build());
    }
  }

  /**
   * Creates a {@link PathValidator} object using the class specified in
   * {@link #getPathValidatorClass()}.
   */
  class PathValidatorFactory implements DefaultValueFactory<PathValidator> {
    @Override
    public PathValidator create(PipelineOptions options) {
      GcsOptions gcsOptions = options.as(GcsOptions.class);
      // Instantiate via the static "fromOptions(PipelineOptions)" factory method
      // expected on the configured validator class.
      return InstanceBuilder.ofType(PathValidator.class)
          .fromClass(gcsOptions.getPathValidatorClass())
          .fromFactoryMethod("fromOptions")
          .withArg(PipelineOptions.class, options)
          .build();
    }
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GoogleApiDebugOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GoogleApiDebugOptions.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GoogleApiDebugOptions.java new file mode 100644 index 0000000..f9cb575 --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GoogleApiDebugOptions.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.options; + +import com.google.api.client.googleapis.services.AbstractGoogleClient; +import com.google.api.client.googleapis.services.AbstractGoogleClientRequest; +import com.google.api.client.googleapis.services.GoogleClientRequestInitializer; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * These options configure debug settings for Google API clients created within the Apache Beam SDK. 
 */
public interface GoogleApiDebugOptions extends PipelineOptions {
  /**
   * This option enables tracing of API calls to Google services used within the Apache
   * Beam SDK. Values are expected in JSON format <code>{"ApiName":"TraceDestination",...}
   * </code> where the {@code ApiName} represents the request classes canonical name. The
   * {@code TraceDestination} is a logical trace consumer to whom the trace will be reported.
   * Typically, "producer" is the right destination to use: this makes API traces available to the
   * team offering the API. Note that by enabling this option, the contents of the requests to and
   * from Google Cloud services will be made available to Google. For example, by specifying
   * <code>{"Dataflow":"producer"}</code>, all calls to the Dataflow service will be made available
   * to Google, specifically to the Google Cloud Dataflow team.
   */
  @Description("This option enables tracing of API calls to Google services used within the Apache "
      + "Beam SDK. Values are expected in JSON format {\"ApiName\":\"TraceDestination\",...} "
      + "where the ApiName represents the request classes canonical name. The TraceDestination is "
      + "a logical trace consumer to whom the trace will be reported. Typically, \"producer\" is "
      + "the right destination to use: this makes API traces available to the team offering the "
      + "API. Note that by enabling this option, the contents of the requests to and from "
      + "Google Cloud services will be made available to Google. For example, by specifying "
      + "{\"Dataflow\":\"producer\"}, all calls to the Dataflow service will be made available to "
      + "Google, specifically to the Google Cloud Dataflow team.")
  GoogleApiTracer getGoogleApiTrace();

  void setGoogleApiTrace(GoogleApiTracer commands);

  /**
   * A {@link GoogleClientRequestInitializer} that adds the trace destination to Google API calls.
   *
   * <p>Keys are (substrings of) request class canonical names; values are trace
   * destinations applied as the {@code $trace} request parameter.
   */
  class GoogleApiTracer extends HashMap<String, String>
      implements GoogleClientRequestInitializer {
    /**
     * Registers {@code traceDestination} for all calls made through the given client
     * type and returns {@code this} (the map is mutated in place, not copied).
     */
    public GoogleApiTracer addTraceFor(AbstractGoogleClient client, String traceDestination) {
      put(client.getClass().getCanonicalName(), traceDestination);
      return this;
    }

    /**
     * Registers {@code traceDestination} for all calls that match the given request
     * type and returns {@code this} (the map is mutated in place, not copied).
     */
    public GoogleApiTracer addTraceFor(
        AbstractGoogleClientRequest<?> request, String traceDestination) {
      put(request.getClass().getCanonicalName(), traceDestination);
      return this;
    }

    /**
     * Sets the {@code $trace} parameter on any request whose class canonical name
     * contains a registered key. If several keys match, the destination of the last
     * matching entry iterated wins.
     */
    @Override
    public void initialize(AbstractGoogleClientRequest<?> request) throws IOException {
      for (Map.Entry<String, String> entry : this.entrySet()) {
        if (request.getClass().getCanonicalName().contains(entry.getKey())) {
          request.set("$trace", entry.getValue());
        }
      }
    }
  }
}
 * The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.options;

/**
 * Properties that can be set when using Google Cloud Pub/Sub with the Apache Beam SDK.
 */
@Description("Options that are used to configure Google Cloud Pub/Sub. See "
    + "https://cloud.google.com/pubsub/docs/overview for details on Cloud Pub/Sub.")
public interface PubsubOptions extends ApplicationNameOptions, GcpOptions,
    PipelineOptions, StreamingOptions {

  /**
   * Root URL for use with the Google Cloud Pub/Sub API.
   *
   * <p>Hidden because the default production endpoint is almost always correct;
   * overriding is intended for tests or alternate endpoints.
   */
  @Description("Root URL for use with the Google Cloud Pub/Sub API")
  @Default.String("https://pubsub.googleapis.com")
  @Hidden
  String getPubsubRootUrl();

  void setPubsubRootUrl(String value);
}
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Defines {@link org.apache.beam.sdk.options.PipelineOptions} for + * configuring pipeline execution for Google Cloud Platform components. + */ +package org.apache.beam.sdk.options; http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java new file mode 100644 index 0000000..8f752c0 --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
package org.apache.beam.sdk.testing;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
import com.google.api.client.util.Sleeper;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.BigqueryScopes;
import com.google.api.services.bigquery.model.QueryRequest;
import com.google.api.services.bigquery.model.QueryResponse;
import com.google.api.services.bigquery.model.TableCell;
import com.google.api.services.bigquery.model.TableRow;
import com.google.auth.Credentials;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Transport;
import org.hamcrest.Description;
import org.hamcrest.TypeSafeMatcher;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A matcher to verify data in BigQuery by processing a given query
 * and comparing a SHA-1 checksum of the result with an expected checksum.
 *
 * <p>Example:
 * <pre>{@code [
 *   assertThat(job, new BigqueryMatcher(appName, projectId, queryString, expectedChecksum));
 * ]}</pre>
 */
@NotThreadSafe
public class BigqueryMatcher extends TypeSafeMatcher<PipelineResult>
    implements SerializableMatcher<PipelineResult> {
  private static final Logger LOG = LoggerFactory.getLogger(BigqueryMatcher.class);

  // The maximum number of retries to execute a BigQuery RPC
  static final int MAX_QUERY_RETRIES = 4;

  // The initial backoff for executing a BigQuery RPC
  private static final Duration INITIAL_BACKOFF = Duration.standardSeconds(1L);

  // The total number of rows in query response to be formatted for debugging purpose
  private static final int TOTAL_FORMATTED_ROWS = 20;

  // The backoff factory with initial configs
  static final FluentBackoff BACKOFF_FACTORY =
      FluentBackoff.DEFAULT
          .withMaxRetries(MAX_QUERY_RETRIES)
          .withInitialBackoff(INITIAL_BACKOFF);

  private final String applicationName;
  private final String projectId;
  private final String query;
  private final String expectedChecksum;
  // Populated by matchesSafely(); read by describeMismatchSafely().
  private String actualChecksum;
  private transient QueryResponse response;

  /**
   * Creates a matcher that runs {@code query} against {@code projectId} and expects the
   * checksum of the result to equal {@code expectedChecksum}. All arguments must be
   * non-empty.
   */
  public BigqueryMatcher(
      String applicationName, String projectId, String query, String expectedChecksum) {
    validateArgument("applicationName", applicationName);
    validateArgument("projectId", projectId);
    validateArgument("query", query);
    validateArgument("expectedChecksum", expectedChecksum);

    this.applicationName = applicationName;
    this.projectId = projectId;
    this.query = query;
    this.expectedChecksum = expectedChecksum;
  }

  @Override
  protected boolean matchesSafely(PipelineResult pipelineResult) {
    LOG.info("Verifying Bigquery data");
    Bigquery bigqueryClient = newBigqueryClient(applicationName);

    // execute query
    LOG.debug("Executing query: {}", query);
    try {
      QueryRequest queryContent = new QueryRequest();
      queryContent.setQuery(query);

      response = queryWithRetries(
          bigqueryClient, queryContent, Sleeper.DEFAULT, BACKOFF_FACTORY.backoff());
    } catch (IOException | InterruptedException e) {
      if (e instanceof InterruptedIOException) {
        // Restore the interrupt status so callers can observe the interruption.
        Thread.currentThread().interrupt();
      }
      throw new RuntimeException("Failed to fetch BigQuery data.", e);
    }

    // getJobComplete() returns a Boolean; guard against null instead of relying on
    // auto-unboxing, and treat anything but TRUE as "not complete" (match fails).
    if (!Boolean.TRUE.equals(response.getJobComplete())) {
      return false;
    }

    // compute checksum
    actualChecksum = generateHash(response.getRows());
    LOG.debug("Generated a SHA1 checksum based on queried data: {}", actualChecksum);

    return expectedChecksum.equals(actualChecksum);
  }

  /** Builds a BigQuery client authenticated with the application default credential. */
  @VisibleForTesting
  Bigquery newBigqueryClient(String applicationName) {
    HttpTransport transport = Transport.getTransport();
    JsonFactory jsonFactory = Transport.getJsonFactory();
    Credentials credential = getDefaultCredential();

    return new Bigquery.Builder(transport, jsonFactory, new HttpCredentialsAdapter(credential))
        .setApplicationName(applicationName)
        .build();
  }

  /**
   * Executes the query, retrying (with the supplied backoff) on IOException or on a
   * null response, up to {@link #MAX_QUERY_RETRIES} retries.
   *
   * @throws RuntimeException wrapping the last failure once retries are exhausted
   */
  @Nonnull
  @VisibleForTesting
  QueryResponse queryWithRetries(Bigquery bigqueryClient, QueryRequest queryContent,
                                 Sleeper sleeper, BackOff backOff)
      throws IOException, InterruptedException {
    IOException lastException = null;
    do {
      if (lastException != null) {
        LOG.warn("Retrying query ({}) after exception", queryContent.getQuery(), lastException);
      }
      try {
        QueryResponse response = bigqueryClient.jobs().query(projectId, queryContent).execute();
        if (response != null) {
          return response;
        } else {
          lastException =
              new IOException("Expected valid response from query job, but received null.");
        }
      } catch (IOException e) {
        // ignore and retry
        lastException = e;
      }
    } while (BackOffUtils.next(sleeper, backOff));

    throw new RuntimeException(
        String.format(
            "Unable to get BigQuery response after retrying %d times using query (%s)",
            MAX_QUERY_RETRIES,
            queryContent.getQuery()),
        lastException);
  }

  /** Rejects null or empty constructor arguments with a descriptive message. */
  private void validateArgument(String name, String value) {
    checkArgument(
        !Strings.isNullOrEmpty(value), "Expected valid %s, but was %s", name, value);
  }

  /** Loads the application default credential, scoped for read-only BigQuery access. */
  private Credentials getDefaultCredential() {
    GoogleCredentials credential;
    try {
      credential = GoogleCredentials.getApplicationDefault();
    } catch (IOException e) {
      throw new RuntimeException("Failed to get application default credential.", e);
    }

    if (credential.createScopedRequired()) {
      Collection<String> bigqueryScope =
          Lists.newArrayList(BigqueryScopes.CLOUD_PLATFORM_READ_ONLY);
      credential = credential.createScoped(bigqueryScope);
    }
    return credential;
  }

  /**
   * Computes an order-independent SHA-1 checksum over the queried rows. The cell values
   * of each row are sorted before hashing, and Hashing.combineUnordered makes the final
   * checksum independent of row order as well.
   */
  private String generateHash(List<TableRow> rows) {
    // jobs.query omits the "rows" field entirely for an empty result set, so rows may
    // be null here. Guard against null/empty: combineUnordered rejects an empty input.
    if (rows == null || rows.isEmpty()) {
      return Hashing.sha1().hashString("", StandardCharsets.UTF_8).toString();
    }
    List<HashCode> rowHashes = Lists.newArrayList();
    for (TableRow row : rows) {
      List<String> cellsInOneRow = Lists.newArrayList();
      for (TableCell cell : row.getF()) {
        cellsInOneRow.add(Objects.toString(cell.getV()));
      }
      // Fix: sort once per row. Previously the sort ran inside the cell loop (after
      // every add) — O(m^2 log m) per row for the same final ordering.
      Collections.sort(cellsInOneRow);
      rowHashes.add(
          Hashing.sha1().hashString(cellsInOneRow.toString(), StandardCharsets.UTF_8));
    }
    return Hashing.combineUnordered(rowHashes).toString();
  }

  @Override
  public void describeTo(Description description) {
    description
        .appendText("Expected checksum is (")
        .appendText(expectedChecksum)
        .appendText(")");
  }

  @Override
  public void describeMismatchSafely(PipelineResult pResult, Description description) {
    String info;
    if (response == null || !Boolean.TRUE.equals(response.getJobComplete())) {
      // query job not complete
      info = String.format("The query job hasn't completed. Got response: %s", response);
    } else {
      // checksum mismatch
      info = String.format("was (%s).%n"
          + "\tTotal number of rows are: %d.%n"
          + "\tQueried data details:%s",
          actualChecksum, response.getTotalRows(), formatRows(TOTAL_FORMATTED_ROWS));
    }
    description.appendText(info);
  }

  /** Formats up to {@code totalNumRows} rows of the response for mismatch messages. */
  private String formatRows(int totalNumRows) {
    StringBuilder samples = new StringBuilder();
    List<TableRow> rows = response.getRows();
    if (rows == null) {
      // Empty result set: jobs.query returns no "rows" field; nothing to format.
      return samples.toString();
    }
    for (int i = 0; i < totalNumRows && i < rows.size(); i++) {
      samples.append(String.format("%n\t\t"));
      for (TableCell field : rows.get(i).getF()) {
        samples.append(String.format("%-10s", field.getV()));
      }
    }
    if (rows.size() > totalNumRows) {
      samples.append(String.format("%n\t\t..."));
    }
    return samples.toString();
  }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Defines utilities for unit testing Google Cloud Platform components of Apache Beam pipelines. + */ +package org.apache.beam.sdk.testing; http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/AppEngineEnvironment.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/AppEngineEnvironment.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/AppEngineEnvironment.java new file mode 100644 index 0000000..b0fcbd1 --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/AppEngineEnvironment.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/** Stores whether we are running within AppEngine or not. */
public class AppEngineEnvironment {
  /**
   * True if running inside of AppEngine, false otherwise.
   *
   * <p>Evaluated once at class-load time.
   */
  @Deprecated
  public static final boolean IS_APP_ENGINE = isAppEngine();

  /** Static utility: not for instantiation. */
  private AppEngineEnvironment() {}

  /**
   * Attempts to detect whether we are inside of AppEngine.
   *
   * <p>Purposely copied and left private from private <a href="https://code.google.com/p/
   * guava-libraries/source/browse/guava/src/com/google/common/util/concurrent/
   * MoreExecutors.java#785">code.google.common.util.concurrent.MoreExecutors#isAppEngine</a>.
   *
   * @return true if we are inside of AppEngine, false otherwise.
   */
  static boolean isAppEngine() {
    // Fast path: the AppEngine runtime sets this system property; if it is absent
    // we are certainly not on AppEngine.
    if (System.getProperty("com.google.appengine.runtime.environment") == null) {
      return false;
    }
    try {
      // If the current environment is null, we're not inside AppEngine.
      return Class.forName("com.google.apphosting.api.ApiProxy")
          .getMethod("getCurrentEnvironment")
          .invoke(null) != null;
    } catch (ReflectiveOperationException e) {
      // Covers ClassNotFoundException (ApiProxy not on the classpath at all),
      // NoSuchMethodException / IllegalAccessException (unsupported AppEngine
      // version), and InvocationTargetException (ApiProxy threw): in every case
      // we are not in a proper AppEngine environment.
      return false;
    }
  }
}
 */
public interface CredentialFactory {
  /**
   * Returns the OAuth {@link Credentials} to be used by the SDK and the SDK workers.
   *
   * @throws IOException if the credential cannot be obtained (e.g. it cannot be
   *     read or refreshed)
   * @throws GeneralSecurityException if the credential material is invalid
   */
  Credentials getCredential() throws IOException, GeneralSecurityException;
}
+ */ +package org.apache.beam.sdk.util; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Strings.isNullOrEmpty; + +import com.google.api.services.storage.model.Bucket; +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import org.apache.beam.sdk.options.CloudResourceManagerOptions; +import org.apache.beam.sdk.options.GcsOptions; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.gcsfs.GcsPath; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class for handling default GCS buckets. + */ +public class DefaultBucket { + static final Logger LOG = LoggerFactory.getLogger(DefaultBucket.class); + + static final String DEFAULT_REGION = "us-central1"; + + /** + * Creates a default bucket or verifies the existence and proper access control + * of an existing default bucket. Returns the location if successful. + */ + public static String tryCreateDefaultBucket(PipelineOptions options) { + GcsOptions gcpOptions = options.as(GcsOptions.class); + + final String projectId = gcpOptions.getProject(); + checkArgument(!isNullOrEmpty(projectId), + "--project is a required option."); + + // Look up the project number, to create a default bucket with a stable + // name with no special characters. 
+ long projectNumber = 0L; + try { + projectNumber = gcpOptions.as(CloudResourceManagerOptions.class) + .getGcpProjectUtil().getProjectNumber(projectId); + } catch (IOException e) { + throw new RuntimeException("Unable to verify project with ID " + projectId, e); + } + String region = DEFAULT_REGION; + if (!isNullOrEmpty(gcpOptions.getZone())) { + region = getRegionFromZone(gcpOptions.getZone()); + } + final String bucketName = + "dataflow-staging-" + region + "-" + projectNumber; + LOG.info("No staging location provided, attempting to use default bucket: {}", + bucketName); + Bucket bucket = new Bucket() + .setName(bucketName) + .setLocation(region); + // Always try to create the bucket before checking access, so that we do not + // race with other pipelines that may be attempting to do the same thing. + try { + gcpOptions.getGcsUtil().createBucket(projectId, bucket); + } catch (FileAlreadyExistsException e) { + LOG.debug("Bucket '{}'' already exists, verifying access.", bucketName); + } catch (IOException e) { + throw new RuntimeException("Unable create default bucket.", e); + } + + // Once the bucket is expected to exist, verify that it is correctly owned + // by the project executing the job. + try { + long owner = gcpOptions.getGcsUtil().bucketOwner( + GcsPath.fromComponents(bucketName, "")); + checkArgument( + owner == projectNumber, + "Bucket owner does not match the project from --project:" + + " %s vs. 
%s", owner, projectNumber); + } catch (IOException e) { + throw new RuntimeException( + "Unable to determine the owner of the default bucket at gs://" + bucketName, e); + } + return "gs://" + bucketName; + } + + @VisibleForTesting + static String getRegionFromZone(String zone) { + String[] zoneParts = zone.split("-"); + checkArgument(zoneParts.length >= 2, "Invalid zone provided: %s", zone); + return zoneParts[0] + "-" + zoneParts[1]; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcpCredentialFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcpCredentialFactory.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcpCredentialFactory.java new file mode 100644 index 0000000..e1fa18f --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcpCredentialFactory.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.util; + +import com.google.auth.Credentials; +import com.google.auth.oauth2.GoogleCredentials; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * Construct an oauth credential to be used by the SDK and the SDK workers. + * Returns a GCP credential. + */ +public class GcpCredentialFactory implements CredentialFactory { + /** + * The scope cloud-platform provides access to all Cloud Platform resources. + * cloud-platform isn't sufficient yet for talking to datastore so we request + * those resources separately. + * + * <p>Note that trusted scope relationships don't apply to OAuth tokens, so for + * services we access directly (GCS) as opposed to through the backend + * (BigQuery, GCE), we need to explicitly request that scope. + */ + private static final List<String> SCOPES = Arrays.asList( + "https://www.googleapis.com/auth/cloud-platform", + "https://www.googleapis.com/auth/devstorage.full_control", + "https://www.googleapis.com/auth/userinfo.email", + "https://www.googleapis.com/auth/datastore", + "https://www.googleapis.com/auth/pubsub"); + + private static final GcpCredentialFactory INSTANCE = new GcpCredentialFactory(); + + public static GcpCredentialFactory fromOptions(PipelineOptions options) { + return INSTANCE; + } + + /** + * Returns a default GCP {@link Credentials} or null when it fails. + */ + @Override + public Credentials getCredential() { + try { + return GoogleCredentials.getApplicationDefault().createScoped(SCOPES); + } catch (IOException e) { + // Ignore the exception + // Pipelines that only access to public data should be able to run without credentials. 
+ return null; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java new file mode 100644 index 0000000..f73afe0 --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.util; + +import com.google.api.client.util.BackOff; +import com.google.api.client.util.Sleeper; +import com.google.api.services.cloudresourcemanager.CloudResourceManager; +import com.google.api.services.cloudresourcemanager.model.Project; +import com.google.cloud.hadoop.util.ResilientOperation; +import com.google.cloud.hadoop.util.RetryDeterminer; +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import org.apache.beam.sdk.options.CloudResourceManagerOptions; +import org.apache.beam.sdk.options.DefaultValueFactory; +import org.apache.beam.sdk.options.PipelineOptions; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Provides operations on Google Cloud Platform Projects. + */ +public class GcpProjectUtil { + /** + * A {@link DefaultValueFactory} able to create a {@link GcpProjectUtil} using + * any transport flags specified on the {@link PipelineOptions}. + */ + public static class GcpProjectUtilFactory implements DefaultValueFactory<GcpProjectUtil> { + /** + * Returns an instance of {@link GcpProjectUtil} based on the + * {@link PipelineOptions}. + */ + @Override + public GcpProjectUtil create(PipelineOptions options) { + LOG.debug("Creating new GcpProjectUtil"); + CloudResourceManagerOptions crmOptions = options.as(CloudResourceManagerOptions.class); + return new GcpProjectUtil( + Transport.newCloudResourceManagerClient(crmOptions).build()); + } + } + + private static final Logger LOG = LoggerFactory.getLogger(GcpProjectUtil.class); + + private static final FluentBackoff BACKOFF_FACTORY = + FluentBackoff.DEFAULT.withMaxRetries(3).withInitialBackoff(Duration.millis(200)); + + /** Client for the CRM API. */ + private CloudResourceManager crmClient; + + private GcpProjectUtil(CloudResourceManager crmClient) { + this.crmClient = crmClient; + } + + // Use this only for testing purposes. 
+ @VisibleForTesting + void setCrmClient(CloudResourceManager crmClient) { + this.crmClient = crmClient; + } + + /** + * Returns the project number or throws an exception if the project does not + * exist or has other access exceptions. + */ + public long getProjectNumber(String projectId) throws IOException { + return getProjectNumber( + projectId, + BACKOFF_FACTORY.backoff(), + Sleeper.DEFAULT); + } + + /** + * Returns the project number or throws an error if the project does not + * exist or has other access errors. + */ + @VisibleForTesting + long getProjectNumber(String projectId, BackOff backoff, Sleeper sleeper) throws IOException { + CloudResourceManager.Projects.Get getProject = + crmClient.projects().get(projectId); + try { + Project project = ResilientOperation.retry( + ResilientOperation.getGoogleRequestCallable(getProject), + backoff, + RetryDeterminer.SOCKET_ERRORS, + IOException.class, + sleeper); + return project.getProjectNumber(); + } catch (Exception e) { + throw new IOException("Unable to get project number", e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java new file mode 100644 index 0000000..745dcb9 --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import java.io.IOException; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.nio.file.Path; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import org.apache.beam.sdk.options.GcsOptions; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.gcsfs.GcsPath; + +/** + * Implements IOChannelFactory for GCS. + */ +public class GcsIOChannelFactory implements IOChannelFactory { + + /** + * Create a {@link GcsIOChannelFactory} with the given {@link PipelineOptions}. 
+ */ + public static GcsIOChannelFactory fromOptions(PipelineOptions options) { + return new GcsIOChannelFactory(options.as(GcsOptions.class)); + } + + private final GcsOptions options; + + private GcsIOChannelFactory(GcsOptions options) { + this.options = options; + } + + @Override + public Collection<String> match(String spec) throws IOException { + GcsPath path = GcsPath.fromUri(spec); + GcsUtil util = options.getGcsUtil(); + List<GcsPath> matched = util.expand(path); + + List<String> specs = new LinkedList<>(); + for (GcsPath match : matched) { + specs.add(match.toString()); + } + + return specs; + } + + @Override + public ReadableByteChannel open(String spec) throws IOException { + GcsPath path = GcsPath.fromUri(spec); + GcsUtil util = options.getGcsUtil(); + return util.open(path); + } + + @Override + public WritableByteChannel create(String spec, String mimeType) + throws IOException { + GcsPath path = GcsPath.fromUri(spec); + GcsUtil util = options.getGcsUtil(); + return util.create(path, mimeType); + } + + @Override + public long getSizeBytes(String spec) throws IOException { + GcsPath path = GcsPath.fromUri(spec); + GcsUtil util = options.getGcsUtil(); + return util.fileSize(path); + } + + @Override + public boolean isReadSeekEfficient(String spec) throws IOException { + // TODO It is incorrect to return true here for files with content encoding set to gzip. 
+ return true; + } + + @Override + public String resolve(String path, String other) throws IOException { + return toPath(path).resolve(other).toString(); + } + + @Override + public Path toPath(String path) { + return GcsPath.fromUri(path); + } + + @Override + public void copy(Iterable<String> srcFilenames, Iterable<String> destFilenames) + throws IOException { + options.getGcsUtil().copy(srcFilenames, destFilenames); + } + + @Override + public void remove(Collection<String> filesOrDirs) throws IOException { + options.getGcsUtil().remove(filesOrDirs); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java new file mode 100644 index 0000000..b4c457f --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * {@link AutoService} registrar for the {@link GcsIOChannelFactory}. + */ +@AutoService(IOChannelFactoryRegistrar.class) +public class GcsIOChannelFactoryRegistrar implements IOChannelFactoryRegistrar { + + @Override + public GcsIOChannelFactory fromOptions(PipelineOptions options) { + return GcsIOChannelFactory.fromOptions(options); + } + + @Override + public String getScheme() { + return "gs"; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java new file mode 100644 index 0000000..a5b951d --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.io.IOException; +import org.apache.beam.sdk.options.GcsOptions; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.gcsfs.GcsPath; + +/** + * GCP implementation of {@link PathValidator}. Only GCS paths are allowed. + */ +public class GcsPathValidator implements PathValidator { + + private GcsOptions gcpOptions; + + private GcsPathValidator(GcsOptions options) { + this.gcpOptions = options; + } + + public static GcsPathValidator fromOptions(PipelineOptions options) { + return new GcsPathValidator(options.as(GcsOptions.class)); + } + + /** + * Validates the the input GCS path is accessible and that the path + * is well formed. + */ + @Override + public String validateInputFilePatternSupported(String filepattern) { + GcsPath gcsPath = getGcsPath(filepattern); + checkArgument(gcpOptions.getGcsUtil().isGcsPatternSupported(gcsPath.getObject())); + String returnValue = verifyPath(filepattern); + verifyPathIsAccessible(filepattern, "Could not find file %s"); + return returnValue; + } + + /** + * Validates the the output GCS path is accessible and that the path + * is well formed. 
+ */ + @Override + public String validateOutputFilePrefixSupported(String filePrefix) { + String returnValue = verifyPath(filePrefix); + verifyPathIsAccessible(filePrefix, "Output path does not exist or is not writeable: %s"); + return returnValue; + } + + @Override + public String verifyPath(String path) { + GcsPath gcsPath = getGcsPath(path); + checkArgument(gcsPath.isAbsolute(), "Must provide absolute paths for Dataflow"); + checkArgument(!gcsPath.getObject().contains("//"), + "Dataflow Service does not allow objects with consecutive slashes"); + return gcsPath.toResourceName(); + } + + private void verifyPathIsAccessible(String path, String errorMessage) { + GcsPath gcsPath = getGcsPath(path); + try { + checkArgument(gcpOptions.getGcsUtil().bucketAccessible(gcsPath), + errorMessage, path); + } catch (IOException e) { + throw new RuntimeException( + String.format("Unable to verify that GCS bucket gs://%s exists.", gcsPath.getBucket()), + e); + } + } + + private GcsPath getGcsPath(String path) { + try { + return GcsPath.fromUri(path); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException(String.format( + "Expected a valid 'gs://' path but was given '%s'", path), e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java new file mode 100644 index 0000000..1c853bb --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java @@ -0,0 +1,798 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.api.client.googleapis.batch.BatchRequest; +import com.google.api.client.googleapis.batch.json.JsonBatchCallback; +import com.google.api.client.googleapis.json.GoogleJsonError; +import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.client.http.HttpHeaders; +import com.google.api.client.http.HttpRequestInitializer; +import com.google.api.client.util.BackOff; +import com.google.api.client.util.Sleeper; +import com.google.api.services.storage.Storage; +import com.google.api.services.storage.model.Bucket; +import com.google.api.services.storage.model.Objects; +import com.google.api.services.storage.model.StorageObject; +import com.google.auto.value.AutoValue; +import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel; +import com.google.cloud.hadoop.gcsio.GoogleCloudStorageWriteChannel; +import com.google.cloud.hadoop.gcsio.ObjectWriteConditions; +import com.google.cloud.hadoop.util.ApiErrorExtractor; +import com.google.cloud.hadoop.util.AsyncWriteChannelOptions; +import com.google.cloud.hadoop.util.ClientRequestHelper; +import com.google.cloud.hadoop.util.ResilientOperation; +import com.google.cloud.hadoop.util.RetryDeterminer; +import 
com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.channels.SeekableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.nio.file.AccessDeniedException; +import java.nio.file.FileAlreadyExistsException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import javax.annotation.Nullable; + +import org.apache.beam.sdk.options.DefaultValueFactory; +import org.apache.beam.sdk.options.GcsOptions; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.gcsfs.GcsPath; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Provides operations on GCS. + */ +public class GcsUtil { + /** + * This is a {@link DefaultValueFactory} able to create a {@link GcsUtil} using + * any transport flags specified on the {@link PipelineOptions}. + */ + public static class GcsUtilFactory implements DefaultValueFactory<GcsUtil> { + /** + * Returns an instance of {@link GcsUtil} based on the + * {@link PipelineOptions}. + * + * <p>If no instance has previously been created, one is created and the value + * stored in {@code options}. 
+ */ + @Override + public GcsUtil create(PipelineOptions options) { + LOG.debug("Creating new GcsUtil"); + GcsOptions gcsOptions = options.as(GcsOptions.class); + Storage.Builder storageBuilder = Transport.newStorageClient(gcsOptions); + return new GcsUtil( + storageBuilder.build(), + storageBuilder.getHttpRequestInitializer(), + gcsOptions.getExecutorService(), + gcsOptions.getGcsUploadBufferSizeBytes()); + } + + /** + * Returns an instance of {@link GcsUtil} based on the given parameters. + */ + public static GcsUtil create( + Storage storageClient, + HttpRequestInitializer httpRequestInitializer, + ExecutorService executorService, + @Nullable Integer uploadBufferSizeBytes) { + return new GcsUtil( + storageClient, httpRequestInitializer, executorService, uploadBufferSizeBytes); + } + } + + private static final Logger LOG = LoggerFactory.getLogger(GcsUtil.class); + + /** Maximum number of items to retrieve per Objects.List request. */ + private static final long MAX_LIST_ITEMS_PER_CALL = 1024; + + /** Matches a glob containing a wildcard, capturing the portion before the first wildcard. */ + private static final Pattern GLOB_PREFIX = Pattern.compile("(?<PREFIX>[^\\[*?]*)[\\[*?].*"); + + private static final String RECURSIVE_WILDCARD = "[*]{2}"; + + /** + * A {@link Pattern} for globs with a recursive wildcard. + */ + private static final Pattern RECURSIVE_GCS_PATTERN = + Pattern.compile(".*" + RECURSIVE_WILDCARD + ".*"); + + /** + * Maximum number of requests permitted in a GCS batch request. + */ + private static final int MAX_REQUESTS_PER_BATCH = 100; + /** + * Maximum number of concurrent batches of requests executing on GCS. + */ + private static final int MAX_CONCURRENT_BATCHES = 256; + + private static final FluentBackoff BACKOFF_FACTORY = + FluentBackoff.DEFAULT.withMaxRetries(3).withInitialBackoff(Duration.millis(200)); + + ///////////////////////////////////////////////////////////////////////////// + + /** Client for the GCS API. 
*/ + private Storage storageClient; + private final HttpRequestInitializer httpRequestInitializer; + /** Buffer size for GCS uploads (in bytes). */ + @Nullable private final Integer uploadBufferSizeBytes; + + // Helper delegate for turning IOExceptions from API calls into higher-level semantics. + private final ApiErrorExtractor errorExtractor = new ApiErrorExtractor(); + + // Exposed for testing. + final ExecutorService executorService; + + /** + * Returns true if the given GCS pattern is supported otherwise fails with an + * exception. + */ + public static boolean isGcsPatternSupported(String gcsPattern) { + if (RECURSIVE_GCS_PATTERN.matcher(gcsPattern).matches()) { + throw new IllegalArgumentException("Unsupported wildcard usage in \"" + gcsPattern + "\": " + + " recursive wildcards are not supported."); + } + return true; + } + + /** + * Returns the prefix portion of the glob that doesn't contain wildcards. + */ + public static String getGlobPrefix(String globExp) { + checkArgument(isGcsPatternSupported(globExp)); + Matcher m = GLOB_PREFIX.matcher(globExp); + checkArgument( + m.matches(), + String.format("Glob expression: [%s] is not expandable.", globExp)); + return m.group("PREFIX"); + } + + /** + * Expands glob expressions to regular expressions. 
+ * + * @param globExp the glob expression to expand + * @return a string with the regular expression this glob expands to + */ + public static String globToRegexp(String globExp) { + StringBuilder dst = new StringBuilder(); + char[] src = globExp.toCharArray(); + int i = 0; + while (i < src.length) { + char c = src[i++]; + switch (c) { + case '*': + dst.append("[^/]*"); + break; + case '?': + dst.append("[^/]"); + break; + case '.': + case '+': + case '{': + case '}': + case '(': + case ')': + case '|': + case '^': + case '$': + // These need to be escaped in regular expressions + dst.append('\\').append(c); + break; + case '\\': + i = doubleSlashes(dst, src, i); + break; + default: + dst.append(c); + break; + } + } + return dst.toString(); + } + + /** + * Returns true if the given {@code spec} contains glob. + */ + public static boolean isGlob(GcsPath spec) { + return GLOB_PREFIX.matcher(spec.getObject()).matches(); + } + + private GcsUtil( + Storage storageClient, + HttpRequestInitializer httpRequestInitializer, + ExecutorService executorService, + @Nullable Integer uploadBufferSizeBytes) { + this.storageClient = storageClient; + this.httpRequestInitializer = httpRequestInitializer; + this.uploadBufferSizeBytes = uploadBufferSizeBytes; + this.executorService = executorService; + } + + // Use this only for testing purposes. + protected void setStorageClient(Storage storageClient) { + this.storageClient = storageClient; + } + + /** + * Expands a pattern into matched paths. The pattern path may contain globs, which are expanded + * in the result. For patterns that only match a single object, we ensure that the object + * exists. + */ + public List<GcsPath> expand(GcsPath gcsPattern) throws IOException { + checkArgument(isGcsPatternSupported(gcsPattern.getObject())); + Pattern p = null; + String prefix = null; + if (!isGlob(gcsPattern)) { + // Not a glob. + try { + // Use a get request to fetch the metadata of the object, and ignore the return value. 
+ // The request has strong global consistency. + getObject(gcsPattern); + return ImmutableList.of(gcsPattern); + } catch (FileNotFoundException e) { + // If the path was not found, return an empty list. + return ImmutableList.of(); + } + } else { + // Part before the first wildcard character. + prefix = getGlobPrefix(gcsPattern.getObject()); + p = Pattern.compile(globToRegexp(gcsPattern.getObject())); + } + + LOG.debug("matching files in bucket {}, prefix {} against pattern {}", gcsPattern.getBucket(), + prefix, p.toString()); + + String pageToken = null; + List<GcsPath> results = new LinkedList<>(); + do { + Objects objects = listObjects(gcsPattern.getBucket(), prefix, pageToken); + if (objects.getItems() == null) { + break; + } + + // Filter objects based on the regex. + for (StorageObject o : objects.getItems()) { + String name = o.getName(); + // Skip directories, which end with a slash. + if (p.matcher(name).matches() && !name.endsWith("/")) { + LOG.debug("Matched object: {}", name); + results.add(GcsPath.fromObject(o)); + } + } + pageToken = objects.getNextPageToken(); + } while (pageToken != null); + + return results; + } + + @VisibleForTesting + @Nullable + Integer getUploadBufferSizeBytes() { + return uploadBufferSizeBytes; + } + + /** + * Returns the file size from GCS or throws {@link FileNotFoundException} + * if the resource does not exist. + */ + public long fileSize(GcsPath path) throws IOException { + return getObject(path).getSize().longValue(); + } + + /** + * Returns the {@link StorageObject} for the given {@link GcsPath}. 
+ */ + public StorageObject getObject(GcsPath gcsPath) throws IOException { + return getObject(gcsPath, BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT); + } + + @VisibleForTesting + StorageObject getObject(GcsPath gcsPath, BackOff backoff, Sleeper sleeper) throws IOException { + Storage.Objects.Get getObject = + storageClient.objects().get(gcsPath.getBucket(), gcsPath.getObject()); + try { + return ResilientOperation.retry( + ResilientOperation.getGoogleRequestCallable(getObject), + backoff, + RetryDeterminer.SOCKET_ERRORS, + IOException.class, + sleeper); + } catch (IOException | InterruptedException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + if (e instanceof IOException && errorExtractor.itemNotFound((IOException) e)) { + throw new FileNotFoundException(gcsPath.toString()); + } + throw new IOException( + String.format("Unable to get the file object for path %s.", gcsPath), + e); + } + } + + /** + * Returns {@link StorageObjectOrIOException StorageObjectOrIOExceptions} for the given + * {@link GcsPath GcsPaths}. + */ + public List<StorageObjectOrIOException> getObjects(List<GcsPath> gcsPaths) + throws IOException { + List<StorageObjectOrIOException[]> results = new ArrayList<>(); + executeBatches(makeGetBatches(gcsPaths, results)); + ImmutableList.Builder<StorageObjectOrIOException> ret = ImmutableList.builder(); + for (StorageObjectOrIOException[] result : results) { + ret.add(result[0]); + } + return ret.build(); + } + + /** + * Lists {@link Objects} given the {@code bucket}, {@code prefix}, {@code pageToken}. + */ + public Objects listObjects(String bucket, String prefix, @Nullable String pageToken) + throws IOException { + // List all objects that start with the prefix (including objects in sub-directories). 
+ Storage.Objects.List listObject = storageClient.objects().list(bucket); + listObject.setMaxResults(MAX_LIST_ITEMS_PER_CALL); + listObject.setPrefix(prefix); + + if (pageToken != null) { + listObject.setPageToken(pageToken); + } + + try { + return ResilientOperation.retry( + ResilientOperation.getGoogleRequestCallable(listObject), + BACKOFF_FACTORY.backoff(), + RetryDeterminer.SOCKET_ERRORS, + IOException.class); + } catch (Exception e) { + throw new IOException( + String.format("Unable to match files in bucket %s, prefix %s.", bucket, prefix), + e); + } + } + + /** + * Returns the file size from GCS or throws {@link FileNotFoundException} + * if the resource does not exist. + */ + @VisibleForTesting + List<Long> fileSizes(List<GcsPath> paths) throws IOException { + List<StorageObjectOrIOException> results = getObjects(paths); + + ImmutableList.Builder<Long> ret = ImmutableList.builder(); + for (StorageObjectOrIOException result : results) { + ret.add(toFileSize(result)); + } + return ret.build(); + } + + private Long toFileSize(StorageObjectOrIOException storageObjectOrIOException) + throws IOException { + if (storageObjectOrIOException.ioException() != null) { + throw storageObjectOrIOException.ioException(); + } else { + return storageObjectOrIOException.storageObject().getSize().longValue(); + } + } + + /** + * Opens an object in GCS. + * + * <p>Returns a SeekableByteChannel that provides access to data in the bucket. + * + * @param path the GCS filename to read from + * @return a SeekableByteChannel that can read the object data + */ + public SeekableByteChannel open(GcsPath path) + throws IOException { + return new GoogleCloudStorageReadChannel(storageClient, path.getBucket(), + path.getObject(), errorExtractor, + new ClientRequestHelper<StorageObject>()); + } + + /** + * Creates an object in GCS. + * + * <p>Returns a WritableByteChannel that can be used to write data to the + * object. 
+ * + * @param path the GCS file to write to + * @param type the type of object, eg "text/plain". + * @return a Callable object that encloses the operation. + */ + public WritableByteChannel create(GcsPath path, + String type) throws IOException { + GoogleCloudStorageWriteChannel channel = new GoogleCloudStorageWriteChannel( + executorService, + storageClient, + new ClientRequestHelper<StorageObject>(), + path.getBucket(), + path.getObject(), + AsyncWriteChannelOptions.newBuilder().build(), + new ObjectWriteConditions(), + Collections.<String, String>emptyMap(), + type); + if (uploadBufferSizeBytes != null) { + channel.setUploadBufferSize(uploadBufferSizeBytes); + } + channel.initialize(); + return channel; + } + + /** + * Returns whether the GCS bucket exists and is accessible. + */ + public boolean bucketAccessible(GcsPath path) throws IOException { + return bucketAccessible( + path, + BACKOFF_FACTORY.backoff(), + Sleeper.DEFAULT); + } + + /** + * Returns the project number of the project which owns this bucket. + * If the bucket exists, it must be accessible otherwise the permissions + * exception will be propagated. If the bucket does not exist, an exception + * will be thrown. + */ + public long bucketOwner(GcsPath path) throws IOException { + return getBucket( + path, + BACKOFF_FACTORY.backoff(), + Sleeper.DEFAULT).getProjectNumber().longValue(); + } + + /** + * Creates a {@link Bucket} under the specified project in Cloud Storage or + * propagates an exception. + */ + public void createBucket(String projectId, Bucket bucket) throws IOException { + createBucket( + projectId, bucket, BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT); + } + + /** + * Returns whether the GCS bucket exists. This will return false if the bucket + * is inaccessible due to permissions. 
+ */ + @VisibleForTesting + boolean bucketAccessible(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException { + try { + return getBucket(path, backoff, sleeper) != null; + } catch (AccessDeniedException | FileNotFoundException e) { + return false; + } + } + + @VisibleForTesting + @Nullable + Bucket getBucket(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException { + Storage.Buckets.Get getBucket = + storageClient.buckets().get(path.getBucket()); + + try { + Bucket bucket = ResilientOperation.retry( + ResilientOperation.getGoogleRequestCallable(getBucket), + backoff, + new RetryDeterminer<IOException>() { + @Override + public boolean shouldRetry(IOException e) { + if (errorExtractor.itemNotFound(e) || errorExtractor.accessDenied(e)) { + return false; + } + return RetryDeterminer.SOCKET_ERRORS.shouldRetry(e); + } + }, + IOException.class, + sleeper); + + return bucket; + } catch (GoogleJsonResponseException e) { + if (errorExtractor.accessDenied(e)) { + throw new AccessDeniedException(path.toString(), null, e.getMessage()); + } + if (errorExtractor.itemNotFound(e)) { + throw new FileNotFoundException(e.getMessage()); + } + throw e; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException( + String.format("Error while attempting to verify existence of bucket gs://%s", + path.getBucket()), e); + } + } + + @VisibleForTesting + void createBucket(String projectId, Bucket bucket, BackOff backoff, Sleeper sleeper) + throws IOException { + Storage.Buckets.Insert insertBucket = + storageClient.buckets().insert(projectId, bucket); + insertBucket.setPredefinedAcl("projectPrivate"); + insertBucket.setPredefinedDefaultObjectAcl("projectPrivate"); + + try { + ResilientOperation.retry( + ResilientOperation.getGoogleRequestCallable(insertBucket), + backoff, + new RetryDeterminer<IOException>() { + @Override + public boolean shouldRetry(IOException e) { + if (errorExtractor.itemAlreadyExists(e) || 
errorExtractor.accessDenied(e)) { + return false; + } + return RetryDeterminer.SOCKET_ERRORS.shouldRetry(e); + } + }, + IOException.class, + sleeper); + return; + } catch (GoogleJsonResponseException e) { + if (errorExtractor.accessDenied(e)) { + throw new AccessDeniedException(bucket.getName(), null, e.getMessage()); + } + if (errorExtractor.itemAlreadyExists(e)) { + throw new FileAlreadyExistsException(bucket.getName(), null, e.getMessage()); + } + throw e; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException( + String.format("Error while attempting to create bucket gs://%s for rproject %s", + bucket.getName(), projectId), e); + } + } + + private static void executeBatches(List<BatchRequest> batches) throws IOException { + ListeningExecutorService executor = MoreExecutors.listeningDecorator( + MoreExecutors.getExitingExecutorService( + new ThreadPoolExecutor(MAX_CONCURRENT_BATCHES, MAX_CONCURRENT_BATCHES, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>()))); + + List<ListenableFuture<Void>> futures = new LinkedList<>(); + for (final BatchRequest batch : batches) { + futures.add(executor.submit(new Callable<Void>() { + public Void call() throws IOException { + batch.execute(); + return null; + } + })); + } + + try { + Futures.allAsList(futures).get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while executing batch GCS request", e); + } catch (ExecutionException e) { + if (e.getCause() instanceof FileNotFoundException) { + throw (FileNotFoundException) e.getCause(); + } + throw new IOException("Error executing batch GCS request", e); + } finally { + executor.shutdown(); + } + } + + /** + * Makes get {@link BatchRequest BatchRequests}. + * + * @param paths {@link GcsPath GcsPaths}. + * @param results mutable {@link List} for return values. + * @return {@link BatchRequest BatchRequests} to execute. 
+ * @throws IOException + */ + @VisibleForTesting + List<BatchRequest> makeGetBatches( + Collection<GcsPath> paths, + List<StorageObjectOrIOException[]> results) throws IOException { + List<BatchRequest> batches = new LinkedList<>(); + for (List<GcsPath> filesToGet : + Lists.partition(Lists.newArrayList(paths), MAX_REQUESTS_PER_BATCH)) { + BatchRequest batch = createBatchRequest(); + for (GcsPath path : filesToGet) { + results.add(enqueueGetFileSize(path, batch)); + } + batches.add(batch); + } + return batches; + } + + public void copy(Iterable<String> srcFilenames, + Iterable<String> destFilenames) throws + IOException { + executeBatches(makeCopyBatches(srcFilenames, destFilenames)); + } + + List<BatchRequest> makeCopyBatches(Iterable<String> srcFilenames, Iterable<String> destFilenames) + throws IOException { + List<String> srcList = Lists.newArrayList(srcFilenames); + List<String> destList = Lists.newArrayList(destFilenames); + checkArgument( + srcList.size() == destList.size(), + "Number of source files %s must equal number of destination files %s", + srcList.size(), + destList.size()); + + List<BatchRequest> batches = new LinkedList<>(); + BatchRequest batch = createBatchRequest(); + for (int i = 0; i < srcList.size(); i++) { + final GcsPath sourcePath = GcsPath.fromUri(srcList.get(i)); + final GcsPath destPath = GcsPath.fromUri(destList.get(i)); + enqueueCopy(sourcePath, destPath, batch); + if (batch.size() >= MAX_REQUESTS_PER_BATCH) { + batches.add(batch); + batch = createBatchRequest(); + } + } + if (batch.size() > 0) { + batches.add(batch); + } + return batches; + } + + List<BatchRequest> makeRemoveBatches(Collection<String> filenames) throws IOException { + List<BatchRequest> batches = new LinkedList<>(); + for (List<String> filesToDelete : + Lists.partition(Lists.newArrayList(filenames), MAX_REQUESTS_PER_BATCH)) { + BatchRequest batch = createBatchRequest(); + for (String file : filesToDelete) { + enqueueDelete(GcsPath.fromUri(file), batch); + } + 
batches.add(batch); + } + return batches; + } + + public void remove(Collection<String> filenames) throws IOException { + executeBatches(makeRemoveBatches(filenames)); + } + + private StorageObjectOrIOException[] enqueueGetFileSize(final GcsPath path, BatchRequest batch) + throws IOException { + final StorageObjectOrIOException[] ret = new StorageObjectOrIOException[1]; + + Storage.Objects.Get getRequest = storageClient.objects() + .get(path.getBucket(), path.getObject()); + getRequest.queue(batch, new JsonBatchCallback<StorageObject>() { + @Override + public void onSuccess(StorageObject response, HttpHeaders httpHeaders) throws IOException { + ret[0] = StorageObjectOrIOException.create(response); + } + + @Override + public void onFailure(GoogleJsonError e, HttpHeaders httpHeaders) throws IOException { + IOException ioException; + if (errorExtractor.itemNotFound(e)) { + ioException = new FileNotFoundException(path.toString()); + } else { + ioException = new IOException(String.format("Error trying to get %s: %s", path, e)); + } + ret[0] = StorageObjectOrIOException.create(ioException); + } + }); + return ret; + } + + /** + * A class that holds either a {@link StorageObject} or an {@link IOException}. + */ + @AutoValue + public abstract static class StorageObjectOrIOException { + + /** + * Returns the {@link StorageObject}. + */ + @Nullable + public abstract StorageObject storageObject(); + + /** + * Returns the {@link IOException}. 
+ */ + @Nullable + public abstract IOException ioException(); + + @VisibleForTesting + public static StorageObjectOrIOException create(StorageObject storageObject) { + return new AutoValue_GcsUtil_StorageObjectOrIOException( + checkNotNull(storageObject, "storageObject"), + null /* ioException */); + } + + @VisibleForTesting + public static StorageObjectOrIOException create(IOException ioException) { + return new AutoValue_GcsUtil_StorageObjectOrIOException( + null /* storageObject */, + checkNotNull(ioException, "ioException")); + } + } + + private void enqueueCopy(final GcsPath from, final GcsPath to, BatchRequest batch) + throws IOException { + Storage.Objects.Copy copyRequest = storageClient.objects() + .copy(from.getBucket(), from.getObject(), to.getBucket(), to.getObject(), null); + copyRequest.queue(batch, new JsonBatchCallback<StorageObject>() { + @Override + public void onSuccess(StorageObject obj, HttpHeaders responseHeaders) { + LOG.debug("Successfully copied {} to {}", from, to); + } + + @Override + public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException { + throw new IOException( + String.format("Error trying to copy %s to %s: %s", from, to, e)); + } + }); + } + + private void enqueueDelete(final GcsPath file, BatchRequest batch) throws IOException { + Storage.Objects.Delete deleteRequest = storageClient.objects() + .delete(file.getBucket(), file.getObject()); + deleteRequest.queue(batch, new JsonBatchCallback<Void>() { + @Override + public void onSuccess(Void obj, HttpHeaders responseHeaders) { + LOG.debug("Successfully deleted {}", file); + } + + @Override + public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException { + throw new IOException(String.format("Error trying to delete %s: %s", file, e)); + } + }); + } + + private BatchRequest createBatchRequest() { + return storageClient.batch(httpRequestInitializer); + } + + private static int doubleSlashes(StringBuilder dst, char[] src, int 
i) { + // Emit the next character without special interpretation + dst.append('\\'); + if ((i - 1) != src.length) { + dst.append(src[i]); + i++; + } else { + // A backslash at the very end is treated like an escaped backslash + dst.append('\\'); + } + return i; + } +}
