Repository: jclouds Updated Branches: refs/heads/master 4c9276366 -> 05c05e3de
Allows users to download large files efficiently and directly to disk. Project: http://git-wip-us.apache.org/repos/asf/jclouds/repo Commit: http://git-wip-us.apache.org/repos/asf/jclouds/commit/05c05e3d Tree: http://git-wip-us.apache.org/repos/asf/jclouds/tree/05c05e3d Diff: http://git-wip-us.apache.org/repos/asf/jclouds/diff/05c05e3d Branch: refs/heads/master Commit: 05c05e3de25c1f6c215ce9ed39c6894b0aa62e4f Parents: 4c92763 Author: Zack Shoylev <[email protected]> Authored: Thu Jul 14 11:06:36 2016 -0500 Committer: Zack Shoylev <[email protected]> Committed: Fri Sep 2 14:05:26 2016 -0500 ---------------------------------------------------------------------- .../jclouds/atmos/blobstore/AtmosBlobStore.java | 12 +- .../blobstore/RegionScopedSwiftBlobStore.java | 243 ++++++++++++++++++- ...ionScopedSwiftBlobStoreParallelLiveTest.java | 196 +++++++++++++++ ...lesRegionScopedBlobStoreContextLiveTest.java | 1 + ...ionScopedSwiftBlobStoreParallelLiveTest.java | 49 ++++ .../org/jclouds/s3/blobstore/S3BlobStore.java | 18 +- .../java/org/jclouds/blobstore/BlobStore.java | 57 +++-- .../blobstore/config/LocalBlobStore.java | 23 ++ .../blobstore/internal/BaseBlobStore.java | 24 +- .../blobstore/util/ReadOnlyBlobStore.java | 23 ++ .../azureblob/blobstore/AzureBlobStore.java | 26 +- ...dFilesUSBlobIntegrationParallelLiveTest.java | 35 +++ 12 files changed, 656 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/jclouds/blob/05c05e3d/apis/atmos/src/main/java/org/jclouds/atmos/blobstore/AtmosBlobStore.java ---------------------------------------------------------------------- diff --git a/apis/atmos/src/main/java/org/jclouds/atmos/blobstore/AtmosBlobStore.java b/apis/atmos/src/main/java/org/jclouds/atmos/blobstore/AtmosBlobStore.java index 1a0e74b..a75fee2 100644 --- a/apis/atmos/src/main/java/org/jclouds/atmos/blobstore/AtmosBlobStore.java +++ b/apis/atmos/src/main/java/org/jclouds/atmos/blobstore/AtmosBlobStore.java @@ -19,6 +19,7 @@ package org.jclouds.atmos.blobstore; import static com.google.common.base.Preconditions.checkNotNull; import static org.jclouds.atmos.options.PutOptions.Builder.publicRead; +import java.io.InputStream; import java.util.List; import java.util.Set; @@ -115,7 +116,7 @@ public class AtmosBlobStore extends BaseBlobStore { /** * This implementation invokes {@link AtmosClient#createDirectory} - * + * * @param location * currently ignored * @param container @@ -148,7 +149,7 @@ public class AtmosBlobStore extends BaseBlobStore { /** * This implementation invokes {@link AtmosClient#createDirectory} - * + * * @param container * directory name */ @@ -183,7 +184,7 @@ public class AtmosBlobStore extends BaseBlobStore { /** * This implementation invokes {@link AtmosClient#pathExists} - * + * * @param container * container * @param key @@ -342,6 +343,11 @@ public class AtmosBlobStore extends BaseBlobStore { } @Override + public InputStream streamBlob(String container, String name) { + throw new UnsupportedOperationException("Atmos does not support multipart uploads"); + } + + @Override public String copyBlob(String fromContainer, String fromName, String toContainer, String toName, CopyOptions options) { if (options.ifMatch() != null) { http://git-wip-us.apache.org/repos/asf/jclouds/blob/05c05e3d/apis/openstack-swift/src/main/java/org/jclouds/openstack/swift/v1/blobstore/RegionScopedSwiftBlobStore.java ---------------------------------------------------------------------- diff --git a/apis/openstack-swift/src/main/java/org/jclouds/openstack/swift/v1/blobstore/RegionScopedSwiftBlobStore.java b/apis/openstack-swift/src/main/java/org/jclouds/openstack/swift/v1/blobstore/RegionScopedSwiftBlobStore.java index 0b29f1f..b9630e1 100644 --- a/apis/openstack-swift/src/main/java/org/jclouds/openstack/swift/v1/blobstore/RegionScopedSwiftBlobStore.java +++ b/apis/openstack-swift/src/main/java/org/jclouds/openstack/swift/v1/blobstore/RegionScopedSwiftBlobStore.java @@ -25,15 +25,26 @@ import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursi import static org.jclouds.location.predicates.LocationPredicates.idEquals; import static org.jclouds.openstack.swift.v1.options.PutOptions.Builder.metadata; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.io.RandomAccessFile; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import javax.inject.Inject; import javax.inject.Named; +import org.jclouds.Constants; import org.jclouds.blobstore.BlobStore; import org.jclouds.blobstore.BlobStoreContext; import org.jclouds.blobstore.KeyNotFoundException; @@ -80,7 +91,6 @@ import org.jclouds.openstack.swift.v1.options.UpdateContainerOptions; import org.jclouds.openstack.swift.v1.reference.SwiftHeaders; import com.google.common.annotations.Beta; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Supplier; @@ -96,10 +106,12 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.io.ByteSource; +import com.google.common.io.ByteStreams; import com.google.common.net.HttpHeaders; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.AbstractModule; import com.google.inject.Injector; import com.google.inject.assistedinject.Assisted; @@ -109,7 +121,7 @@ public class RegionScopedSwiftBlobStore implements BlobStore { @Inject protected RegionScopedSwiftBlobStore(Injector baseGraph, BlobStoreContext context, SwiftApi api, @Memoized Supplier<Set<? extends Location>> locations, @Assisted String regionId, - PayloadSlicer slicer) { + PayloadSlicer slicer, @Named(PROPERTY_USER_THREADS) ListeningExecutorService userExecutor) { checkNotNull(regionId, "regionId"); Optional<? extends Location> found = tryFind(locations.get(), idEquals(regionId)); checkArgument(found.isPresent(), "region %s not in %s", regionId, locations.get()); @@ -119,6 +131,7 @@ public class RegionScopedSwiftBlobStore implements BlobStore { this.toResourceMetadata = new ToResourceMetadata(found.get()); this.context = context; this.api = api; + this.userExecutor = userExecutor; // until we parameterize ClearListStrategy with a factory this.clearList = baseGraph.createChildInjector(new AbstractModule() { @Override @@ -137,6 +150,7 @@ public class RegionScopedSwiftBlobStore implements BlobStore { private final ToListContainerOptions toListContainerOptions = new ToListContainerOptions(); private final ToResourceMetadata toResourceMetadata; protected final PayloadSlicer slicer; + protected final ListeningExecutorService userExecutor; @Override public Set<? extends Location> listAssignableLocations() { @@ -586,10 +600,9 @@ public class RegionScopedSwiftBlobStore implements BlobStore { throw new UnsupportedOperationException(); } - @com.google.inject.Inject - @Named(PROPERTY_USER_THREADS) - @VisibleForTesting - ListeningExecutorService userExecutor; + @com.google.inject.Inject(optional = true) + @Named(Constants.PROPERTY_MAX_RETRIES) + protected int retryCountLimit = 5; /** * Upload using a user-provided executor, or the jclouds userExecutor @@ -618,7 +631,7 @@ public class RegionScopedSwiftBlobStore implements BlobStore { getMinimumMultipartPartSize(), getMaximumMultipartPartSize(), getMaximumNumberOfParts()); long partSize = algorithm.calculateChunkSize(contentLength); MultipartUpload mpu = initiateMultipartUpload(container, blob.getMetadata(), partSize, overrides); - int partNumber = 1; + int partNumber = 0; for (Payload payload : slicer.slice(blob.getPayload(), partSize)) { BlobUploader b = @@ -645,4 +658,220 @@ public class RegionScopedSwiftBlobStore implements BlobStore { return uploadMultipartPart(mpu, partNumber, payload); } } + + @Override + @Beta + public void downloadBlob(String container, String name, File destination) { + downloadBlob(container, name, destination, userExecutor); + } + + @Override + @Beta + public void downloadBlob(String container, String name, File destination, ExecutorService executor) { + + ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(executor); + RandomAccessFile raf = null; + try { + long contentLength = api + .getObjectApi(regionId, container) + .getWithoutBody(name) + .getPayload() + .getContentMetadata() + .getContentLength(); + + // Reserve space for performance reasons + raf = new RandomAccessFile(destination.getAbsoluteFile(), "rw"); + raf.seek(contentLength - 1); + raf.write(0); + + // Determine download buffer size, smaller means less memory usage; larger is faster as long as threads are saturated + long partSize = getMinimumMultipartPartSize(); + + // Loop through ranges within the file + long from; + long to; + List<ListenableFuture<Void>> results = new ArrayList<ListenableFuture<Void>>(); + + for (from = 0; from < contentLength; from = from + partSize) { + to = (from + partSize >= contentLength) ? contentLength - 1 : from + partSize - 1; + BlobDownloader b = new BlobDownloader(regionId, container, name, raf, from, to); + results.add(listeningExecutor.submit(b)); + } + + Futures.getUnchecked(Futures.allAsList(results)); + + } catch (IOException e) { + // cleanup, attempt to delete large file + if (raf != null) { + try { + raf.close(); + } catch (IOException e1) {} + } + destination.delete(); + throw new RuntimeException(e); + } + } + + private final class BlobDownloader implements Callable<Void> { + String regionId; + String containerName; + String objectName; + private final RandomAccessFile raf; + private final long begin; + private final long end; + + BlobDownloader(String regionId, String containerName, String objectName, RandomAccessFile raf, long begin, long end) { + this.regionId = regionId; + this.containerName = containerName; + this.objectName = objectName; + this.raf = raf; + this.begin = begin; + this.end = end; + } + + @Override + public Void call() { + IOException lastException = null; + for (int retry = 0; retry < retryCountLimit; retry++) { + try { + SwiftObject object = api.getObjectApi(regionId, containerName) + .get(objectName, org.jclouds.http.options.GetOptions.Builder.range(begin, end)); + // Download first, this is the part that usually fails + byte[] targetArray = ByteStreams.toByteArray(object.getPayload().openStream()); + // Map file region + MappedByteBuffer out = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, begin, end - begin + 1); + out.put(targetArray); + out.force(); + } catch (IOException e) { + lastException = e; + continue; + } + // Success! + return null; + } + throw new RuntimeException("After " + retryCountLimit + " retries: " + lastException); + } + } + + @Beta + @Override + public InputStream streamBlob(final String container, final String name) { + return streamBlob(container, name, userExecutor); + } + + @Beta + @Override + public InputStream streamBlob(final String container, final String name, final ExecutorService executor) { + + final ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(executor); + // User will receive the Input end of the piped stream + final PipedOutputStream output; + final PipedInputStream input; + try { + output = new PipedOutputStream(); + input = new PipedInputStream(output, + getMinimumMultipartPartSize() * 5 > Integer.MAX_VALUE ? + Integer.MAX_VALUE : (int) getMinimumMultipartPartSize() * 5); + } catch (IOException e) { + throw new RuntimeException(e); + } + + // The total length of the file to download is needed to determine ranges + // It has to be obtainable without downloading the whole file + final long contentLength = api + .getObjectApi(regionId, container) + .getWithoutBody(name) + .getPayload() + .getContentMetadata() + .getContentLength(); + + // Determine download buffer size, smaller means less memory usage; larger is faster as long as threads are saturated + final long partSize = getMinimumMultipartPartSize(); + + // Used to communicate between the producer and consumer threads + final LinkedBlockingQueue<ListenableFuture<byte[]>> results = new LinkedBlockingQueue<ListenableFuture<byte[]>>(); + + listeningExecutor.submit(new Runnable() { + @Override + public void run() { + ListenableFuture<byte[]> result; + long from; + for (from = 0; from < contentLength; from = from + partSize) { + try { + System.out.println(Thread.currentThread() + " writing to output"); + result = results.take(); + if (result == null) { + output.close(); + input.close(); + throw new RuntimeException("Error downloading file part to stream"); + } + output.write(result.get()); + } catch (Exception e) { + System.out.println(e); + try { + // close pipe so client is notified of an exception + input.close(); + } catch (IOException e1) {} + try { + output.close(); + } catch (IOException e1) {} + throw new RuntimeException(e); + } + } + // Finished writing results to stream + try { + output.close(); + } catch (IOException e) { + } + } + }); + + listeningExecutor.submit(new Runnable() { + @Override + public void run() { + long from; + long to; + // Loop through ranges within the file + for (from = 0; from < contentLength; from = from + partSize) { + to = (from + partSize >= contentLength) ? contentLength - 1 : from + partSize - 1; + BlobStreamDownloader b = new BlobStreamDownloader(container, name, from, to); + results.add(listeningExecutor.submit(b)); + } + } + }); + return input; + } + + private final class BlobStreamDownloader implements Callable<byte[]> { + String containerName; + String objectName; + private final long begin; + private final long end; + + BlobStreamDownloader(String containerName, String objectName, long begin, long end) { + this.containerName = containerName; + this.objectName = objectName; + this.begin = begin; + this.end = end; + } + + @Override + public byte[] call() { + IOException lastException = null; + for (int retry = 0; retry < retryCountLimit; retry++) { + try { + long time = System.nanoTime(); + SwiftObject object = api.getObjectApi(regionId, containerName) + .get(objectName, org.jclouds.http.options.GetOptions.Builder.range(begin, end)); + byte[] downloadedBlock = ByteStreams.toByteArray(object.getPayload().openStream()); + return downloadedBlock; + } catch (IOException e) { + System.out.println(e); + lastException = e; + continue; + } + } + throw new RuntimeException("After " + retryCountLimit + " retries: " + lastException); + } + } } http://git-wip-us.apache.org/repos/asf/jclouds/blob/05c05e3d/apis/openstack-swift/src/test/java/org/jclouds/openstack/swift/v1/blobstore/RegionScopedSwiftBlobStoreParallelLiveTest.java ---------------------------------------------------------------------- diff --git a/apis/openstack-swift/src/test/java/org/jclouds/openstack/swift/v1/blobstore/RegionScopedSwiftBlobStoreParallelLiveTest.java b/apis/openstack-swift/src/test/java/org/jclouds/openstack/swift/v1/blobstore/RegionScopedSwiftBlobStoreParallelLiveTest.java new file mode 100644 index 0000000..e7a4fed --- /dev/null +++ b/apis/openstack-swift/src/test/java/org/jclouds/openstack/swift/v1/blobstore/RegionScopedSwiftBlobStoreParallelLiveTest.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jclouds.openstack.swift.v1.blobstore; + +import static org.assertj.core.util.Files.delete; +import static org.jclouds.blobstore.options.PutOptions.Builder.multipart; +import static org.jclouds.openstack.keystone.v2_0.config.KeystoneProperties.CREDENTIAL_TYPE; +import static org.testng.Assert.assertEquals; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.RandomAccessFile; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.util.Properties; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.jclouds.blobstore.BlobStore; +import org.jclouds.blobstore.domain.Blob; +import org.jclouds.blobstore.integration.internal.BaseBlobStoreIntegrationTest; +import org.jclouds.io.payloads.FilePayload; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.google.common.hash.HashCode; +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; +import com.google.common.io.Files; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + +// TODO: Rolls tests up to BaseBlobStoreIntegrationTest +@Test(groups = "live", singleThreaded = true) +public class RegionScopedSwiftBlobStoreParallelLiveTest extends BaseBlobStoreIntegrationTest { + + private final File BIG_FILE = new File("random.dat"); + private final long SIZE = 1000000000; //10 * 1000 * 1000; + private BlobStore blobStore; + private String ETAG; + private ListeningExecutorService executor = + MoreExecutors.listeningDecorator( + MoreExecutors.getExitingExecutorService( + new ThreadPoolExecutor(5, 5, + 5000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<Runnable>(10, true), new ThreadPoolExecutor.CallerRunsPolicy()))); + + private String CONTAINER = "jcloudsparalleltest" + UUID.randomUUID(); + + public RegionScopedSwiftBlobStoreParallelLiveTest() { + provider = "openstack-swift"; + } + + // Override as needed for the right region + protected BlobStore getBlobStore() { + RegionScopedBlobStoreContext ctx = RegionScopedBlobStoreContext.class.cast(view); + return ctx.getBlobStore("US-TX"); + } + + @Override + protected Properties setupProperties() { + Properties props = super.setupProperties(); + setIfTestSystemPropertyPresent(props, CREDENTIAL_TYPE); + return props; + } + + @BeforeClass + public void setup() throws IOException, InterruptedException { + blobStore = getBlobStore(); + createRandomFile(SIZE, BIG_FILE); + HashCode hashCode = Files.hash(BIG_FILE, Hashing.md5()); + ETAG = hashCode.toString(); + blobStore.createContainerInLocation(null, CONTAINER); + System.out.println("generated file md5: " + ETAG); + } + + @AfterClass + public void cleanupFiles() { + // Delete local file + delete(BIG_FILE); + + // Delete uploaded file + blobStore.clearContainer(CONTAINER); + blobStore.deleteContainer(CONTAINER); + } + + @Test + public void uploadMultipartBlob() { + Blob blob = blobStore.blobBuilder(BIG_FILE.getName()) + .payload(new FilePayload(BIG_FILE)) + .build(); + // configure the blobstore to use multipart uploading of the file + String eTag = blobStore.putBlob(CONTAINER, blob, multipart(executor)); + // assertEquals(eTag, ETAG); + // The etag returned by Swift is not the md5 of the Blob uploaded + // It is the md5 of the concatenated segment md5s + } + + @Test(dependsOnMethods = "uploadMultipartBlob", singleThreaded = true) + public void downloadParallelBlob() throws IOException { + final File downloadedFile = new File(BIG_FILE.getName() + ".downloaded"); + blobStore.downloadBlob(CONTAINER, BIG_FILE.getName(), downloadedFile, executor); + String eTag = Files.hash(downloadedFile, Hashing.md5()).toString(); + assertEquals(eTag, ETAG); + } + + @Test(dependsOnMethods = "uploadMultipartBlob", singleThreaded = true) + public void streamParallelBlob() throws IOException { + InputStream is = blobStore.streamBlob(CONTAINER, BIG_FILE.getName(), executor); + byte[] segment = new byte[1000000]; + + Hasher hasher = Hashing.md5().newHasher(); + + int read; + while ( (read = is.read(segment)) > 0) { + System.out.println("Read " + read + " bytes from input stream."); + hasher.putBytes(segment, 0, read); + } + + is.close(); + assertEquals(hasher.hash().toString(), ETAG); + } + + private void createRandomFile(long size, File file) throws IOException, InterruptedException { + RandomAccessFile raf = null; + + // Reserve space for performance reasons + raf = new RandomAccessFile(file.getAbsoluteFile(), "rw"); + raf.seek(size - 1); + raf.write(0); + + // Loop through ranges within the file + long from; + long to; + long partSize = 1000000; + + ExecutorService threadPool = Executors.newFixedThreadPool(16); + + for (from = 0; from < size; from = from + partSize) { + to = (from + partSize >= size) ? size - 1 : from + partSize - 1; + RandomFileWriter writer = new RandomFileWriter(raf, from, to); + threadPool.submit(writer); + } + + threadPool.shutdown(); + threadPool.awaitTermination(1, TimeUnit.DAYS); + } + + private final class RandomFileWriter implements Runnable { + private final RandomAccessFile raf; + private final long begin; + private final long end; + + RandomFileWriter(RandomAccessFile raf, long begin, long end) { + this.raf = raf; + this.begin = begin; + this.end = end; + } + + @Override + public void run() { + try { + byte[] targetArray = new byte[(int) (end - begin + 1)]; + Random random = new Random(); + random.nextBytes(targetArray); + // Map file region + MappedByteBuffer out = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, begin, end - begin + 1); + out.put(targetArray); + out.force(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/jclouds/blob/05c05e3d/apis/rackspace-cloudfiles/src/test/java/org/jclouds/rackspace/cloudfiles/v1/blobstore/CloudFilesRegionScopedBlobStoreContextLiveTest.java ---------------------------------------------------------------------- diff --git a/apis/rackspace-cloudfiles/src/test/java/org/jclouds/rackspace/cloudfiles/v1/blobstore/CloudFilesRegionScopedBlobStoreContextLiveTest.java b/apis/rackspace-cloudfiles/src/test/java/org/jclouds/rackspace/cloudfiles/v1/blobstore/CloudFilesRegionScopedBlobStoreContextLiveTest.java index 89d1499..83d61ca 100644 --- a/apis/rackspace-cloudfiles/src/test/java/org/jclouds/rackspace/cloudfiles/v1/blobstore/CloudFilesRegionScopedBlobStoreContextLiveTest.java +++ b/apis/rackspace-cloudfiles/src/test/java/org/jclouds/rackspace/cloudfiles/v1/blobstore/CloudFilesRegionScopedBlobStoreContextLiveTest.java @@ -20,6 +20,7 @@ import static org.jclouds.rackspace.cloudidentity.v2_0.config.CloudIdentityCrede import java.util.Properties; +import org.jclouds.openstack.swift.v1.blobstore.RegionScopedBlobStoreContext; import org.jclouds.openstack.swift.v1.blobstore.RegionScopedBlobStoreContextLiveTest; import org.testng.annotations.Test; http://git-wip-us.apache.org/repos/asf/jclouds/blob/05c05e3d/apis/rackspace-cloudfiles/src/test/java/org/jclouds/rackspace/cloudfiles/v1/blobstore/CloudFilesRegionScopedSwiftBlobStoreParallelLiveTest.java ---------------------------------------------------------------------- diff --git a/apis/rackspace-cloudfiles/src/test/java/org/jclouds/rackspace/cloudfiles/v1/blobstore/CloudFilesRegionScopedSwiftBlobStoreParallelLiveTest.java b/apis/rackspace-cloudfiles/src/test/java/org/jclouds/rackspace/cloudfiles/v1/blobstore/CloudFilesRegionScopedSwiftBlobStoreParallelLiveTest.java new file mode 100644 index 0000000..96017ca --- /dev/null +++ b/apis/rackspace-cloudfiles/src/test/java/org/jclouds/rackspace/cloudfiles/v1/blobstore/CloudFilesRegionScopedSwiftBlobStoreParallelLiveTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.jclouds.rackspace.cloudfiles.v1.blobstore; + +import static org.jclouds.rackspace.cloudidentity.v2_0.config.CloudIdentityCredentialTypes.API_KEY_CREDENTIALS; + +import java.util.Properties; + +import org.jclouds.blobstore.BlobStore; +import org.jclouds.openstack.swift.v1.blobstore.RegionScopedBlobStoreContext; +import org.jclouds.openstack.swift.v1.blobstore.RegionScopedSwiftBlobStoreParallelLiveTest; +import org.testng.annotations.Test; + +//Applies the RegionScopedSwiftBlobStoreIntegrationTest to rackspace +@Test(groups = "live") +public class CloudFilesRegionScopedSwiftBlobStoreParallelLiveTest extends RegionScopedSwiftBlobStoreParallelLiveTest { + + public CloudFilesRegionScopedSwiftBlobStoreParallelLiveTest() { + provider = "rackspace-cloudfiles"; + } + + @Override + protected BlobStore getBlobStore() { + RegionScopedBlobStoreContext ctx = RegionScopedBlobStoreContext.class.cast(view); + return ctx.getBlobStore("IAD"); + } + + @Override + protected Properties setupProperties() { + Properties props = super.setupProperties(); + setIfTestSystemPropertyPresent(props, API_KEY_CREDENTIALS); + return props; + } +} http://git-wip-us.apache.org/repos/asf/jclouds/blob/05c05e3d/apis/s3/src/main/java/org/jclouds/s3/blobstore/S3BlobStore.java ---------------------------------------------------------------------- diff --git a/apis/s3/src/main/java/org/jclouds/s3/blobstore/S3BlobStore.java b/apis/s3/src/main/java/org/jclouds/s3/blobstore/S3BlobStore.java index e5d56b9..4895791 100644 --- a/apis/s3/src/main/java/org/jclouds/s3/blobstore/S3BlobStore.java +++ b/apis/s3/src/main/java/org/jclouds/s3/blobstore/S3BlobStore.java @@ -122,7 +122,7 @@ public class S3BlobStore extends BaseBlobStore { /** * This implementation invokes {@link S3Client#bucketExists} - * + * * @param container * bucket name */ @@ -133,7 +133,7 @@ public class S3BlobStore extends BaseBlobStore { /** * This implementation invokes {@link S3Client#putBucketInRegion} - * + * * @param location * corresponds to a Region * @param container @@ -165,7 +165,7 @@ public class S3BlobStore extends BaseBlobStore { /** * This implementation invokes {@link S3Client#listBucket} - * + * * @param container * bucket name */ @@ -195,7 +195,7 @@ public class S3BlobStore extends BaseBlobStore { /** * This implementation invokes {@link S3Client#objectExists} - * + * * @param container * bucket name * @param key @@ -208,7 +208,7 @@ public class S3BlobStore extends BaseBlobStore { /** * This implementation invokes {@link S3Client#headObject} - * + * * @param container * bucket name * @param key @@ -221,7 +221,7 @@ public class S3BlobStore extends BaseBlobStore { /** * This implementation invokes {@link S3Client#getObject} - * + * * @param container * bucket name * @param key @@ -235,7 +235,7 @@ public class S3BlobStore extends BaseBlobStore { /** * This implementation invokes {@link S3Client#putObject} - * + * * @param container * bucket name * @param blob @@ -248,7 +248,7 @@ public class S3BlobStore extends BaseBlobStore { /** * This implementation invokes {@link S3Client#putObject} - * + * * @param container * bucket name * @param blob @@ -322,7 +322,7 @@ public class S3BlobStore extends BaseBlobStore { /** * This implementation invokes {@link S3Client#deleteObject} - * + * * @param container * bucket name * @param key http://git-wip-us.apache.org/repos/asf/jclouds/blob/05c05e3d/blobstore/src/main/java/org/jclouds/blobstore/BlobStore.java ---------------------------------------------------------------------- diff --git a/blobstore/src/main/java/org/jclouds/blobstore/BlobStore.java b/blobstore/src/main/java/org/jclouds/blobstore/BlobStore.java index 933675c..0e34306 100644 --- a/blobstore/src/main/java/org/jclouds/blobstore/BlobStore.java +++ b/blobstore/src/main/java/org/jclouds/blobstore/BlobStore.java @@ -16,10 +16,11 @@ */ package org.jclouds.blobstore; +import java.io.File; +import java.io.InputStream; import java.util.List; import java.util.Set; - -import com.google.common.annotations.Beta; +import java.util.concurrent.ExecutorService; import org.jclouds.blobstore.domain.Blob; import org.jclouds.blobstore.domain.BlobAccess; @@ -39,6 +40,8 @@ import org.jclouds.domain.Location; import org.jclouds.io.Payload; import org.jclouds.javax.annotation.Nullable; +import com.google.common.annotations.Beta; + /** * Synchronous access to a BlobStore such as Amazon S3 */ @@ -49,7 +52,7 @@ public interface BlobStore { BlobStoreContext getContext(); /** - * + * * @return builder for creating new {@link Blob}s */ BlobBuilder blobBuilder(String name); @@ -75,14 +78,14 @@ public interface BlobStore { /** * Creates a namespace for your blobs * <p/> - * + * * A container is a namespace for your objects. Depending on the service, the scope can be * global, identity, or sub-identity scoped. For example, in Amazon S3, containers are called * buckets, and they must be uniquely named such that no-one else in the world conflicts. In * other blobstores, the naming convention of the container is less strict. All blobstores allow * you to list your containers and also the contents within them. These contents can either be * blobs, folders, or virtual paths. - * + * * @param location * some blobstores allow you to specify a location, such as US-EAST, for where this * container will exist. null will choose a default location @@ -93,7 +96,7 @@ public interface BlobStore { boolean createContainerInLocation(@Nullable Location location, String container); /** - * + * * @param options * controls default access control * @see #createContainerInLocation(Location,String) @@ -108,7 +111,7 @@ public interface BlobStore { /** * Lists all resources in a container non-recursive. - * + * * @param container * what to list * @return a list that may be incomplete, depending on whether PageSet#getNextMarker is set @@ -118,7 +121,7 @@ public interface BlobStore { /** * Like {@link #list(String)} except you can control the size, recursion, and context of the list * using {@link ListContainerOptions options} - * + * * @param container * what to list * @param options @@ -129,7 +132,7 @@ public interface BlobStore { /** * This will delete the contents of a container at its root path without deleting the container - * + * * @param container * what to clear */ @@ -138,7 +141,7 @@ public interface BlobStore { /** * Like {@link #clearContainer(String)} except you can use options to do things like recursive * deletes, or clear at a different path than root. - * + * * @param container * what to clear * @param options @@ -148,7 +151,7 @@ public interface BlobStore { /** * This will delete everything inside a container recursively. - * + * * @param container * what to delete * @param container name of the container to delete @@ -165,7 +168,7 @@ public interface BlobStore { /** * Determines if a directory exists - * + * * @param container * container where the directory resides * @param directory @@ -175,7 +178,7 @@ public interface BlobStore { /** * Creates a folder or a directory marker depending on the service - * + * * @param container * container to create the directory in * @param directory @@ -185,7 +188,7 @@ public interface BlobStore { /** * Deletes a folder or a directory marker depending on the service - * + * * @param container * container to delete the directory from * @param directory @@ -195,7 +198,7 @@ public interface BlobStore { /** * Determines if a blob exists - * + * * @param container * container where the blob resides * @param directory @@ -205,7 +208,7 @@ public interface BlobStore { /** * Adds a {@code Blob} representing the data at location {@code container/blob.metadata.name} - * + * * @param container * container to place the blob. * @param blob @@ -221,7 +224,7 @@ public interface BlobStore { /** * Adds a {@code Blob} representing the data at location {@code container/blob.metadata.name} * options using multipart strategies. - * + * * @param container * container to place the blob. * @param blob @@ -249,7 +252,7 @@ public interface BlobStore { /** * Retrieves the metadata of a {@code Blob} at location {@code container/name} - * + * * @param container * container where this exists. * @param name @@ -263,7 +266,7 @@ public interface BlobStore { /** * Retrieves a {@code Blob} representing the data at location {@code container/name} - * + * * @param container * container where this exists. * @param name @@ -277,7 +280,7 @@ public interface BlobStore { /** * Retrieves a {@code Blob} representing the data at location {@code container/name} - * + * * @param container * container where this exists. * @param name @@ -293,7 +296,7 @@ public interface BlobStore { /** * Deletes a {@code Blob} representing the data at location {@code container/name} - * + * * @param container * container where this exists. * @param name @@ -359,4 +362,16 @@ public interface BlobStore { @Beta int getMaximumNumberOfParts(); + + @Beta + void downloadBlob(String container, String name, File destination); + + @Beta + void downloadBlob(String container, String name, File destination, ExecutorService executor); + + @Beta + InputStream streamBlob(String container, String name); + + @Beta + InputStream streamBlob(String container, String name, ExecutorService executor); } http://git-wip-us.apache.org/repos/asf/jclouds/blob/05c05e3d/blobstore/src/main/java/org/jclouds/blobstore/config/LocalBlobStore.java ---------------------------------------------------------------------- diff --git a/blobstore/src/main/java/org/jclouds/blobstore/config/LocalBlobStore.java b/blobstore/src/main/java/org/jclouds/blobstore/config/LocalBlobStore.java index 631f182..56b91ff 100644 --- a/blobstore/src/main/java/org/jclouds/blobstore/config/LocalBlobStore.java +++ b/blobstore/src/main/java/org/jclouds/blobstore/config/LocalBlobStore.java @@ -39,6 +39,7 @@ import java.util.Map; import java.util.Set; import java.util.SortedSet; import java.util.UUID; +import java.util.concurrent.ExecutorService; import javax.annotation.Resource; import javax.inject.Inject; @@ -942,10 +943,32 @@ public final class LocalBlobStore implements BlobStore { return Integer.MAX_VALUE; } + @Override + public void downloadBlob(String container, String name, File destination) { + throw new UnsupportedOperationException(); + } + + @Override + public void downloadBlob(String container, String name, File destination, ExecutorService executor) { + throw new UnsupportedOperationException(); + } + + @Override + public InputStream streamBlob(String container, String name) { + throw new UnsupportedOperationException(); + } + + @Override + public InputStream streamBlob(String container, String name, ExecutorService executor) { + throw new UnsupportedOperationException(); + } + private static String maybeQuoteETag(String eTag) { if (!eTag.startsWith("\"") && !eTag.endsWith("\"")) { eTag = "\"" + eTag + "\""; } return eTag; } + + } http://git-wip-us.apache.org/repos/asf/jclouds/blob/05c05e3d/blobstore/src/main/java/org/jclouds/blobstore/internal/BaseBlobStore.java ---------------------------------------------------------------------- diff --git a/blobstore/src/main/java/org/jclouds/blobstore/internal/BaseBlobStore.java b/blobstore/src/main/java/org/jclouds/blobstore/internal/BaseBlobStore.java index 21ae2ff..4e050c8 100644 --- a/blobstore/src/main/java/org/jclouds/blobstore/internal/BaseBlobStore.java +++ b/blobstore/src/main/java/org/jclouds/blobstore/internal/BaseBlobStore.java @@ -22,13 +22,15 @@ import static org.jclouds.Constants.PROPERTY_USER_THREADS; import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive; import static org.jclouds.util.Predicates2.retry; -import java.io.InputStream; +import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.Date; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; import javax.inject.Inject; import javax.inject.Named; @@ -400,4 +402,24 @@ public abstract class BaseBlobStore implements BlobStore { } return eTag; } + + @Override + public void downloadBlob(String container, String name, File destination) { + throw new UnsupportedOperationException("Operation not supported yet"); + } + + @Override + public void downloadBlob(String container, String name, File destination, ExecutorService executor) { + throw new UnsupportedOperationException("Operation not supported yet"); + } + + @Override + public InputStream streamBlob(String container, String name) { + throw new UnsupportedOperationException("Operation not supported yet"); + } + + @Override + public InputStream streamBlob(String container, String name, ExecutorService executor) { + throw new UnsupportedOperationException("Operation not supported yet"); + } } http://git-wip-us.apache.org/repos/asf/jclouds/blob/05c05e3d/blobstore/src/main/java/org/jclouds/blobstore/util/ReadOnlyBlobStore.java ---------------------------------------------------------------------- diff --git a/blobstore/src/main/java/org/jclouds/blobstore/util/ReadOnlyBlobStore.java b/blobstore/src/main/java/org/jclouds/blobstore/util/ReadOnlyBlobStore.java index af82dab..864db8a 100644 --- a/blobstore/src/main/java/org/jclouds/blobstore/util/ReadOnlyBlobStore.java +++ b/blobstore/src/main/java/org/jclouds/blobstore/util/ReadOnlyBlobStore.java @@ -17,7 +17,10 @@ package org.jclouds.blobstore.util; +import java.io.File; +import java.io.InputStream; import java.util.List; +import java.util.concurrent.ExecutorService; import org.jclouds.blobstore.BlobStore; import org.jclouds.blobstore.BlobStoreContext; @@ -165,4 +168,24 @@ public final class ReadOnlyBlobStore extends ForwardingBlobStore { public List<MultipartUpload> listMultipartUploads(String container) { throw new UnsupportedOperationException("Read-only BlobStore"); } + + @Override + public void downloadBlob(String container, String name, File destination) { + throw new UnsupportedOperationException(); + } + + @Override + public void downloadBlob(String container, String name, File destination, ExecutorService executor) { + throw new UnsupportedOperationException(); + } + + @Override + public InputStream streamBlob(String container, String name) { + throw new UnsupportedOperationException(); + } + + @Override + public InputStream streamBlob(String container, String name, ExecutorService executor) { + throw new UnsupportedOperationException(); + } } http://git-wip-us.apache.org/repos/asf/jclouds/blob/05c05e3d/providers/azureblob/src/main/java/org/jclouds/azureblob/blobstore/AzureBlobStore.java ---------------------------------------------------------------------- diff --git a/providers/azureblob/src/main/java/org/jclouds/azureblob/blobstore/AzureBlobStore.java b/providers/azureblob/src/main/java/org/jclouds/azureblob/blobstore/AzureBlobStore.java index c7007e5..fe7beff 100644 --- a/providers/azureblob/src/main/java/org/jclouds/azureblob/blobstore/AzureBlobStore.java +++ b/providers/azureblob/src/main/java/org/jclouds/azureblob/blobstore/AzureBlobStore.java @@ -19,6 +19,7 @@ package org.jclouds.azureblob.blobstore; import static com.google.common.base.Preconditions.checkNotNull; import static org.jclouds.azure.storage.options.ListOptions.Builder.includeMetadata; +import java.io.InputStream; import java.net.URI; import java.util.EnumSet; import java.util.List; @@ -130,7 +131,7 @@ public class AzureBlobStore extends BaseBlobStore { /** * This implementation invokes {@link AzureBlobClient#bucketExists} - * + * * @param container * container name */ @@ -141,7 +142,7 @@ public class AzureBlobStore extends BaseBlobStore { /** * This implementation invokes {@link AzureBlobClient#putBucketInRegion} - * + * * @param location * currently ignored * @param container @@ -154,7 +155,7 @@ public class AzureBlobStore extends BaseBlobStore { /** * This implementation invokes {@link AzureBlobClient#listBlobs} - * + * * @param container * container name */ @@ -166,7 +167,7 @@ public class AzureBlobStore extends BaseBlobStore { /** * This implementation invokes {@link AzureBlobClient#deleteContainer} - * + * * @param container * container name */ @@ -177,7 +178,7 @@ public class AzureBlobStore extends BaseBlobStore { /** * This implementation invokes {@link AzureBlobClient#blobExists} - * + * * @param container * container name * @param key @@ -190,7 +191,7 @@ public class AzureBlobStore extends BaseBlobStore { /** * This implementation invokes {@link AzureBlobClient#getBlob} - * + * * @param container * container name * @param key @@ -205,7 +206,7 @@ public class AzureBlobStore extends BaseBlobStore { /** * This implementation invokes {@link AzureBlobClient#putObject} - * + * * @param container * container name * @param blob @@ -218,7 +219,7 @@ public class AzureBlobStore extends BaseBlobStore { /** * This implementation invokes {@link AzureBlobClient#putObject} - * + * * @param container * container name * @param blob @@ -298,7 +299,7 @@ public class AzureBlobStore extends BaseBlobStore { /** * This implementation invokes {@link AzureBlobClient#deleteObject} - * + * * @param container * container name * @param key @@ -336,7 +337,7 @@ public class AzureBlobStore extends BaseBlobStore { /** * This implementation invokes {@link AzureBlobClient#getBlobProperties} - * + * * @param container * container name * @param key @@ -498,4 +499,9 @@ public class AzureBlobStore extends BaseBlobStore { public int getMaximumNumberOfParts() { return 50 * 1000; } + + @Override + public InputStream streamBlob(String container, String name) { + throw new UnsupportedOperationException("Azure does not support streaming a blob"); + } } http://git-wip-us.apache.org/repos/asf/jclouds/blob/05c05e3d/providers/rackspace-cloudfiles-us/src/test/java/org/jclouds/rackspace/cloudfiles/us/blobstore/integration/CloudFilesUSBlobIntegrationParallelLiveTest.java ---------------------------------------------------------------------- diff --git a/providers/rackspace-cloudfiles-us/src/test/java/org/jclouds/rackspace/cloudfiles/us/blobstore/integration/CloudFilesUSBlobIntegrationParallelLiveTest.java b/providers/rackspace-cloudfiles-us/src/test/java/org/jclouds/rackspace/cloudfiles/us/blobstore/integration/CloudFilesUSBlobIntegrationParallelLiveTest.java new file mode 100644 index 0000000..e9292ee --- /dev/null +++ b/providers/rackspace-cloudfiles-us/src/test/java/org/jclouds/rackspace/cloudfiles/us/blobstore/integration/CloudFilesUSBlobIntegrationParallelLiveTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jclouds.rackspace.cloudfiles.us.blobstore.integration; + +import org.jclouds.blobstore.BlobStore; +import org.jclouds.openstack.swift.v1.blobstore.RegionScopedBlobStoreContext; +import org.jclouds.rackspace.cloudfiles.v1.blobstore.CloudFilesRegionScopedSwiftBlobStoreParallelLiveTest; +import org.testng.annotations.Test; + +@Test(groups = "live", testName = "CloudFilesUSBlobIntegrationLiveTest") +public class CloudFilesUSBlobIntegrationParallelLiveTest extends CloudFilesRegionScopedSwiftBlobStoreParallelLiveTest { + public CloudFilesUSBlobIntegrationParallelLiveTest() { + provider = "rackspace-cloudfiles-us"; + } + + @Override + protected BlobStore getBlobStore() { + RegionScopedBlobStoreContext ctx = RegionScopedBlobStoreContext.class.cast(view); + return ctx.getBlobStore("DFW"); + } +}
