Author: krosenvold
Date: Wed Dec 31 14:14:35 2014
New Revision: 1648704

URL: http://svn.apache.org/r1648704
Log:
Added ParallelScatterZipCreator
Added:
    commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/InputStreamSupplier.java
    commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java
    commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ScatterGatherBackingStoreSupplier.java
    commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java

Added: commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/InputStreamSupplier.java
URL: http://svn.apache.org/viewvc/commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/InputStreamSupplier.java?rev=1648704&view=auto
==============================================================================
--- commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/InputStreamSupplier.java (added)
+++ commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/InputStreamSupplier.java Wed Dec 31 14:14:35 2014
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.commons.compress.archivers.zip;
+
+import java.io.InputStream;
+
+/**
+ * Supplies the content of a single archive entry on demand.
+ * <p/>
+ * {@link ParallelScatterZipCreator} does not call {@link #get()} when an entry is added;
+ * it calls it later, from one of its worker threads, when that entry gets compressed.
+ */
+public interface InputStreamSupplier {
+
+    /**
+     * Returns the stream to read the entry's content from.
+     *
+     * @return the input stream, or {@code null} if there is no content for the resource
+     */
+    InputStream get();
+}

Added: commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java
URL: http://svn.apache.org/viewvc/commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java?rev=1648704&view=auto
==============================================================================
--- commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java (added)
+++ commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java Wed Dec 31 14:14:35 2014
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.commons.compress.archivers.zip;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.Deflater;
+
+/**
+ * Creates a zip in parallel by using multiple threadlocal {@link ScatterZipOutputStream} instances.
+ * <p/>
+ * Note that this class generally makes no guarantees about the order of things written to
+ * the output file. Things that need to come in a specific order (manifests, directories)
+ * must be handled by the client of this class, usually by writing these things to the
+ * {@link ZipArchiveOutputStream} <em>before</em> calling {@link #writeTo writeTo} on this class.
+ * <p/>
+ * {@link #writeTo writeTo} shuts down the internal thread pool, so no entries can be added
+ * once it has been called.
+ */
+public class ParallelScatterZipCreator {
+    /** Scatter streams created lazily by the worker threads (one per thread). */
+    private final List<ScatterZipOutputStream> streams =
+        Collections.synchronizedList(new ArrayList<ScatterZipOutputStream>());
+    /** Results of the submitted compression tasks; only used from the single client thread. */
+    private final List<Future<ScatterZipOutputStream>> futures =
+        new ArrayList<Future<ScatterZipOutputStream>>();
+    private final ExecutorService es;
+
+    private final long startedAt = System.currentTimeMillis();
+    private long compressionDoneAt = 0;
+    private long scatterDoneAt;
+
+    static ScatterGatherBackingStoreSupplier defaultSupplier = new DefaultSupplier();
+
+    /**
+     * Supplies file based backing stores, each on a new temporary file.
+     */
+    static class DefaultSupplier implements ScatterGatherBackingStoreSupplier {
+        AtomicInteger storeNum = new AtomicInteger(0);
+
+        public ScatterGatherBackingStore get() throws IOException {
+            File tempFile = File.createTempFile("parallelscatter", "n" + storeNum.incrementAndGet());
+            // Nothing else removes these files yet; at least make sure they do not outlive the JVM.
+            tempFile.deleteOnExit();
+            return new FileBasedScatterGatherBackingStore(tempFile);
+        }
+    }
+
+    /**
+     * Creates a scatter stream on a backing store obtained from the given supplier, using a
+     * StreamCompressor created with {@link Deflater#DEFAULT_COMPRESSION}.
+     *
+     * @param scatterGatherBackingStoreSupplier supplies the backing store for the new stream
+     * @return the new scatter stream
+     * @throws IOException if obtaining the backing store fails
+     */
+    static ScatterZipOutputStream createDeferred(ScatterGatherBackingStoreSupplier scatterGatherBackingStoreSupplier)
+        throws IOException {
+        ScatterGatherBackingStore bs = scatterGatherBackingStoreSupplier.get();
+        StreamCompressor sc = StreamCompressor.create(Deflater.DEFAULT_COMPRESSION, bs);
+        return new ScatterZipOutputStream(bs, sc);
+    }
+
+    /**
+     * Gives every worker thread its own scatter stream, created on first use and registered
+     * in {@code streams} so that {@link #writeTo} can merge it later.
+     */
+    ThreadLocal<ScatterZipOutputStream> tlScatterStreams = new ThreadLocal<ScatterZipOutputStream>() {
+        @Override
+        protected ScatterZipOutputStream initialValue() {
+            try {
+                ScatterZipOutputStream scatterStream = createDeferred(defaultSupplier);
+                streams.add(scatterStream);
+                return scatterStream;
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    };
+
+    /**
+     * Creates a ParallelScatterZipCreator with one worker thread per available processor.
+     */
+    public ParallelScatterZipCreator() {
+        this(Runtime.getRuntime().availableProcessors());
+    }
+
+    /**
+     * Creates a ParallelScatterZipCreator.
+     *
+     * @param nThreads the number of threads to use in parallel.
+     */
+    public ParallelScatterZipCreator(int nThreads) {
+        es = Executors.newFixedThreadPool(nThreads);
+    }
+
+    /**
+     * Adds an archive entry to this archive.
+     * <p/>
+     * This method is expected to be called from a single client thread. The entry is compressed
+     * asynchronously; {@code source} is only asked for its stream later, on a worker thread.
+     *
+     * @param zipArchiveEntry The entry to add. Its compression method must be set.
+     * @param source          The source input stream supplier
+     * @throws IllegalArgumentException if the compression method of the entry is not set
+     */
+    public void addArchiveEntry(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
+        final int method = zipArchiveEntry.getMethod();
+        if (method == -1) {
+            throw new IllegalArgumentException("Method must be set on the supplied zipArchiveEntry");
+        }
+        // Consider if we want to constrain the number of items that can enqueue here.
+        // The future is kept so that writeTo can report a failure of this task.
+        futures.add(es.submit(new Callable<ScatterZipOutputStream>() {
+            public ScatterZipOutputStream call() throws Exception {
+                ScatterZipOutputStream streamToUse = tlScatterStreams.get();
+                streamToUse.addArchiveEntry(zipArchiveEntry, source.get(), method);
+                return streamToUse;
+            }
+        }));
+    }
+
+    /**
+     * Writes all added entries to the target {@link ZipArchiveOutputStream}.
+     * <p/>
+     * It may be beneficial to write things like directories and manifest files to the targetStream
+     * before calling this method. This method waits until every added entry has been compressed
+     * and shuts down the thread pool, so it should be called only once.
+     *
+     * @param targetStream The ZipArchiveOutputStream to receive the contents of the scatter streams
+     * @throws IOException If writing fails, if compressing any entry failed, or if the worker
+     *                     threads do not terminate within the wait period
+     * @throws InterruptedException If we get interrupted
+     */
+    public void writeTo(ZipArchiveOutputStream targetStream) throws IOException, InterruptedException {
+        es.shutdown();
+        // Surface failures of the compression tasks; otherwise they would be lost silently
+        // and an archive missing those entries would be written.
+        for (Future<ScatterZipOutputStream> future : futures) {
+            awaitTask(future);
+        }
+        // Every task has completed at this point, so this only waits for the worker threads to exit.
+        if (!es.awaitTermination(1000 * 60, TimeUnit.SECONDS)) {
+            throw new IOException("Timed out waiting for the compression threads to terminate");
+        }
+
+        // It is important that all threads terminate before we go on, ensure happens-before relationship
+        compressionDoneAt = System.currentTimeMillis();
+
+        // A synchronizedList must be locked manually while it is being iterated.
+        synchronized (streams) {
+            for (ScatterZipOutputStream scatterStream : streams) {
+                scatterStream.writeTo(targetStream);
+            }
+        }
+
+        scatterDoneAt = System.currentTimeMillis();
+        // Maybe close ScatterZipOS. Files made by DefaultSupplier are currently only deleted at JVM exit.
+    }
+
+    /**
+     * Waits for one compression task and rethrows its failure, if any, in the calling thread.
+     *
+     * @param future the task to wait for
+     * @throws IOException if the task failed with an IOException or a checked exception
+     * @throws InterruptedException if interrupted while waiting
+     */
+    private static void awaitTask(Future<ScatterZipOutputStream> future)
+        throws IOException, InterruptedException {
+        try {
+            future.get();
+        } catch (ExecutionException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof IOException) {
+                throw (IOException) cause;
+            }
+            if (cause instanceof RuntimeException) {
+                throw (RuntimeException) cause;
+            }
+            if (cause instanceof Error) {
+                throw (Error) cause;
+            }
+            IOException wrapped = new IOException("Compression of an entry failed: " + cause);
+            wrapped.initCause(cause);
+            throw wrapped;
+        }
+    }
+
+    /**
+     * Returns a message describing the overall statistics of the compression run.
+     * The timings are only meaningful after {@link #writeTo} has returned.
+     *
+     * @return A string
+     */
+    public String getStatisticsMessage() {
+        return "Compression: " + (compressionDoneAt - startedAt) + "ms," +
+            "Merging files: " + (scatterDoneAt - compressionDoneAt) + "ms";
+    }
+}
+

Added: commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ScatterGatherBackingStoreSupplier.java
URL: http://svn.apache.org/viewvc/commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ScatterGatherBackingStoreSupplier.java?rev=1648704&view=auto
==============================================================================
--- commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ScatterGatherBackingStoreSupplier.java (added)
+++ commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ScatterGatherBackingStoreSupplier.java Wed Dec 31 14:14:35 2014
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.commons.compress.archivers.zip;
+
+import java.io.IOException;
+
+public interface ScatterGatherBackingStoreSupplier {
+    /**
+     * Creates a backing store; may be invoked from ParallelScatterZipCreator's worker threads.
+     * @return a ScatterGatherBackingStore, not null
+     * @throws IOException if the backing store cannot be obtained
+     */
+    ScatterGatherBackingStore get() throws IOException;
+}

Added: commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java
URL: http://svn.apache.org/viewvc/commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java?rev=1648704&view=auto
==============================================================================
--- commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java (added)
+++ commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java Wed Dec 31 14:14:35 2014
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.commons.compress.archivers.zip;
+
+import org.apache.commons.compress.utils.IOUtils;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+/**
+ * Round-trips many small entries through {@link ParallelScatterZipCreator} and checks that
+ * every entry comes back exactly once with its original content.
+ */
+@SuppressWarnings("OctalInteger")
+public class ParallelScatterZipCreatorTest {
+
+    @Test
+    public void concurrent()
+        throws Exception {
+        File result = File.createTempFile("parallelScatterGather1", "");
+        try {
+            ZipArchiveOutputStream zos = new ZipArchiveOutputStream(result);
+            zos.setEncoding("UTF-8");
+            ParallelScatterZipCreator zipCreator = new ParallelScatterZipCreator();
+
+            Map<String, byte[]> entries = writeEntries(zipCreator);
+            try {
+                zipCreator.writeTo(zos);
+            } finally {
+                zos.close();
+            }
+
+            removeEntriesFoundInZipFile(result, entries);
+            assertTrue("entries missing from the archive: " + entries.keySet(), entries.isEmpty());
+            assertNotNull(zipCreator.getStatisticsMessage());
+        } finally {
+            // Do not leave the generated archive behind in the temp directory.
+            result.delete();
+        }
+    }
+
+    /**
+     * Reads every entry of the archive and removes it from {@code entries}, asserting that each
+     * one was expected and has the expected content. Whatever remains in the map afterwards was
+     * missing from the archive.
+     */
+    private void removeEntriesFoundInZipFile(File result, Map<String, byte[]> entries) throws IOException {
+        ZipFile zf = new ZipFile(result);
+        try {
+            Enumeration<ZipArchiveEntry> entriesInPhysicalOrder = zf.getEntriesInPhysicalOrder();
+            while (entriesInPhysicalOrder.hasMoreElements()) {
+                ZipArchiveEntry zipArchiveEntry = entriesInPhysicalOrder.nextElement();
+                byte[] actual;
+                InputStream inputStream = zf.getInputStream(zipArchiveEntry);
+                try {
+                    actual = IOUtils.toByteArray(inputStream);
+                } finally {
+                    inputStream.close();
+                }
+                byte[] expected = entries.remove(zipArchiveEntry.getName());
+                assertNotNull("unexpected or duplicate entry " + zipArchiveEntry.getName(), expected);
+                assertArrayEquals(expected, actual);
+            }
+        } finally {
+            zf.close();
+        }
+    }
+
+    /**
+     * Queues 10000 small deflated entries on the creator.
+     *
+     * @return the expected content of every queued entry, keyed by entry name
+     */
+    private Map<String, byte[]> writeEntries(ParallelScatterZipCreator zipCreator) throws IOException {
+        Map<String, byte[]> entries = new HashMap<String, byte[]>();
+        for (int i = 0; i < 10000; i++) {
+            ZipArchiveEntry za = new ZipArchiveEntry("file" + i);
+            final String payload = "content" + i;
+            // Explicit charset so the expected bytes do not depend on the platform default.
+            final byte[] payloadBytes = payload.getBytes("UTF-8");
+            entries.put(za.getName(), payloadBytes);
+            za.setMethod(ZipArchiveEntry.DEFLATED);
+            // The entry size is measured in bytes, not characters.
+            za.setSize(payloadBytes.length);
+            za.setUnixMode(UnixStat.FILE_FLAG | 0664);
+            zipCreator.addArchiveEntry(za, new InputStreamSupplier() {
+                public InputStream get() {
+                    return new ByteArrayInputStream(payloadBytes);
+                }
+            });
+        }
+        return entries;
+    }
+}
\ No newline at end of file