This is an automated email from the ASF dual-hosted git repository.

gaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/jclouds.git


The following commit(s) were added to refs/heads/master by this push:
     new 13f32b2  Lazily open InputStream during complete MPU
13f32b2 is described below

commit 13f32b28c90f4dda7ea21c95380d8c0879ba91fe
Author: Andrew Gaul <[email protected]>
AuthorDate: Mon Jan 28 21:45:20 2019 -0800

    Lazily open InputStream during complete MPU
    
    Previously the filesystem provider could exhaust file descriptors by
    eagerly opening up to 10,000 parts.  This partially undoes
    JCLOUDS-1367.
---
 .../integration/FilesystemBlobIntegrationTest.java | 28 ++++++++++
 .../jclouds/blobstore/config/LocalBlobStore.java   | 59 ++++++++++++++++++----
 2 files changed, 77 insertions(+), 10 deletions(-)
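
The heart of the change is a concatenating InputStream that opens each part only when a reader actually reaches it, so completing a 10,000-part upload holds at most one file descriptor at a time instead of one per part. The sketch below shows that lazy-open pattern in plain java.io, independent of the jclouds Blob/Payload types used by the MultiBlobInputStream added in the diff; the class and method bodies here are illustrative only, not part of the commit.

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.List;

// Illustrative sketch, not part of the commit: concatenates the given files
// but opens each FileInputStream lazily, so at most one descriptor is open
// at any time.
final class LazyConcatInputStream extends InputStream {
   private final Iterator<File> files;
   private InputStream current;

   LazyConcatInputStream(List<File> files) {
      this.files = files.iterator();
   }

   @Override
   public int read() throws IOException {
      byte[] b = new byte[1];
      int n = read(b, 0, 1);
      return n == -1 ? -1 : b[0] & 0xFF;
   }

   @Override
   public int read(byte[] b, int off, int len) throws IOException {
      while (true) {
         if (current == null) {
            if (!files.hasNext()) {
               return -1;  // all parts consumed
            }
            current = new FileInputStream(files.next());  // opened on demand
         }
         int n = current.read(b, off, len);
         if (n != -1) {
            return n;
         }
         current.close();  // release the descriptor before advancing
         current = null;
      }
   }

   @Override
   public void close() throws IOException {
      if (current != null) {
         current.close();
         current = null;
      }
   }
}

Closing each exhausted stream before advancing is what keeps the descriptor count bounded; the committed MultiBlobInputStream follows the same shape over Blob payloads.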

diff --git a/apis/filesystem/src/test/java/org/jclouds/filesystem/integration/FilesystemBlobIntegrationTest.java b/apis/filesystem/src/test/java/org/jclouds/filesystem/integration/FilesystemBlobIntegrationTest.java
index cda138c..ff23e3f 100644
--- a/apis/filesystem/src/test/java/org/jclouds/filesystem/integration/FilesystemBlobIntegrationTest.java
+++ b/apis/filesystem/src/test/java/org/jclouds/filesystem/integration/FilesystemBlobIntegrationTest.java
@@ -164,6 +164,34 @@ public class FilesystemBlobIntegrationTest extends BaseBlobIntegrationTest {
       }
    }
 
+   @Test(groups = { "integration", "live" })
+   public void test10000PartMultipartUpload() throws Exception {
+      BlobStore blobStore = view.getBlobStore();
+      String container = getContainerName();
+      int partSize = (int) blobStore.getMinimumMultipartPartSize();
+      try {
+         String name = "blob-name";
+         BlobBuilder blobBuilder = blobStore.blobBuilder(name);
+         Blob blob = blobBuilder.build();
+         MultipartUpload mpu = blobStore.initiateMultipartUpload(container, blob.getMetadata(), new PutOptions());
+         ImmutableList.Builder<MultipartPart> parts = ImmutableList.builder();
+         byte[] content = new byte[partSize];
+
+         for (int i = 0; i < 10 * 1000; ++i) {
+            Payload payload = Payloads.newByteArrayPayload(content);
+            payload.getContentMetadata().setContentLength((long) partSize);
+            parts.add(blobStore.uploadMultipartPart(mpu, i, payload));
+         }
+
+         blobStore.completeMultipartUpload(mpu, parts.build());
+
+         BlobMetadata newBlobMetadata = blobStore.blobMetadata(container, name);
+         assertThat(newBlobMetadata.getSize()).isEqualTo(10 * 1000 * partSize);
+      } finally {
+         returnContainer(container);
+      }
+   }
+
    protected void checkExtendedAttributesSupport() {
       if (isMacOSX()) {
          throw new SkipException("filesystem does not support extended attributes in Mac OSX");
diff --git a/blobstore/src/main/java/org/jclouds/blobstore/config/LocalBlobStore.java b/blobstore/src/main/java/org/jclouds/blobstore/config/LocalBlobStore.java
index b62ad18..81c6ae3 100644
--- a/blobstore/src/main/java/org/jclouds/blobstore/config/LocalBlobStore.java
+++ b/blobstore/src/main/java/org/jclouds/blobstore/config/LocalBlobStore.java
@@ -28,12 +28,14 @@ import static com.google.common.collect.Sets.newTreeSet;
 import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.SequenceInputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -756,7 +758,14 @@ public final class LocalBlobStore implements BlobStore {
       // return InputStream to more closely follow real blobstore
       Payload payload;
       try {
-         payload = new InputStreamPayload(blob.getPayload().openStream());
+         InputStream is = blob.getPayload().openStream();
+         if (is instanceof FileInputStream) {
+            // except for FileInputStream since large MPU can open too many fds
+            is.close();
+            payload = blob.getPayload();
+         } else {
+            payload = new InputStreamPayload(blob.getPayload().openStream());
+         }
       } catch (IOException ioe) {
          throw new RuntimeException(ioe);
       }
@@ -825,20 +834,14 @@ public final class LocalBlobStore implements BlobStore {
 
    @Override
    public String completeMultipartUpload(MultipartUpload mpu, List<MultipartPart> parts) {
-      ImmutableList.Builder<InputStream> streams = ImmutableList.builder();
+      ImmutableList.Builder<Blob> blobs = ImmutableList.builder();
       long contentLength = 0;
       Hasher md5Hasher = Hashing.md5().newHasher();
 
       for (MultipartPart part : parts) {
          Blob blobPart = getBlob(mpu.containerName(), MULTIPART_PREFIX + mpu.id() + "-" + mpu.blobName() + "-" + part.partNumber());
          contentLength += blobPart.getMetadata().getContentMetadata().getContentLength();
-         InputStream is;
-         try {
-            is = blobPart.getPayload().openStream();
-         } catch (IOException ioe) {
-            throw propagate(ioe);
-         }
-         streams.add(is);
+         blobs.add(blobPart);
          md5Hasher.putBytes(BaseEncoding.base16().lowerCase().decode(blobPart.getMetadata().getETag()));
       }
       String mpuETag = new StringBuilder("\"")
@@ -849,7 +852,7 @@ public final class LocalBlobStore implements BlobStore {
          .toString();
       PayloadBlobBuilder blobBuilder = blobBuilder(mpu.blobName())
             .userMetadata(mpu.blobMetadata().getUserMetadata())
-            .payload(new SequenceInputStream(Iterators.asEnumeration(streams.build().iterator())))
+            .payload(new MultiBlobInputStream(blobs.build()))
             .contentLength(contentLength)
             .eTag(mpuETag);
       String cacheControl = mpu.blobMetadata().getContentMetadata().getCacheControl();
@@ -998,5 +1001,41 @@ public final class LocalBlobStore implements BlobStore {
       return eTag;
    }
 
+   private static final class MultiBlobInputStream extends InputStream {
+      private final Iterator<Blob> blobs;
+      private InputStream current;
+
+      MultiBlobInputStream(List<Blob> blobs) {
+         this.blobs = blobs.iterator();
+      }
+
+      @Override
+      public int read() throws IOException {
+         byte[] b = new byte[1];
+         int result = read(b, 0, b.length);
+         if (result == -1) {
+            return -1;
+         }
+         return b[0] & 0x000000FF;
+      }
 
+      @Override
+      public int read(byte[] b, int off, int len) throws IOException {
+         while (true) {
+            if (current == null) {
+               if (!blobs.hasNext()) {
+                  return -1;
+               }
+               current = blobs.next().getPayload().openStream();
+            }
+            int result = current.read(b, off, len);
+            if (result == -1) {
+               current.close();
+               current = null;
+               continue;
+            }
+            return result;
+         }
+      }
+   }
 }
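
For context, a many-part multipart upload against the filesystem provider drives the new code path roughly as follows. This is a standalone sketch, not part of the commit: the basedir, container name, and 1,000-part count are placeholders (the new FilesystemBlobIntegrationTest above covers the full 10,000-part case).

import java.util.Properties;

import org.jclouds.ContextBuilder;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.domain.MultipartPart;
import org.jclouds.blobstore.domain.MultipartUpload;
import org.jclouds.blobstore.options.PutOptions;
import org.jclouds.filesystem.reference.FilesystemConstants;
import org.jclouds.io.Payload;
import org.jclouds.io.Payloads;

import com.google.common.collect.ImmutableList;

public final class ManyPartUploadExample {
   public static void main(String[] args) throws Exception {
      // Placeholder basedir; adjust for your environment.
      Properties overrides = new Properties();
      overrides.setProperty(FilesystemConstants.PROPERTY_BASEDIR, "/tmp/jclouds-filesystem");

      BlobStoreContext context = ContextBuilder.newBuilder("filesystem")
            .overrides(overrides)
            .buildView(BlobStoreContext.class);
      try {
         BlobStore blobStore = context.getBlobStore();
         String container = "many-part-container";
         blobStore.createContainerInLocation(null, container);

         int partSize = (int) blobStore.getMinimumMultipartPartSize();
         Blob blob = blobStore.blobBuilder("blob-name").build();
         MultipartUpload mpu = blobStore.initiateMultipartUpload(container, blob.getMetadata(), new PutOptions());

         ImmutableList.Builder<MultipartPart> parts = ImmutableList.builder();
         byte[] content = new byte[partSize];
         for (int i = 0; i < 1000; ++i) {  // the committed test goes up to 10,000
            Payload payload = Payloads.newByteArrayPayload(content);
            payload.getContentMetadata().setContentLength((long) partSize);
            parts.add(blobStore.uploadMultipartPart(mpu, i, payload));
         }

         // Before this commit, completion opened every part eagerly and could
         // exhaust file descriptors; MultiBlobInputStream now opens them one at a time.
         blobStore.completeMultipartUpload(mpu, parts.build());
      } finally {
         context.close();
      }
   }
}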
