Repository: incubator-beam Updated Branches: refs/heads/master 51e1e59b8 -> dc4f2f706
Move copy and remove from FileBasedSink to GcsUtil. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c88bfff6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c88bfff6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c88bfff6 Branch: refs/heads/master Commit: c88bfff6fc56cea57e447d841f54ed4aa7e03d3b Parents: 51e1e59 Author: Pei He <pe...@google.com> Authored: Thu May 5 18:29:00 2016 -0700 Committer: Pei He <pe...@google.com> Committed: Thu May 5 18:29:00 2016 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/io/FileBasedSink.java | 181 +------------------ .../java/org/apache/beam/sdk/util/GcsUtil.java | 166 ++++++++++++++++- 2 files changed, 172 insertions(+), 175 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c88bfff6/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 10e93f5..7d23e7b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -19,26 +19,16 @@ package org.apache.beam.sdk.io; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.SerializableCoder; -import org.apache.beam.sdk.options.GcsOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.FileIOChannelFactory; import org.apache.beam.sdk.util.GcsIOChannelFactory; +import org.apache.beam.sdk.util.GcsUtil; +import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory; import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.MimeTypes; -import org.apache.beam.sdk.util.Transport; -import org.apache.beam.sdk.util.gcsfs.GcsPath; - -import com.google.api.client.googleapis.batch.BatchRequest; -import com.google.api.client.googleapis.batch.json.JsonBatchCallback; -import com.google.api.client.googleapis.json.GoogleJsonError; -import com.google.api.client.http.HttpHeaders; -import com.google.api.client.http.HttpRequestInitializer; -import com.google.api.services.storage.Storage; -import com.google.api.services.storage.StorageRequest; -import com.google.api.services.storage.model.StorageObject; -import com.google.cloud.hadoop.util.ApiErrorExtractor; + import com.google.common.base.Preconditions; import org.slf4j.Logger; @@ -54,11 +44,8 @@ import java.nio.file.StandardCopyOption; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.LinkedList; import java.util.List; -import javax.annotation.concurrent.NotThreadSafe; - /** * Abstract {@link Sink} for file-based output. An implementation of FileBasedSink writes file-based * output and defines the format of output files (how values are written, headers/footers, MIME @@ -611,80 +598,20 @@ public abstract class FileBasedSink<T> extends Sink<T> { * GCS file system operations. */ private static class GcsOperations implements FileOperations { - private static final Logger LOG = LoggerFactory.getLogger(GcsOperations.class); - - /** - * Maximum number of requests permitted in a GCS batch request. - */ - private static final int MAX_REQUESTS_PER_BATCH = 1000; - - private ApiErrorExtractor errorExtractor = new ApiErrorExtractor(); - private GcsOptions gcsOptions; - private Storage gcs; - private BatchHelper batchHelper; + private final GcsUtil gcsUtil; public GcsOperations(PipelineOptions options) { - gcsOptions = options.as(GcsOptions.class); - gcs = Transport.newStorageClient(gcsOptions).build(); - batchHelper = - new BatchHelper(gcs.getRequestFactory().getInitializer(), gcs, MAX_REQUESTS_PER_BATCH); + gcsUtil = new GcsUtilFactory().create(options); } @Override public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException { - Preconditions.checkArgument( - srcFilenames.size() == destFilenames.size(), - String.format("Number of source files {} must equal number of destination files {}", - srcFilenames.size(), destFilenames.size())); - for (int i = 0; i < srcFilenames.size(); i++) { - final GcsPath sourcePath = GcsPath.fromUri(srcFilenames.get(i)); - final GcsPath destPath = GcsPath.fromUri(destFilenames.get(i)); - LOG.debug("Copying {} to {}", sourcePath, destPath); - Storage.Objects.Copy copyObject = gcs.objects().copy(sourcePath.getBucket(), - sourcePath.getObject(), destPath.getBucket(), destPath.getObject(), null); - batchHelper.queue(copyObject, new JsonBatchCallback<StorageObject>() { - @Override - public void onSuccess(StorageObject obj, HttpHeaders responseHeaders) { - LOG.debug("Successfully copied {} to {}", sourcePath, destPath); - } - - @Override - public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException { - // Do nothing on item not found. - if (!errorExtractor.itemNotFound(e)) { - throw new IOException(e.toString()); - } - LOG.debug("{} does not exist.", sourcePath); - } - }); - } - batchHelper.flush(); + gcsUtil.copy(srcFilenames, destFilenames); } @Override public void remove(Collection<String> filenames) throws IOException { - for (String filename : filenames) { - final GcsPath path = GcsPath.fromUri(filename); - LOG.debug("Removing: " + path); - Storage.Objects.Delete deleteObject = - gcs.objects().delete(path.getBucket(), path.getObject()); - batchHelper.queue(deleteObject, new JsonBatchCallback<Void>() { - @Override - public void onSuccess(Void obj, HttpHeaders responseHeaders) throws IOException { - LOG.debug("Successfully removed {}", path); - } - - @Override - public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException { - // Do nothing on item not found. - if (!errorExtractor.itemNotFound(e)) { - throw new IOException(e.toString()); - } - LOG.debug("{} does not exist.", path); - } - }); - } - batchHelper.flush(); + gcsUtil.remove(filenames); } } @@ -735,98 +662,4 @@ public abstract class FileBasedSink<T> extends Sink<T> { } } } - - /** - * BatchHelper abstracts out the logic for the maximum requests per batch for GCS. - * - * <p>Copy of - * https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/gcs/src/main/java/com/google/cloud/hadoop/gcsio/BatchHelper.java - * - * <p>Copied to prevent Dataflow from depending on the Hadoop-related dependencies that are not - * used in Dataflow. Hadoop-related dependencies will be removed from the Google Cloud Storage - * Connector (https://cloud.google.com/hadoop/google-cloud-storage-connector) so that this project - * and others may use the connector without introducing unnecessary dependencies. - * - * <p>This class is not thread-safe; create a new BatchHelper instance per single-threaded logical - * grouping of requests. - */ - @NotThreadSafe - private static class BatchHelper { - /** - * Callback that causes a single StorageRequest to be added to the BatchRequest. - */ - protected static interface QueueRequestCallback { - void enqueue() throws IOException; - } - - private final List<QueueRequestCallback> pendingBatchEntries; - private final BatchRequest batch; - - // Number of requests that can be queued into a single actual HTTP request - // before a sub-batch is sent. - private final long maxRequestsPerBatch; - - // Flag that indicates whether there is an in-progress flush. - private boolean flushing = false; - - /** - * Primary constructor, generally accessed only via the inner Factory class. - */ - public BatchHelper( - HttpRequestInitializer requestInitializer, Storage gcs, long maxRequestsPerBatch) { - this.pendingBatchEntries = new LinkedList<>(); - this.batch = gcs.batch(requestInitializer); - this.maxRequestsPerBatch = maxRequestsPerBatch; - } - - /** - * Adds an additional request to the batch, and possibly flushes the current contents of the - * batch if {@code maxRequestsPerBatch} has been reached. - */ - public <T> void queue(final StorageRequest<T> req, final JsonBatchCallback<T> callback) - throws IOException { - QueueRequestCallback queueCallback = new QueueRequestCallback() { - @Override - public void enqueue() throws IOException { - req.queue(batch, callback); - } - }; - pendingBatchEntries.add(queueCallback); - - flushIfPossibleAndRequired(); - } - - // Flush our buffer if we have more pending entries than maxRequestsPerBatch - private void flushIfPossibleAndRequired() throws IOException { - if (pendingBatchEntries.size() > maxRequestsPerBatch) { - flushIfPossible(); - } - } - - // Flush our buffer if we are not already in a flush operation and we have data to flush. - private void flushIfPossible() throws IOException { - if (!flushing && pendingBatchEntries.size() > 0) { - flushing = true; - try { - while (batch.size() < maxRequestsPerBatch && pendingBatchEntries.size() > 0) { - QueueRequestCallback head = pendingBatchEntries.remove(0); - head.enqueue(); - } - - batch.execute(); - } finally { - flushing = false; - } - } - } - - - /** - * Sends any currently remaining requests in the batch; should be called at the end of any - * series of batched requests to ensure everything has been sent. - */ - public void flush() throws IOException { - flushIfPossible(); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c88bfff6/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java index f7ef447..aa8774b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java @@ -22,10 +22,16 @@ import org.apache.beam.sdk.options.GcsOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.gcsfs.GcsPath; +import com.google.api.client.googleapis.batch.BatchRequest; +import com.google.api.client.googleapis.batch.json.JsonBatchCallback; +import com.google.api.client.googleapis.json.GoogleJsonError; import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.client.http.HttpHeaders; +import com.google.api.client.http.HttpRequestInitializer; import com.google.api.client.util.BackOff; import com.google.api.client.util.Sleeper; import com.google.api.services.storage.Storage; +import com.google.api.services.storage.StorageRequest; import com.google.api.services.storage.model.Objects; import com.google.api.services.storage.model.StorageObject; import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel; @@ -47,6 +53,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.nio.channels.SeekableByteChannel; import java.nio.channels.WritableByteChannel; +import java.util.Collection; import java.util.Collections; import java.util.LinkedList; import java.util.List; @@ -55,6 +62,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.annotation.Nullable; +import javax.annotation.concurrent.NotThreadSafe; /** * Provides operations on GCS. @@ -76,7 +84,6 @@ public class GcsUtil { public GcsUtil create(PipelineOptions options) { LOG.debug("Creating new GcsUtil"); GcsOptions gcsOptions = options.as(GcsOptions.class); - return new GcsUtil(Transport.newStorageClient(gcsOptions).build(), gcsOptions.getExecutorService(), gcsOptions.getGcsUploadBufferSizeBytes()); } @@ -98,6 +105,11 @@ public class GcsUtil { private static final Pattern RECURSIVE_GCS_PATTERN = Pattern.compile(".*" + RECURSIVE_WILDCARD + ".*"); + /** + * Maximum number of requests permitted in a GCS batch request. + */ + private static final int MAX_REQUESTS_PER_BATCH = 1000; + ///////////////////////////////////////////////////////////////////////////// /** Client for the GCS API. */ @@ -111,6 +123,7 @@ public class GcsUtil { // Exposed for testing. final ExecutorService executorService; + private final BatchHelper batchHelper; /** * Returns true if the given GCS pattern is supported otherwise fails with an * exception. @@ -130,6 +143,8 @@ public class GcsUtil { this.storageClient = storageClient; this.uploadBufferSizeBytes = uploadBufferSizeBytes; this.executorService = executorService; + this.batchHelper = new BatchHelper( + storageClient.getRequestFactory().getInitializer(), storageClient, MAX_REQUESTS_PER_BATCH); } // Use this only for testing purposes. @@ -355,6 +370,155 @@ public class GcsUtil { } } + public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException { + Preconditions.checkArgument( + srcFilenames.size() == destFilenames.size(), + String.format("Number of source files {} must equal number of destination files {}", + srcFilenames.size(), destFilenames.size())); + for (int i = 0; i < srcFilenames.size(); i++) { + final GcsPath sourcePath = GcsPath.fromUri(srcFilenames.get(i)); + final GcsPath destPath = GcsPath.fromUri(destFilenames.get(i)); + LOG.debug("Copying {} to {}", sourcePath, destPath); + Storage.Objects.Copy copyObject = storageClient.objects().copy(sourcePath.getBucket(), + sourcePath.getObject(), destPath.getBucket(), destPath.getObject(), null); + batchHelper.queue(copyObject, new JsonBatchCallback<StorageObject>() { + @Override + public void onSuccess(StorageObject obj, HttpHeaders responseHeaders) { + LOG.debug("Successfully copied {} to {}", sourcePath, destPath); + } + + @Override + public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException { + // Do nothing on item not found. + if (!errorExtractor.itemNotFound(e)) { + throw new IOException(e.toString()); + } + LOG.debug("{} does not exist.", sourcePath); + } + }); + } + batchHelper.flush(); + } + + public void remove(Collection<String> filenames) throws IOException { + for (String filename : filenames) { + final GcsPath path = GcsPath.fromUri(filename); + LOG.debug("Removing: " + path); + Storage.Objects.Delete deleteObject = + storageClient.objects().delete(path.getBucket(), path.getObject()); + batchHelper.queue(deleteObject, new JsonBatchCallback<Void>() { + @Override + public void onSuccess(Void obj, HttpHeaders responseHeaders) throws IOException { + LOG.debug("Successfully removed {}", path); + } + + @Override + public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException { + // Do nothing on item not found. + if (!errorExtractor.itemNotFound(e)) { + throw new IOException(e.toString()); + } + LOG.debug("{} does not exist.", path); + } + }); + } + batchHelper.flush(); + } + + /** + * BatchHelper abstracts out the logic for the maximum requests per batch for GCS. + * + * <p>Copy of + * https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/gcs/src/main/java/com/google/cloud/hadoop/gcsio/BatchHelper.java + * + * <p>Copied to prevent Dataflow from depending on the Hadoop-related dependencies that are not + * used in Dataflow. Hadoop-related dependencies will be removed from the Google Cloud Storage + * Connector (https://cloud.google.com/hadoop/google-cloud-storage-connector) so that this project + * and others may use the connector without introducing unnecessary dependencies. + * + * <p>This class is not thread-safe; create a new BatchHelper instance per single-threaded logical + * grouping of requests. + */ + @NotThreadSafe + private static class BatchHelper { + /** + * Callback that causes a single StorageRequest to be added to the BatchRequest. + */ + protected static interface QueueRequestCallback { + void enqueue() throws IOException; + } + + private final List<QueueRequestCallback> pendingBatchEntries; + private final BatchRequest batch; + + // Number of requests that can be queued into a single actual HTTP request + // before a sub-batch is sent. + private final long maxRequestsPerBatch; + + // Flag that indicates whether there is an in-progress flush. + private boolean flushing = false; + + /** + * Primary constructor, generally accessed only via the inner Factory class. + */ + public BatchHelper( + HttpRequestInitializer requestInitializer, Storage gcs, long maxRequestsPerBatch) { + this.pendingBatchEntries = new LinkedList<>(); + this.batch = gcs.batch(requestInitializer); + this.maxRequestsPerBatch = maxRequestsPerBatch; + } + + /** + * Adds an additional request to the batch, and possibly flushes the current contents of the + * batch if {@code maxRequestsPerBatch} has been reached. + */ + public <T> void queue(final StorageRequest<T> req, final JsonBatchCallback<T> callback) + throws IOException { + QueueRequestCallback queueCallback = new QueueRequestCallback() { + @Override + public void enqueue() throws IOException { + req.queue(batch, callback); + } + }; + pendingBatchEntries.add(queueCallback); + + flushIfPossibleAndRequired(); + } + + // Flush our buffer if we have more pending entries than maxRequestsPerBatch + private void flushIfPossibleAndRequired() throws IOException { + if (pendingBatchEntries.size() > maxRequestsPerBatch) { + flushIfPossible(); + } + } + + // Flush our buffer if we are not already in a flush operation and we have data to flush. + private void flushIfPossible() throws IOException { + if (!flushing && pendingBatchEntries.size() > 0) { + flushing = true; + try { + while (batch.size() < maxRequestsPerBatch && pendingBatchEntries.size() > 0) { + QueueRequestCallback head = pendingBatchEntries.remove(0); + head.enqueue(); + } + + batch.execute(); + } finally { + flushing = false; + } + } + } + + + /** + * Sends any currently remaining requests in the batch; should be called at the end of any + * series of batched requests to ensure everything has been sent. + */ + public void flush() throws IOException { + flushIfPossible(); + } + } + /** * Expands glob expressions to regular expressions. *