Author: krosenvold
Date: Wed Dec 31 14:14:35 2014
New Revision: 1648704

URL: http://svn.apache.org/r1648704
Log:
Added ParallelScatterZipCreator

Added:
    
commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/InputStreamSupplier.java
    
commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java
    
commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ScatterGatherBackingStoreSupplier.java
    
commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java

Added: 
commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/InputStreamSupplier.java
URL: 
http://svn.apache.org/viewvc/commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/InputStreamSupplier.java?rev=1648704&view=auto
==============================================================================
--- 
commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/InputStreamSupplier.java
 (added)
+++ 
commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/InputStreamSupplier.java
 Wed Dec 31 14:14:35 2014
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.commons.compress.archivers.zip;
+
+import java.io.InputStream;
+
/**
 * Supplies an {@link InputStream} on demand, so a resource's content is only
 * opened when it is actually requested.
 */
public interface InputStreamSupplier {

    /**
     * Supplies an input stream for a resource.
     *
     * @return the input stream; may be {@code null} if there is no content for the resource
     */
    InputStream get();
}

Added: 
commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java
URL: 
http://svn.apache.org/viewvc/commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java?rev=1648704&view=auto
==============================================================================
--- 
commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java
 (added)
+++ 
commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java
 Wed Dec 31 14:14:35 2014
