This is an automated email from the ASF dual-hosted git repository.
gaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/jclouds.git
The following commit(s) were added to refs/heads/master by this push:
new 13f32b2 Lazily open InputStream during complete MPU
13f32b2 is described below
commit 13f32b28c90f4dda7ea21c95380d8c0879ba91fe
Author: Andrew Gaul <[email protected]>
AuthorDate: Mon Jan 28 21:45:20 2019 -0800
Lazily open InputStream during complete MPU
Previously the filesystem provider could exhaust file descriptors by
eagerly opening up to 10,000 parts. This partially undoes
JCLOUDS-1367.
---
.../integration/FilesystemBlobIntegrationTest.java | 28 ++++++++++
.../jclouds/blobstore/config/LocalBlobStore.java | 59 ++++++++++++++++++----
2 files changed, 77 insertions(+), 10 deletions(-)
diff --git a/apis/filesystem/src/test/java/org/jclouds/filesystem/integration/FilesystemBlobIntegrationTest.java b/apis/filesystem/src/test/java/org/jclouds/filesystem/integration/FilesystemBlobIntegrationTest.java
index cda138c..ff23e3f 100644
--- a/apis/filesystem/src/test/java/org/jclouds/filesystem/integration/FilesystemBlobIntegrationTest.java
+++ b/apis/filesystem/src/test/java/org/jclouds/filesystem/integration/FilesystemBlobIntegrationTest.java
@@ -164,6 +164,34 @@ public class FilesystemBlobIntegrationTest extends
BaseBlobIntegrationTest {
}
}
+   /**
+    * Uploads and completes a multipart upload made of 10,000 minimum-size
+    * parts, then checks that the resulting blob has the combined size.
+    * Guards against exhausting file descriptors while the parts are combined.
+    */
+   @Test(groups = { "integration", "live" })
+   public void test10000PartMultipartUpload() throws Exception {
+      BlobStore blobStore = view.getBlobStore();
+      String container = getContainerName();
+      int partSize = (int) blobStore.getMinimumMultipartPartSize();
+      int numParts = 10 * 1000;
+      try {
+         String name = "blob-name";
+         BlobBuilder blobBuilder = blobStore.blobBuilder(name);
+         Blob blob = blobBuilder.build();
+         MultipartUpload mpu = blobStore.initiateMultipartUpload(container, blob.getMetadata(), new PutOptions());
+         ImmutableList.Builder<MultipartPart> parts = ImmutableList.builder();
+         // every part carries the same zero-filled content
+         byte[] content = new byte[partSize];
+
+         for (int i = 0; i < numParts; ++i) {
+            Payload payload = Payloads.newByteArrayPayload(content);
+            payload.getContentMetadata().setContentLength((long) partSize);
+            parts.add(blobStore.uploadMultipartPart(mpu, i, payload));
+         }
+
+         blobStore.completeMultipartUpload(mpu, parts.build());
+
+         BlobMetadata newBlobMetadata = blobStore.blobMetadata(container, name);
+         // long arithmetic so the expected size cannot overflow int
+         assertThat(newBlobMetadata.getSize()).isEqualTo((long) numParts * partSize);
+      } finally {
+         returnContainer(container);
+      }
+   }
+
protected void checkExtendedAttributesSupport() {
if (isMacOSX()) {
throw new SkipException("filesystem does not support extended
attributes in Mac OSX");
diff --git a/blobstore/src/main/java/org/jclouds/blobstore/config/LocalBlobStore.java b/blobstore/src/main/java/org/jclouds/blobstore/config/LocalBlobStore.java
index b62ad18..81c6ae3 100644
--- a/blobstore/src/main/java/org/jclouds/blobstore/config/LocalBlobStore.java
+++ b/blobstore/src/main/java/org/jclouds/blobstore/config/LocalBlobStore.java
@@ -28,12 +28,14 @@ import static com.google.common.collect.Sets.newTreeSet;
import static
org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -756,7 +758,14 @@ public final class LocalBlobStore implements BlobStore {
// return InputStream to more closely follow real blobstore
Payload payload;
try {
- payload = new InputStreamPayload(blob.getPayload().openStream());
+ InputStream is = blob.getPayload().openStream();
+ if (is instanceof FileInputStream) {
+ // except for FileInputStream since large MPU can open too many fds
+ is.close();
+ payload = blob.getPayload();
+ } else {
+ payload = new InputStreamPayload(is); // reuse the stream opened above; a second openStream() would abandon it unclosed
+ }
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
@@ -825,20 +834,14 @@ public final class LocalBlobStore implements BlobStore {
@Override
public String completeMultipartUpload(MultipartUpload mpu,
List<MultipartPart> parts) {
- ImmutableList.Builder<InputStream> streams = ImmutableList.builder();
+ ImmutableList.Builder<Blob> blobs = ImmutableList.builder();
long contentLength = 0;
Hasher md5Hasher = Hashing.md5().newHasher();
for (MultipartPart part : parts) {
Blob blobPart = getBlob(mpu.containerName(), MULTIPART_PREFIX +
mpu.id() + "-" + mpu.blobName() + "-" + part.partNumber());
contentLength +=
blobPart.getMetadata().getContentMetadata().getContentLength();
- InputStream is;
- try {
- is = blobPart.getPayload().openStream();
- } catch (IOException ioe) {
- throw propagate(ioe);
- }
- streams.add(is);
+ blobs.add(blobPart);
md5Hasher.putBytes(BaseEncoding.base16().lowerCase().decode(blobPart.getMetadata().getETag()));
}
String mpuETag = new StringBuilder("\"")
@@ -849,7 +852,7 @@ public final class LocalBlobStore implements BlobStore {
.toString();
PayloadBlobBuilder blobBuilder = blobBuilder(mpu.blobName())
.userMetadata(mpu.blobMetadata().getUserMetadata())
- .payload(new
SequenceInputStream(Iterators.asEnumeration(streams.build().iterator())))
+ .payload(new MultiBlobInputStream(blobs.build()))
.contentLength(contentLength)
.eTag(mpuETag);
String cacheControl =
mpu.blobMetadata().getContentMetadata().getCacheControl();
@@ -998,5 +1001,41 @@ public final class LocalBlobStore implements BlobStore {
return eTag;
}
+   /**
+    * Presents the payloads of a list of blobs as one continuous stream.
+    * A blob's stream is opened only when reading reaches it and is closed
+    * once it is exhausted, so at most one underlying stream is open at a time.
+    */
+   private static final class MultiBlobInputStream extends InputStream {
+      private final Iterator<Blob> blobs;
+      /** Stream of the blob currently being read, or null when none is open. */
+      private InputStream current;
+      /** Set by close(); prevents later reads from opening further blobs. */
+      private boolean closed;
+
+      MultiBlobInputStream(List<Blob> blobs) {
+         this.blobs = blobs.iterator();
+      }
+
+      /** Reads one byte via the bulk read; returns it as 0..255, or -1 at end of data. */
+      @Override
+      public int read() throws IOException {
+         byte[] b = new byte[1];
+         int result = read(b, 0, b.length);
+         if (result == -1) {
+            return -1;
+         }
+         // mask so the byte is returned as an unsigned value
+         return b[0] & 0x000000FF;
+      }
+
+      /**
+       * Reads from the current blob, moving on to the next blob whenever the
+       * current one is exhausted. Returns -1 once every blob has been consumed
+       * or after this stream has been closed.
+       */
+      @Override
+      public int read(byte[] b, int off, int len) throws IOException {
+         while (true) {
+            if (current == null) {
+               // current is always null after close(), so checking closed here suffices
+               if (closed || !blobs.hasNext()) {
+                  return -1;
+               }
+               current = blobs.next().getPayload().openStream();
+            }
+            int result = current.read(b, off, len);
+            if (result == -1) {
+               // this blob is exhausted: release it before opening the next one
+               current.close();
+               current = null;
+               continue;
+            }
+            return result;
+         }
+      }
+
+      /**
+       * Closes the blob stream that is still open, if any. Without this the
+       * stream left open after the last successful read, or after the consumer
+       * stops early, would never be released.
+       */
+      @Override
+      public void close() throws IOException {
+         closed = true;
+         if (current != null) {
+            InputStream toClose = current;
+            // clear the field first so a failing close() is not retried
+            current = null;
+            toClose.close();
+         }
+      }
+   }
}