Author: krosenvold
Date: Tue Jan 13 06:59:24 2015
New Revision: 1651285
URL: http://svn.apache.org/r1651285
Log:
Changed from nThreads to receiving an ExecutorService
There are a lot of different models/versions of executorservices, also
varying according to client JDK level. Give client full control of how the
executor service is created and also possibly how to schedule tasks through
a slightly lower-level cerateCallable/submit api.
Termination of ExecutorService is still controlled by ParallelScatterZipCreator,
as must be.
Modified:
commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java
commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java
Modified:
commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java
URL:
http://svn.apache.org/viewvc/commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java?rev=1651285&r1=1651284&r2=1651285&view=diff
==============================================================================
---
commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java
(original)
+++
commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java
Tue Jan 13 06:59:24 2015
@@ -40,18 +40,22 @@ import static java.util.Collections.sync
* the output file. Things that need to come in a specific order (manifests,
directories)
* must be handled by the client of this class, usually by writing these
things to the
* #ZipArchiveOutputStream *before* calling #writeTo on this class.</p>
+ * <p>
+ * The client can supply an ExecutorService, but for reasons of memory model
consistency,
+ * this will be shut down by this class prior to completion.
+ * </p>
*/
public class ParallelScatterZipCreator {
private final List<ScatterZipOutputStream> streams = synchronizedList(new
ArrayList<ScatterZipOutputStream>());
private final ExecutorService es;
- private final ScatterGatherBackingStoreSupplier supplier;
+ private final ScatterGatherBackingStoreSupplier backingStoreSupplier;
private final List<Future> futures = new ArrayList<Future>();
private final long startedAt = System.currentTimeMillis();
private long compressionDoneAt = 0;
private long scatterDoneAt;
- private static class DefaultSupplier implements
ScatterGatherBackingStoreSupplier {
+ private static class DefaultBackingStoreSupplier implements
ScatterGatherBackingStoreSupplier {
final AtomicInteger storeNum = new AtomicInteger(0);
public ScatterGatherBackingStore get() throws IOException {
@@ -71,7 +75,7 @@ public class ParallelScatterZipCreator {
@Override
protected ScatterZipOutputStream initialValue() {
try {
- ScatterZipOutputStream scatterStream =
createDeferred(supplier);
+ ScatterZipOutputStream scatterStream =
createDeferred(backingStoreSupplier);
streams.add(scatterStream);
return scatterStream;
} catch (IOException e) {
@@ -84,27 +88,30 @@ public class ParallelScatterZipCreator {
* Create a ParallelScatterZipCreator with default threads
*/
public ParallelScatterZipCreator() {
- this(Runtime.getRuntime().availableProcessors());
+
this(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
}
/**
* Create a ParallelScatterZipCreator
*
- * @param nThreads the number of threads to use in parallel.
+ * @param executorService The executorService to use for parallel
scheduling. For technical reasons,
+ * this will be shut down by this class.
*/
- public ParallelScatterZipCreator(int nThreads) {
- this( nThreads, new DefaultSupplier());
+ public ParallelScatterZipCreator(ExecutorService executorService) {
+ this(executorService, new DefaultBackingStoreSupplier());
}
/**
* Create a ParallelScatterZipCreator
*
- * @param nThreads the number of threads to use in parallel.
+ * @param executorService The executorService to use. For technical
reasons, this will be shut down
+ * by this class.
* @param backingStoreSupplier The supplier of backing store which shall
be used
*/
- public ParallelScatterZipCreator(int nThreads,
ScatterGatherBackingStoreSupplier backingStoreSupplier) {
- supplier = backingStoreSupplier;
- es = Executors.newFixedThreadPool(nThreads);
+ public ParallelScatterZipCreator(ExecutorService executorService,
+ ScatterGatherBackingStoreSupplier
backingStoreSupplier) {
+ this.backingStoreSupplier = backingStoreSupplier;
+ es = executorService;
}
/**
@@ -113,19 +120,43 @@ public class ParallelScatterZipCreator {
* This method is expected to be called from a single client thread
* </p>
*
- * @param zipArchiveEntry The entry to add. Compression method
+ * @param zipArchiveEntry The entry to add.
* @param source The source input stream supplier
*/
public void addArchiveEntry(final ZipArchiveEntry zipArchiveEntry, final
InputStreamSupplier source) {
+ submit(createCallable(zipArchiveEntry, source));
+ }
+
+ /**
+ * Submit a callable for compression
+ * @param callable The callable to run
+ */
+ public void submit(Callable<Object> callable) {
+ futures.add(es.submit(callable));
+ }
+
+ /**
+ * Create a callable that will compress the given archive entry.
+ *
+ * <p>This method is expected to be called from a single client thread.</p>
+ * <p>
+ * This method is used by clients that want finer grained control over how
the callable is
+ * created, possibly wanting to wrap this callable in a different
callable</p>
+ *
+ * @param zipArchiveEntry The entry to add.
+ * @param source The source input stream supplier
+ * @return A callable that will be used to check for errors
+ */
+
+ public Callable<Object> createCallable(final ZipArchiveEntry
zipArchiveEntry, final InputStreamSupplier source) {
final int method = zipArchiveEntry.getMethod();
if (method == ZipMethod.UNKNOWN_CODE) {
throw new IllegalArgumentException("Method must be set on the
supplied zipArchiveEntry");
}
- // Consider if we want to constrain the number of items that can
enqueue here.
- Future<Object> future = es.submit(new Callable<Object>() {
- public Void call() throws Exception {
- ScatterZipOutputStream streamToUse = tlScatterStreams.get();
+ return new Callable<Object>() {
+ public Object call() throws Exception {
+ final ScatterZipOutputStream streamToUse =
tlScatterStreams.get();
InputStream payload = source.get();
try {
streamToUse.addArchiveEntry(zipArchiveEntry, payload,
method);
@@ -134,9 +165,7 @@ public class ParallelScatterZipCreator {
}
return null;
}
-
- });
- futures.add( future);
+ };
}
@@ -161,7 +190,7 @@ public class ParallelScatterZipCreator {
}
es.shutdown();
- es.awaitTermination(1000 * 60, TimeUnit.SECONDS);
+ es.awaitTermination(1000 * 60, TimeUnit.SECONDS); // == Infinity. We
really *must* wait for this to complete
// It is important that all threads terminate before we go on, ensure
happens-before relationship
compressionDoneAt = System.currentTimeMillis();
Modified:
commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java
URL:
http://svn.apache.org/viewvc/commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java?rev=1651285&r1=1651284&r2=1651285&view=diff
==============================================================================
---
commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java
(original)
+++
commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java
Tue Jan 13 06:59:24 2015
@@ -27,12 +27,17 @@ import java.io.InputStream;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import static org.junit.Assert.*;
@SuppressWarnings("OctalInteger")
public class ParallelScatterZipCreatorTest {
+ private final int NUMITEMS = 5000;
+
@Test
public void concurrent()
throws Exception {
@@ -44,12 +49,36 @@ public class ParallelScatterZipCreatorTe
Map<String, byte[]> entries = writeEntries(zipCreator);
zipCreator.writeTo(zos);
zos.close();
-
removeEntriesFoundInZipFile(result, entries);
assertTrue(entries.size() == 0);
assertNotNull( zipCreator.getStatisticsMessage());
}
+ @Test
+ public void callableApi()
+ throws Exception {
+ File result = File.createTempFile("parallelScatterGather2", "");
+ ZipArchiveOutputStream zos = new ZipArchiveOutputStream(result);
+ zos.setEncoding("UTF-8");
+ ExecutorService es = Executors.newFixedThreadPool(1);
+
+ ScatterGatherBackingStoreSupplier supp = new
ScatterGatherBackingStoreSupplier() {
+ public ScatterGatherBackingStore get() throws IOException {
+ return new
FileBasedScatterGatherBackingStore(File.createTempFile("parallelscatter",
"n1"));
+ }
+ };
+
+ ParallelScatterZipCreator zipCreator = new
ParallelScatterZipCreator(es, supp);
+ Map<String, byte[]> entries = writeEntriesAsCallable(zipCreator);
+ zipCreator.writeTo(zos);
+ zos.close();
+
+
+ removeEntriesFoundInZipFile(result, entries);
+ assertTrue(entries.size() == 0);
+ assertNotNull(zipCreator.getStatisticsMessage());
+ }
+
private void removeEntriesFoundInZipFile(File result, Map<String, byte[]>
entries) throws IOException {
ZipFile zf = new ZipFile(result);
Enumeration<ZipArchiveEntry> entriesInPhysicalOrder =
zf.getEntriesInPhysicalOrder();
@@ -58,21 +87,16 @@ public class ParallelScatterZipCreatorTe
InputStream inputStream = zf.getInputStream(zipArchiveEntry);
byte[] actual = IOUtils.toByteArray(inputStream);
byte[] expected = entries.remove(zipArchiveEntry.getName());
- assertArrayEquals( expected, actual);
+ assertArrayEquals( "For " + zipArchiveEntry.getName(), expected,
actual);
}
zf.close();
}
private Map<String, byte[]> writeEntries(ParallelScatterZipCreator
zipCreator) {
Map<String, byte[]> entries = new HashMap<String, byte[]>();
- for (int i = 0; i < 10000; i++){
- ZipArchiveEntry za = new ZipArchiveEntry( "file" + i);
- final String payload = "content" + i;
- final byte[] payloadBytes = payload.getBytes();
- entries.put( za.getName(), payloadBytes);
- za.setMethod(ZipArchiveEntry.DEFLATED);
- za.setSize(payload.length());
- za.setUnixMode(UnixStat.FILE_FLAG | 0664);
+ for (int i = 0; i < NUMITEMS; i++){
+ final byte[] payloadBytes = ("content" + i).getBytes();
+ ZipArchiveEntry za = createZipArchiveEntry(entries, i,
payloadBytes);
zipCreator.addArchiveEntry(za, new InputStreamSupplier() {
public InputStream get() {
return new ByteArrayInputStream(payloadBytes);
@@ -81,4 +105,28 @@ public class ParallelScatterZipCreatorTe
}
return entries;
}
+
+ private Map<String, byte[]>
writeEntriesAsCallable(ParallelScatterZipCreator zipCreator) {
+ Map<String, byte[]> entries = new HashMap<String, byte[]>();
+ for (int i = 0; i < NUMITEMS; i++){
+ final byte[] payloadBytes = ("content" + i).getBytes();
+ ZipArchiveEntry za = createZipArchiveEntry(entries, i,
payloadBytes);
+ final Callable<Object> callable = zipCreator.createCallable(za,
new InputStreamSupplier() {
+ public InputStream get() {
+ return new ByteArrayInputStream(payloadBytes);
+ }
+ });
+ zipCreator.submit(callable);
+ }
+ return entries;
+ }
+
+ private ZipArchiveEntry createZipArchiveEntry(Map<String, byte[]> entries,
int i, byte[] payloadBytes) {
+ ZipArchiveEntry za = new ZipArchiveEntry( "file" + i);
+ entries.put( za.getName(), payloadBytes);
+ za.setMethod(ZipArchiveEntry.DEFLATED);
+ za.setSize(payloadBytes.length);
+ za.setUnixMode(UnixStat.FILE_FLAG | 0664);
+ return za;
+ }
}
\ No newline at end of file