Repository: jclouds
Updated Branches:
  refs/heads/master 984b6ae8f -> 6bff97b6d
Changes the upload behavior to parallel, a TODO Project: http://git-wip-us.apache.org/repos/asf/jclouds/repo Commit: http://git-wip-us.apache.org/repos/asf/jclouds/commit/6bff97b6 Tree: http://git-wip-us.apache.org/repos/asf/jclouds/tree/6bff97b6 Diff: http://git-wip-us.apache.org/repos/asf/jclouds/diff/6bff97b6 Branch: refs/heads/master Commit: 6bff97b6d3f05aaf7e24d303d917c478f533313d Parents: 984b6ae Author: Zack Shoylev <[email protected]> Authored: Mon Jun 20 15:36:47 2016 -0500 Committer: Zack Shoylev <[email protected]> Committed: Fri Jul 1 11:26:56 2016 -0500 ---------------------------------------------------------------------- .../blobstore/RegionScopedSwiftBlobStore.java | 63 +++++++++++++-- .../jclouds/blobstore/options/PutOptions.java | 82 +++++++++++++++++++- 2 files changed, 134 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/jclouds/blob/6bff97b6/apis/openstack-swift/src/main/java/org/jclouds/openstack/swift/v1/blobstore/RegionScopedSwiftBlobStore.java ---------------------------------------------------------------------- diff --git a/apis/openstack-swift/src/main/java/org/jclouds/openstack/swift/v1/blobstore/RegionScopedSwiftBlobStore.java b/apis/openstack-swift/src/main/java/org/jclouds/openstack/swift/v1/blobstore/RegionScopedSwiftBlobStore.java index 20bdaef..63f4315 100644 --- a/apis/openstack-swift/src/main/java/org/jclouds/openstack/swift/v1/blobstore/RegionScopedSwiftBlobStore.java +++ b/apis/openstack-swift/src/main/java/org/jclouds/openstack/swift/v1/blobstore/RegionScopedSwiftBlobStore.java @@ -20,15 +20,19 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.collect.Iterables.tryFind; import static com.google.common.collect.Lists.transform; +import static org.jclouds.Constants.PROPERTY_USER_THREADS; import static 
org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive; import static org.jclouds.location.predicates.LocationPredicates.idEquals; import static org.jclouds.openstack.swift.v1.options.PutOptions.Builder.metadata; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; import javax.inject.Inject; +import javax.inject.Named; import org.jclouds.blobstore.BlobStore; import org.jclouds.blobstore.BlobStoreContext; @@ -76,6 +80,7 @@ import org.jclouds.openstack.swift.v1.options.UpdateContainerOptions; import org.jclouds.openstack.swift.v1.reference.SwiftHeaders; import com.google.common.annotations.Beta; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Supplier; @@ -89,10 +94,12 @@ import com.google.common.collect.ImmutableMap.Builder; import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.ByteSource; import com.google.common.net.HttpHeaders; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; import com.google.inject.AbstractModule; import com.google.inject.Injector; import com.google.inject.assistedinject.Assisted; @@ -579,10 +586,33 @@ public class RegionScopedSwiftBlobStore implements BlobStore { throw new UnsupportedOperationException(); } - // copied from BaseBlobStore + @com.google.inject.Inject + @Named(PROPERTY_USER_THREADS) + @VisibleForTesting + ListeningExecutorService userExecutor; + + /** + * Upload using a user-provided executor, or the jclouds userExecutor + * + * @param container + * @param blob + * @param overrides + * @return the 
multipart blob etag + */ @Beta protected String putMultipartBlob(String container, Blob blob, PutOptions overrides) { - List<MultipartPart> parts = Lists.newArrayList(); + if (overrides.getUseCustomExecutor()) { + return putMultipartBlob(container, blob, overrides, overrides.getCustomExecutor()); + } else { + return putMultipartBlob(container, blob, overrides, userExecutor); + } + } + + // copied from BaseBlobStore + @Beta + protected String putMultipartBlob(String container, Blob blob, PutOptions overrides, ListeningExecutorService executor) { + ArrayList<ListenableFuture<MultipartPart>> parts = new ArrayList<ListenableFuture<MultipartPart>>(); + long contentLength = checkNotNull(blob.getMetadata().getContentMetadata().getContentLength(), "must provide content-length to use multi-part upload"); MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm( @@ -590,11 +620,30 @@ public class RegionScopedSwiftBlobStore implements BlobStore { long partSize = algorithm.calculateChunkSize(contentLength); MultipartUpload mpu = initiateMultipartUpload(container, blob.getMetadata(), partSize, overrides); int partNumber = 1; + for (Payload payload : slicer.slice(blob.getPayload(), partSize)) { - MultipartPart part = uploadMultipartPart(mpu, partNumber, payload); - parts.add(part); - ++partNumber; + BlobUploader b = + new BlobUploader(mpu, partNumber++, payload); + parts.add(executor.submit(b)); + } + + return completeMultipartUpload(mpu, Futures.getUnchecked(Futures.allAsList(parts))); + } + + private final class BlobUploader implements Callable<MultipartPart> { + private final MultipartUpload mpu; + private final int partNumber; + private final Payload payload; + + BlobUploader(MultipartUpload mpu, int partNumber, Payload payload) { + this.mpu = mpu; + this.partNumber = partNumber; + this.payload = payload; + } + + @Override + public MultipartPart call() { + return uploadMultipartPart(mpu, partNumber, payload); } - return completeMultipartUpload(mpu, 
parts); } } http://git-wip-us.apache.org/repos/asf/jclouds/blob/6bff97b6/blobstore/src/main/java/org/jclouds/blobstore/options/PutOptions.java ---------------------------------------------------------------------- diff --git a/blobstore/src/main/java/org/jclouds/blobstore/options/PutOptions.java b/blobstore/src/main/java/org/jclouds/blobstore/options/PutOptions.java index 4cb8ca2..d80ab86 100644 --- a/blobstore/src/main/java/org/jclouds/blobstore/options/PutOptions.java +++ b/blobstore/src/main/java/org/jclouds/blobstore/options/PutOptions.java @@ -20,6 +20,10 @@ import static com.google.common.base.Preconditions.checkNotNull; import org.jclouds.blobstore.domain.BlobAccess; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + /** * Contains options supported in the put blob operation. <h2> * Usage</h2> The recommended way to instantiate a PutOptions object is to statically import @@ -36,6 +40,10 @@ public class PutOptions implements Cloneable { private BlobAccess blobAccess = BlobAccess.PRIVATE; private boolean multipart = false; + private boolean useCustomExecutor = false; + + // TODO: This exposes ListeningExecutorService to the user, instead of a regular ExecutorService + private ListeningExecutorService customExecutor = MoreExecutors.sameThreadExecutor(); public PutOptions() { } @@ -44,6 +52,25 @@ public class PutOptions implements Cloneable { this.multipart = multipart; } + /** + * Used for clone + * @param multipart + * @param customExecutor + */ + protected PutOptions(boolean multipart, boolean useCustomExecutor, ListeningExecutorService customExecutor) { + Preconditions.checkNotNull(customExecutor); + this.multipart = multipart; + this.useCustomExecutor = useCustomExecutor; + this.customExecutor = customExecutor; + } + + public PutOptions(ListeningExecutorService customExecutor) { + Preconditions.checkNotNull(customExecutor); + 
this.multipart = true; + this.useCustomExecutor = true; + this.customExecutor = customExecutor; + } + public static class ImmutablePutOptions extends PutOptions { private final PutOptions delegate; @@ -52,6 +79,16 @@ public class PutOptions implements Cloneable { } @Override + public ListeningExecutorService getCustomExecutor() { + return delegate.getCustomExecutor(); + } + + @Override + public PutOptions setCustomExecutor(ListeningExecutorService customExecutor) { + throw new UnsupportedOperationException(); + } + + @Override public BlobAccess getBlobAccess() { return delegate.getBlobAccess(); } @@ -87,6 +124,22 @@ public class PutOptions implements Cloneable { return blobAccess; } + public boolean getUseCustomExecutor() { + return useCustomExecutor; + } + + public ListeningExecutorService getCustomExecutor() { + return customExecutor; + } + + public PutOptions setCustomExecutor(ListeningExecutorService customExecutor) { + Preconditions.checkNotNull(customExecutor); + this.multipart = true; + this.useCustomExecutor = true; + this.customExecutor = customExecutor; + return this; + } + public PutOptions setBlobAccess(BlobAccess blobAccess) { this.blobAccess = checkNotNull(blobAccess); return this; @@ -98,7 +151,7 @@ public class PutOptions implements Cloneable { /** * split large blobs into pieces, if supported by the provider. 
- * + * * Equivalent to <code>multipart(true)</code> */ public PutOptions multipart() { @@ -113,12 +166,25 @@ public class PutOptions implements Cloneable { return this; } + /** + * Whether to split large blobs into pieces, if supported by the provider, using a custom executor + * + * @param customExecutor User-provided ListeningExecutorService + */ + public PutOptions multipart(ListeningExecutorService customExecutor) { + Preconditions.checkNotNull(customExecutor); + this.multipart = true; + this.useCustomExecutor = true; + this.customExecutor = customExecutor; + return this; + } + public static class Builder { public static PutOptions fromPutOptions(PutOptions putOptions) { return multipart(putOptions.multipart); } - + /** * @see PutOptions#multipart() */ @@ -130,15 +196,23 @@ public class PutOptions implements Cloneable { PutOptions options = new PutOptions(); return options.multipart(val); } + + public static PutOptions multipart(ListeningExecutorService customExecutor) { + PutOptions options = new PutOptions(); + return options.multipart(customExecutor); + } } @Override public PutOptions clone() { - return new PutOptions(multipart); + return new PutOptions(multipart, useCustomExecutor, customExecutor); } @Override public String toString() { - return "[multipart=" + multipart + ", blobAccess=" + blobAccess + "]"; + return "[multipart=" + multipart + + ", blobAccess=" + blobAccess + + ", useCustomExecutor=" + useCustomExecutor + + ", customExecutor=" + customExecutor + "]"; } }