@@ -0,0 +1,155 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.commons.compress.archivers.zip;
+
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.Deflater;
+
+/**
+ * Creates a zip in parallel by using multiple threadlocal 
#ScatterZipOutputStream instances.
+ * <p/>
+ * Note that this class generally makes no guarantees about the order of 
things written to
+ * the output file. Things that need to come in a specific order (manifests, 
directories)
+ * must be handled by the client of this class, usually by writing these 
things to the
+ * #ZipArchiveOutputStream *before* calling #writeTo on this class.
+ */
+public class ParallelScatterZipCreator {
+    private List<ScatterZipOutputStream> streams = 
Collections.synchronizedList(new ArrayList<ScatterZipOutputStream>());
+    private final ExecutorService es;
+
+    private final long startedAt = System.currentTimeMillis();
+    private long compressionDoneAt = 0;
+    private long scatterDoneAt;
+
+    static ScatterGatherBackingStoreSupplier defaultSupplier = new 
DefaultSupplier();
+
+    static class DefaultSupplier implements ScatterGatherBackingStoreSupplier {
+        AtomicInteger storeNum = new AtomicInteger(0);
+
+        public ScatterGatherBackingStore get() throws IOException {
+            File tempFile = File.createTempFile("parallelscatter", "n" + 
storeNum.incrementAndGet());
+            return new FileBasedScatterGatherBackingStore(tempFile);
+        }
+    }
+
+    static ScatterZipOutputStream 
createDeferred(ScatterGatherBackingStoreSupplier 
scatterGatherBackingStoreSupplier)
+            throws IOException {
+        ScatterGatherBackingStore bs = scatterGatherBackingStoreSupplier.get();
+        StreamCompressor sc = 
StreamCompressor.create(Deflater.DEFAULT_COMPRESSION, bs);
+        return new ScatterZipOutputStream(bs, sc);
+    }
+
+
+    ThreadLocal<ScatterZipOutputStream> tlScatterStreams = new 
ThreadLocal<ScatterZipOutputStream>() {
+        @Override
+        protected ScatterZipOutputStream initialValue() {
+            try {
+                ScatterZipOutputStream scatterStream = 
createDeferred(defaultSupplier);
+                streams.add(scatterStream);
+                return scatterStream;
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    };
+
+    /**
+     * Create a ParallelScatterZipCreator with default threads
+     */
+    public ParallelScatterZipCreator() {
+        this(Runtime.getRuntime().availableProcessors());
+    }
+
+    /**
+     * Create a ParallelScatterZipCreator
+     *
+     * @param nThreads the number of threads to use in parallel.
+     */
+    public ParallelScatterZipCreator(int nThreads) {
+        es = Executors.newFixedThreadPool(nThreads);
+    }
+
+    /**
+     * Adds an archive entry to this archive.
+     * <p/>
+     * This method is expected to be called from a single client thread
+     *
+     * @param zipArchiveEntry The entry to add. Compression method
+     * @param source          The source input stream supplier
+     */
+
+    public void addArchiveEntry(final ZipArchiveEntry zipArchiveEntry, final 
InputStreamSupplier source) {
+        final int method = zipArchiveEntry.getMethod();
+        if (method == -1) throw new IllegalArgumentException("Method must be 
set on the supplied zipArchiveEntry");
+        // Consider if we want to constrain the number of items that can 
enqueue here.
+        es.submit(new Callable<ScatterZipOutputStream>() {
+            public ScatterZipOutputStream call() throws Exception {
+                ScatterZipOutputStream streamToUse = tlScatterStreams.get();
+                streamToUse.addArchiveEntry(zipArchiveEntry, source.get(), 
method);
+                return streamToUse;
+            }
+
+        });
+    }
+
+
+    /**
+     * Write the contents this to the target #ZipArchiveOutputStream.
+     * <p/>
+     * It may be beneficial to write things like directories and manifest 
files to the targetStream
+     * before calling this method.
+     *
+     * @param targetStream The ZipArchiveOutputStream to receive the contents 
of the scatter streams
+     * @throws IOException          If writing fails
+     * @throws InterruptedException If we get interrupted
+     */
+    public void writeTo(ZipArchiveOutputStream targetStream) throws 
IOException, InterruptedException {
+        es.shutdown();
+        es.awaitTermination(1000 * 60, TimeUnit.SECONDS);
+
+        // It is important that all threads terminate before we go on, ensure 
happens-before relationship
+        compressionDoneAt = System.currentTimeMillis();
+
+        for (ScatterZipOutputStream scatterStream : streams) {
+            scatterStream.writeTo(targetStream);
+        }
+
+        scatterDoneAt = System.currentTimeMillis();
+        // Maybe close ScatterZipOS. We should do something to get rid of 
tempfiles.
+    }
+
+    /**
+     * Returns a message describing the overall statistics of the compression 
run
+     *
+     * @return A string
+     */
+    public String getStatisticsMessage() {
+        return "Compression: " + (compressionDoneAt - startedAt) + "ms," +
+                "Merging files: " + (scatterDoneAt - compressionDoneAt) + "ms";
+    }
+}
+

Added: 
commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ScatterGatherBackingStoreSupplier.java
URL: 
http://svn.apache.org/viewvc/commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ScatterGatherBackingStoreSupplier.java?rev=1648704&view=auto
==============================================================================
--- 
commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ScatterGatherBackingStoreSupplier.java
 (added)
+++ 
commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ScatterGatherBackingStoreSupplier.java
 Wed Dec 31 14:14:35 2014
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.commons.compress.archivers.zip;
+
+import java.io.IOException;
+
+public interface ScatterGatherBackingStoreSupplier {
+    /**
+     * Get a ScatterGatherBackingStore.
+     *
+     * @return a ScatterGatherBackingStore, not null
+     */
+    ScatterGatherBackingStore get() throws IOException;
+}

Added: 
commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java
URL: 
http://svn.apache.org/viewvc/commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java?rev=1648704&view=auto
==============================================================================
--- 
commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java
 (added)
+++ 
commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java
 Wed Dec 31 14:14:35 2014
@@ -0,0 +1,84 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.commons.compress.archivers.zip;
+
+import org.apache.commons.compress.utils.IOUtils;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("OctalInteger")
+public class ParallelScatterZipCreatorTest {
+
+    @Test
+    public void concurrent()
+            throws Exception {
+        File result = File.createTempFile("parallelScatterGather1", "");
+        ZipArchiveOutputStream zos = new ZipArchiveOutputStream(result);
+        zos.setEncoding("UTF-8");
+        ParallelScatterZipCreator zipCreator = new ParallelScatterZipCreator();
+
+        Map<String, byte[]> entries = writeEntries(zipCreator);
+        zipCreator.writeTo(zos);
+        zos.close();
+
+        removeEntriesFoundInZipFile(result, entries);
+        assertTrue(entries.size() == 0);
+        assertNotNull( zipCreator.getStatisticsMessage());
+    }
+
+    private void removeEntriesFoundInZipFile(File result, Map<String, byte[]> 
entries) throws IOException {
+        ZipFile zf = new ZipFile(result);
+        Enumeration<ZipArchiveEntry> entriesInPhysicalOrder = 
zf.getEntriesInPhysicalOrder();
+        while (entriesInPhysicalOrder.hasMoreElements()){
+            ZipArchiveEntry zipArchiveEntry = 
entriesInPhysicalOrder.nextElement();
+            InputStream inputStream = zf.getInputStream(zipArchiveEntry);
+            byte[] actual = IOUtils.toByteArray(inputStream);
+            byte[] expected = entries.remove(zipArchiveEntry.getName());
+            assertArrayEquals( expected, actual);
+        }
+        zf.close();
+    }
+
+    private Map<String, byte[]> writeEntries(ParallelScatterZipCreator 
zipCreator) {
+        Map<String, byte[]> entries = new HashMap<String, byte[]>();
+        for (int i = 0; i < 10000; i++){
+            ZipArchiveEntry za = new ZipArchiveEntry( "file" + i);
+            final String payload = "content" + i;
+            final byte[] payloadBytes = payload.getBytes();
+            entries.put( za.getName(), payloadBytes);
+            za.setMethod(ZipArchiveEntry.DEFLATED);
+            za.setSize(payload.length());
+            za.setUnixMode(UnixStat.FILE_FLAG | 0664);
+            zipCreator.addArchiveEntry(za, new InputStreamSupplier() {
+                public InputStream get() {
+                    return new ByteArrayInputStream(payloadBytes);
+                }
+            });
+        }
+        return entries;
+    }
+}
\ No newline at end of file


Reply via email to