Repository: systemml Updated Branches: refs/heads/master 5b2815206 -> 1c0efe31a
[SYSTEMML-2193,2194] Common thread pool for multi-threaded operations Until now, all our multi-threaded operations and readers created a fixed thread pool per operation, guarded by minimum parallelization thresholds. However, on machines with large numbers of cores and small intermediates (e.g., mini-batch algorithms), this caused a large serial fraction that severely limited the speedup. This patch addresses this issue by establishing a common thread pool abstraction, which transparently uses a shared pool whenever the maximum degree of parallelism is requested. To avoid cleanup issues and for performance reasons, we use ForkJoinPool.commonPool (the same pool used by parallel streams) as the shared pool. On a machine with 80 virtual cores (Xeon Gold 6138), this patch improved the runtime of 10K dense matrix-vector multiplications of size 5K x 1K from 76.9s to 3.6s (9.2s with a shared fixed thread pool), which corresponds to a sustained bandwidth of 98GB/s despite the overhead and barriers of 10K operations. 
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/967b7316 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/967b7316 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/967b7316 Branch: refs/heads/master Commit: 967b731644f6f31da2762bd5e4bfab9a7b619c28 Parents: 5b28152 Author: Matthias Boehm <[email protected]> Authored: Wed Mar 21 00:12:04 2018 -0700 Committer: Matthias Boehm <[email protected]> Committed: Wed Mar 21 23:41:02 2018 -0700 ---------------------------------------------------------------------- .../sysml/runtime/codegen/SpoofCellwise.java | 6 +- .../runtime/codegen/SpoofMultiAggregate.java | 4 +- .../runtime/codegen/SpoofOuterProduct.java | 6 +- .../sysml/runtime/codegen/SpoofRowwise.java | 4 +- .../runtime/compress/CompressedMatrixBlock.java | 16 +-- .../compress/cocode/PlanningCoCoder.java | 4 +- .../io/FrameReaderBinaryBlockParallel.java | 4 +- .../runtime/io/FrameReaderTextCSVParallel.java | 6 +- .../runtime/io/FrameReaderTextCellParallel.java | 4 +- .../io/FrameWriterBinaryBlockParallel.java | 4 +- .../runtime/io/FrameWriterTextCSVParallel.java | 4 +- .../runtime/io/FrameWriterTextCellParallel.java | 4 +- .../runtime/io/ReaderBinaryBlockParallel.java | 4 +- .../sysml/runtime/io/ReaderTextCSVParallel.java | 6 +- .../runtime/io/ReaderTextCellParallel.java | 4 +- .../runtime/io/WriterBinaryBlockParallel.java | 4 +- .../runtime/io/WriterMatrixMarketParallel.java | 4 +- .../sysml/runtime/io/WriterTextCSVParallel.java | 4 +- .../runtime/io/WriterTextCellParallel.java | 4 +- .../sysml/runtime/matrix/data/LibMatrixAgg.java | 10 +- .../sysml/runtime/matrix/data/LibMatrixDNN.java | 4 +- .../runtime/matrix/data/LibMatrixDatagen.java | 4 +- .../runtime/matrix/data/LibMatrixMult.java | 20 +-- .../runtime/matrix/data/LibMatrixReorg.java | 6 +- .../sysml/runtime/util/CommonThreadPool.java | 123 +++++++++++++++++++ .../sysml/runtime/util/UtilFunctions.java | 2 +- 26 files 
changed, 194 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java b/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java index b7024ed..ea070b3 100644 --- a/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java +++ b/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java @@ -27,7 +27,6 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.sysml.runtime.DMLRuntimeException; @@ -49,6 +48,7 @@ import org.apache.sysml.runtime.matrix.data.IJV; import org.apache.sysml.runtime.matrix.data.LibMatrixMult; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.SparseBlock; +import org.apache.sysml.runtime.util.CommonThreadPool; import org.apache.sysml.runtime.util.UtilFunctions; public abstract class SpoofCellwise extends SpoofOperator implements Serializable @@ -147,7 +147,7 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl else //MULTI-THREADED { try { - ExecutorService pool = Executors.newFixedThreadPool( k ); + ExecutorService pool = CommonThreadPool.get(k); ArrayList<ParAggTask> tasks = new ArrayList<>(); int nk = (a instanceof CompressedMatrixBlock) ? 
k : UtilFunctions.roundToNext(Math.min(8*k,m/32), k); @@ -243,7 +243,7 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl else //MULTI-THREADED { try { - ExecutorService pool = Executors.newFixedThreadPool( k ); + ExecutorService pool = CommonThreadPool.get(k); ArrayList<ParExecTask> tasks = new ArrayList<>(); int nk = UtilFunctions.roundToNext(Math.min(8*k,m/32), k); int blklen = (int)(Math.ceil((double)m/nk)); http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/codegen/SpoofMultiAggregate.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/codegen/SpoofMultiAggregate.java b/src/main/java/org/apache/sysml/runtime/codegen/SpoofMultiAggregate.java index 72f7764..3801285 100644 --- a/src/main/java/org/apache/sysml/runtime/codegen/SpoofMultiAggregate.java +++ b/src/main/java/org/apache/sysml/runtime/codegen/SpoofMultiAggregate.java @@ -26,7 +26,6 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.sysml.runtime.DMLRuntimeException; @@ -44,6 +43,7 @@ import org.apache.sysml.runtime.matrix.data.DenseBlock; import org.apache.sysml.runtime.matrix.data.IJV; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.SparseBlock; +import org.apache.sysml.runtime.util.CommonThreadPool; import org.apache.sysml.runtime.util.UtilFunctions; public abstract class SpoofMultiAggregate extends SpoofOperator implements Serializable @@ -117,7 +117,7 @@ public abstract class SpoofMultiAggregate extends SpoofOperator implements Seria else //MULTI-THREADED { try { - ExecutorService pool = Executors.newFixedThreadPool( k ); + ExecutorService pool = CommonThreadPool.get(k); ArrayList<ParAggTask> tasks = new 
ArrayList<>(); int nk = UtilFunctions.roundToNext(Math.min(8*k,m/32), k); int blklen = (int)(Math.ceil((double)m/nk)); http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/codegen/SpoofOuterProduct.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/codegen/SpoofOuterProduct.java b/src/main/java/org/apache/sysml/runtime/codegen/SpoofOuterProduct.java index e6e93a8..1ec98ce 100644 --- a/src/main/java/org/apache/sysml/runtime/codegen/SpoofOuterProduct.java +++ b/src/main/java/org/apache/sysml/runtime/codegen/SpoofOuterProduct.java @@ -25,7 +25,6 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.sysml.hops.OptimizerUtils; @@ -37,6 +36,7 @@ import org.apache.sysml.runtime.matrix.data.DenseBlock; import org.apache.sysml.runtime.matrix.data.IJV; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.SparseBlock; +import org.apache.sysml.runtime.util.CommonThreadPool; import org.apache.sysml.runtime.util.UtilFunctions; public abstract class SpoofOuterProduct extends SpoofOperator @@ -131,7 +131,7 @@ public abstract class SpoofOuterProduct extends SpoofOperator try { - ExecutorService pool = Executors.newFixedThreadPool(k); + ExecutorService pool = CommonThreadPool.get(k); ArrayList<ParOuterProdAggTask> tasks = new ArrayList<>(); int numThreads2 = getPreferredNumberOfTasks(m, n, nnz, k, numThreads); int blklen = (int)(Math.ceil((double)m/numThreads2)); @@ -282,7 +282,7 @@ public abstract class SpoofOuterProduct extends SpoofOperator try { - ExecutorService pool = Executors.newFixedThreadPool(numThreads); + ExecutorService pool = CommonThreadPool.get(numThreads); ArrayList<ParExecTask> tasks = new ArrayList<>(); //create tasks 
(for wdivmm-left, parallelization over columns; //for wdivmm-right, parallelization over rows; both ensure disjoint results) http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java b/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java index f10f857..7f308e6 100644 --- a/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java +++ b/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java @@ -24,7 +24,6 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.stream.IntStream; @@ -41,6 +40,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.SparseBlock; import org.apache.sysml.runtime.matrix.data.SparseRow; import org.apache.sysml.runtime.matrix.data.SparseRowVector; +import org.apache.sysml.runtime.util.CommonThreadPool; public abstract class SpoofRowwise extends SpoofOperator @@ -205,7 +205,7 @@ public abstract class SpoofRowwise extends SpoofOperator double[] scalars = prepInputScalars(scalarObjects); //core parallel execute - ExecutorService pool = Executors.newFixedThreadPool( k ); + ExecutorService pool = CommonThreadPool.get(k); ArrayList<Integer> blklens = (a instanceof CompressedMatrixBlock) ? 
LibMatrixMult.getAlignedBlockSizes(m, k, BitmapEncoder.BITMAP_BLOCK_SZ) : LibMatrixMult.getBalancedBlockSizesDefault(m, k, (long)m*n<16*PAR_NUMCELL_THRESHOLD); http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java index 782bbd2..f264b48 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java @@ -34,7 +34,6 @@ import java.util.List; import java.util.PriorityQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.commons.logging.Log; @@ -92,6 +91,7 @@ import org.apache.sysml.runtime.matrix.operators.QuaternaryOperator; import org.apache.sysml.runtime.matrix.operators.ReorgOperator; import org.apache.sysml.runtime.matrix.operators.ScalarOperator; import org.apache.sysml.runtime.matrix.operators.UnaryOperator; +import org.apache.sysml.runtime.util.CommonThreadPool; import org.apache.sysml.runtime.util.IndexRange; import org.apache.sysml.runtime.util.SortUtils; @@ -439,7 +439,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable throws DMLRuntimeException { try { - ExecutorService pool = Executors.newFixedThreadPool( k ); + ExecutorService pool = CommonThreadPool.get(k); ArrayList<SizeEstimTask> tasks = new ArrayList<>(); for( int col=0; col<clen; col++ ) tasks.add(new SizeEstimTask(estim, col)); @@ -468,7 +468,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable throws DMLRuntimeException { try { - ExecutorService pool = Executors.newFixedThreadPool( k ); 
+ ExecutorService pool = CommonThreadPool.get(k); ArrayList<CompressTask> tasks = new ArrayList<>(); for( int[] colIndexes : groups ) tasks.add(new CompressTask(in, estim, compRatios, rlen, colIndexes, denseEst)); @@ -673,7 +673,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable //multi-threaded decompression try { - ExecutorService pool = Executors.newFixedThreadPool( k ); + ExecutorService pool = CommonThreadPool.get(k); int rlen = getNumRows(); int blklen = BitmapEncoder.getAlignedBlocksize( (int)(Math.ceil((double)rlen/k))); @@ -1252,7 +1252,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable if( uc != null ) uc.unaryAggregateOperations(op, ret); //compute all compressed column groups - ExecutorService pool = Executors.newFixedThreadPool( op.getNumThreads() ); + ExecutorService pool = CommonThreadPool.get(op.getNumThreads()); ArrayList<UnaryAggregateTask> tasks = new ArrayList<>(); if( op.indexFn instanceof ReduceCol && grpParts.length > 0 ) { int blklen = BitmapEncoder.getAlignedBlocksize( @@ -1435,7 +1435,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable if( !isEmptyBlock(false) ) { //compute matrix mult try { - ExecutorService pool = Executors.newFixedThreadPool( k ); + ExecutorService pool = CommonThreadPool.get(k); ArrayList<MatrixMultTransposeTask> tasks = new ArrayList<>(); int numgrp = _colGroups.size(); int blklen = (int)(Math.ceil((double)numgrp/(2*k))); @@ -1507,7 +1507,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable uc.rightMultByVector(vector, result, k); //compute remaining compressed column groups in parallel - ExecutorService pool = Executors.newFixedThreadPool( k ); + ExecutorService pool = CommonThreadPool.get(k); int rlen = getNumRows(); int blklen = BitmapEncoder.getAlignedBlocksize( (int)(Math.ceil((double)rlen/k))); @@ -1654,7 +1654,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable uc.leftMultByRowVector(vector, result, k); //compute remaining compressed column groups in parallel - ExecutorService pool = Executors.newFixedThreadPool( Math.min(colGroups.size()-((uc!=null)?1:0), k) ); + ExecutorService pool = CommonThreadPool.get( Math.min(colGroups.size()-((uc!=null)?1:0), k) ); ArrayList<ColGroup>[] grpParts = createStaticTaskPartitioning(4*k, false); ArrayList<LeftMatrixMultTask> tasks = new ArrayList<>(); for( ArrayList<ColGroup> groups : grpParts ) http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/compress/cocode/PlanningCoCoder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/cocode/PlanningCoCoder.java b/src/main/java/org/apache/sysml/runtime/compress/cocode/PlanningCoCoder.java index 39a44fb..e00c734 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/cocode/PlanningCoCoder.java +++ b/src/main/java/org/apache/sysml/runtime/compress/cocode/PlanningCoCoder.java @@ -25,7 +25,6 @@ import java.util.HashMap; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.commons.logging.Log; @@ -33,6 +32,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator; import org.apache.sysml.runtime.compress.estim.CompressedSizeInfo; +import org.apache.sysml.runtime.util.CommonThreadPool; public class PlanningCoCoder { @@ -96,7 +96,7 @@ public class PlanningCoCoder { List<int[]> retGroups = new ArrayList<>(); try { - ExecutorService pool = Executors.newFixedThreadPool( k ); + ExecutorService pool = CommonThreadPool.get(k); ArrayList<CocodeTask> tasks = new ArrayList<>(); for (List<Integer> bin : bins) { // building an array of 
singleton CoCodingGroup http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java index 6d162cc..b7e635b 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java @@ -24,7 +24,6 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.hadoop.fs.FileSystem; @@ -33,6 +32,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.util.CommonThreadPool; /** @@ -50,7 +50,7 @@ public class FrameReaderBinaryBlockParallel extends FrameReaderBinaryBlock try { //create read tasks for all files - ExecutorService pool = Executors.newFixedThreadPool(numThreads); + ExecutorService pool = CommonThreadPool.get(numThreads); ArrayList<ReadFileTask> tasks = new ArrayList<>(); for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) ) tasks.add(new ReadFileTask(lpath, job, fs, dest)); http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java index 3819962..a94d5ef 100644 --- 
a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java @@ -24,7 +24,6 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.hadoop.fs.FileSystem; @@ -42,6 +41,7 @@ import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.Pair; import org.apache.sysml.runtime.transform.TfUtils; +import org.apache.sysml.runtime.util.CommonThreadPool; /** * Multi-threaded frame text csv reader. @@ -67,7 +67,7 @@ public class FrameReaderTextCSVParallel extends FrameReaderTextCSV try { - ExecutorService pool = Executors.newFixedThreadPool( + ExecutorService pool = CommonThreadPool.get( Math.min(numThreads, splits.length)); //compute num rows per split @@ -115,7 +115,7 @@ public class FrameReaderTextCSVParallel extends FrameReaderTextCSV //compute number of rows int nrow = 0; - ExecutorService pool = Executors.newFixedThreadPool(numThreads); + ExecutorService pool = CommonThreadPool.get(numThreads); try { ArrayList<CountRowsTask> tasks = new ArrayList<>(); for( int i=0; i<splits.length; i++ ) http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCellParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCellParallel.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCellParallel.java index 7e88030..70e3651 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCellParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCellParallel.java @@ -24,7 +24,6 @@ import java.util.ArrayList; import 
java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.hadoop.fs.FileSystem; @@ -36,6 +35,7 @@ import org.apache.hadoop.mapred.TextInputFormat; import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.util.CommonThreadPool; /** * Multi-threaded frame textcell reader. @@ -58,7 +58,7 @@ public class FrameReaderTextCellParallel extends FrameReaderTextCell try { //create read tasks for all splits - ExecutorService pool = Executors.newFixedThreadPool(numThreads); + ExecutorService pool = CommonThreadPool.get(numThreads); InputSplit[] splits = informat.getSplits(job, numThreads); ArrayList<ReadTask> tasks = new ArrayList<>(); for( InputSplit split : splits ) http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java index 763d8a8..e9fc039 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java @@ -24,7 +24,6 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.hadoop.fs.FileSystem; @@ -37,6 +36,7 @@ import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; import 
org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.util.CommonThreadPool; import org.apache.sysml.runtime.util.MapReduceTool; @@ -72,7 +72,7 @@ public class FrameWriterBinaryBlockParallel extends FrameWriterBinaryBlock //create and execute write tasks try { - ExecutorService pool = Executors.newFixedThreadPool(numThreads); + ExecutorService pool = CommonThreadPool.get(numThreads); ArrayList<WriteFileTask> tasks = new ArrayList<>(); int blklen = (int)Math.ceil((double)rlen / blen / numThreads) * blen; for(int i=0; i<numThreads & i*blklen<rlen; i++) { http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java index 4c2f39d..460e41c 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java @@ -24,7 +24,6 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.hadoop.fs.FileSystem; @@ -37,6 +36,7 @@ import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyze import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.OutputInfo; +import org.apache.sysml.runtime.util.CommonThreadPool; import org.apache.sysml.runtime.util.MapReduceTool; /** @@ -74,7 +74,7 @@ public class FrameWriterTextCSVParallel extends FrameWriterTextCSV //create and execute tasks try { - ExecutorService pool = Executors.newFixedThreadPool(numThreads); + 
ExecutorService pool = CommonThreadPool.get(numThreads); ArrayList<WriteFileTask> tasks = new ArrayList<>(); int blklen = (int)Math.ceil((double)rlen / numThreads); for(int i=0; i<numThreads & i*blklen<rlen; i++) { http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java index ba55454..bde3cd4 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java @@ -24,7 +24,6 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.hadoop.fs.FileSystem; @@ -36,6 +35,7 @@ import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.OutputInfo; +import org.apache.sysml.runtime.util.CommonThreadPool; import org.apache.sysml.runtime.util.MapReduceTool; /** @@ -69,7 +69,7 @@ public class FrameWriterTextCellParallel extends FrameWriterTextCell //create and execute tasks try { - ExecutorService pool = Executors.newFixedThreadPool(numThreads); + ExecutorService pool = CommonThreadPool.get(numThreads); ArrayList<WriteFileTask> tasks = new ArrayList<>(); int blklen = (int)Math.ceil((double)rlen / numThreads); for(int i=0; i<numThreads & i*blklen<rlen; i++) { http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java 
---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java index 15d8a70..70e6e21 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java @@ -24,7 +24,6 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.hadoop.fs.FileSystem; @@ -39,6 +38,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.data.SparseBlock; import org.apache.sysml.runtime.matrix.data.SparseBlockMCSR; import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; +import org.apache.sysml.runtime.util.CommonThreadPool; public class ReaderBinaryBlockParallel extends ReaderBinaryBlock @@ -91,7 +91,7 @@ public class ReaderBinaryBlockParallel extends ReaderBinaryBlock try { //create read tasks for all files - ExecutorService pool = Executors.newFixedThreadPool(_numThreads); + ExecutorService pool = CommonThreadPool.get(_numThreads); ArrayList<ReadFileTask> tasks = new ArrayList<>(); for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) ){ ReadFileTask t = new ReadFileTask(lpath, job, dest, rlen, clen, brlen, bclen, syncBlock); http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java index 1fbc341..440992a 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java +++ 
b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java @@ -24,7 +24,6 @@ import java.io.InputStream; import java.util.ArrayList; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileSystem; @@ -44,6 +43,7 @@ import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; import org.apache.sysml.runtime.matrix.data.DenseBlock; import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.util.CommonThreadPool; /** * Parallel version of ReaderTextCSV.java. To summarize, we do two passes in @@ -131,7 +131,7 @@ public class ReaderTextCSVParallel extends MatrixReader TextInputFormat informat = new TextInputFormat(); informat.configure(job); - ExecutorService pool = Executors.newFixedThreadPool(_numThreads); + ExecutorService pool = CommonThreadPool.get(_numThreads); try { @@ -190,7 +190,7 @@ public class ReaderTextCSVParallel extends MatrixReader // count rows in parallel per split try { - ExecutorService pool = Executors.newFixedThreadPool(_numThreads); + ExecutorService pool = CommonThreadPool.get(_numThreads); ArrayList<CountRowsTask> tasks = new ArrayList<>(); for (InputSplit split : splits) { tasks.add(new CountRowsTask(split, informat, job, hasHeader)); http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java index 6087210..040b43c 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java @@ -25,7 +25,6 @@ import 
java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.hadoop.fs.FileSystem; @@ -44,6 +43,7 @@ import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.matrix.data.DenseBlock; import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.util.CommonThreadPool; import org.apache.sysml.runtime.util.FastStringTokenizer; import org.apache.sysml.runtime.util.MapReduceTool; @@ -131,7 +131,7 @@ public class ReaderTextCellParallel extends MatrixReader try { //create read tasks for all splits - ExecutorService pool = Executors.newFixedThreadPool(par); + ExecutorService pool = CommonThreadPool.get(par); InputSplit[] splits = informat.getSplits(job, par); ArrayList<ReadTask> tasks = new ArrayList<>(); for( InputSplit split : splits ){ http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java index 7f83abe..028c8d1 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java @@ -24,7 +24,6 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.hadoop.fs.FileSystem; @@ -36,6 +35,7 @@ import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.runtime.DMLRuntimeException; import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.util.CommonThreadPool; import org.apache.sysml.runtime.util.MapReduceTool; public class WriterBinaryBlockParallel extends WriterBinaryBlock @@ -69,7 +69,7 @@ public class WriterBinaryBlockParallel extends WriterBinaryBlock //create and execute write tasks try { - ExecutorService pool = Executors.newFixedThreadPool(numThreads); + ExecutorService pool = CommonThreadPool.get(numThreads); ArrayList<WriteFileTask> tasks = new ArrayList<>(); int blklen = (int)Math.ceil((double)rlen / brlen / numThreads) * brlen; for(int i=0; i<numThreads & i*blklen<rlen; i++) { http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java index b8f9815..cc377d4 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java @@ -24,7 +24,6 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.hadoop.fs.FileSystem; @@ -36,6 +35,7 @@ import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.OutputInfo; +import org.apache.sysml.runtime.util.CommonThreadPool; import org.apache.sysml.runtime.util.MapReduceTool; public class WriterMatrixMarketParallel extends WriterMatrixMarket @@ -67,7 
+67,7 @@ public class WriterMatrixMarketParallel extends WriterMatrixMarket //create and execute tasks try { - ExecutorService pool = Executors.newFixedThreadPool(numThreads); + ExecutorService pool = CommonThreadPool.get(numThreads); ArrayList<WriteMMTask> tasks = new ArrayList<>(); int blklen = (int)Math.ceil((double)rlen / numThreads); for(int i=0; i<numThreads & i*blklen<rlen; i++) { http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java index 2c56220..210b349 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java @@ -24,7 +24,6 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.hadoop.fs.FileSystem; @@ -37,6 +36,7 @@ import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyze import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.OutputInfo; +import org.apache.sysml.runtime.util.CommonThreadPool; import org.apache.sysml.runtime.util.MapReduceTool; public class WriterTextCSVParallel extends WriterTextCSV @@ -70,7 +70,7 @@ public class WriterTextCSVParallel extends WriterTextCSV //create and execute tasks try { - ExecutorService pool = Executors.newFixedThreadPool(numThreads); + ExecutorService pool = CommonThreadPool.get(numThreads); ArrayList<WriteCSVTask> tasks = new ArrayList<>(); int rlen = src.getNumRows(); int blklen = (int)Math.ceil((double)rlen / 
numThreads); http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java b/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java index 63f36c2..7e51bc9 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java @@ -24,7 +24,6 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.hadoop.fs.FileSystem; @@ -36,6 +35,7 @@ import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.OutputInfo; +import org.apache.sysml.runtime.util.CommonThreadPool; import org.apache.sysml.runtime.util.MapReduceTool; public class WriterTextCellParallel extends WriterTextCell @@ -66,7 +66,7 @@ public class WriterTextCellParallel extends WriterTextCell //create and execute tasks try { - ExecutorService pool = Executors.newFixedThreadPool(numThreads); + ExecutorService pool = CommonThreadPool.get(numThreads); ArrayList<WriteTextTask> tasks = new ArrayList<>(); int blklen = (int)Math.ceil((double)rlen / numThreads); for(int i=0; i<numThreads & i*blklen<rlen; i++) { http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java 
index 7fa14bd..85d5654 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java @@ -25,7 +25,6 @@ import java.util.Comparator; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.sysml.lops.PartialAggregate.CorrectionLocationType; @@ -55,6 +54,7 @@ import org.apache.sysml.runtime.matrix.operators.CMOperator; import org.apache.sysml.runtime.matrix.operators.CMOperator.AggregateOperationTypes; import org.apache.sysml.runtime.matrix.operators.Operator; import org.apache.sysml.runtime.matrix.operators.UnaryOperator; +import org.apache.sysml.runtime.util.CommonThreadPool; import org.apache.sysml.runtime.util.DataConverter; import org.apache.sysml.runtime.util.UtilFunctions; @@ -249,7 +249,7 @@ public class LibMatrixAgg //core multi-threaded unary aggregate computation //(currently: always parallelization over number of rows) try { - ExecutorService pool = Executors.newFixedThreadPool( k ); + ExecutorService pool = CommonThreadPool.get(k); ArrayList<AggTask> tasks = new ArrayList<>(); int blklen = (int)(Math.ceil((double)m/k)); for( int i=0; i<k & i*blklen<m; i++ ) { @@ -343,7 +343,7 @@ public class LibMatrixAgg //core multi-threaded unary aggregate computation //(currently: always parallelization over number of rows) try { - ExecutorService pool = Executors.newFixedThreadPool( k ); + ExecutorService pool = CommonThreadPool.get(k); int blklen = (int)(Math.ceil((double)m/k)); //step 1: compute aggregates per row partition @@ -438,7 +438,7 @@ public class LibMatrixAgg //Timing time = new Timing(true); try { - ExecutorService pool = Executors.newFixedThreadPool( k ); + ExecutorService pool = CommonThreadPool.get(k); ArrayList<AggTernaryTask> tasks = new ArrayList<>(); int blklen = (int)(Math.ceil((double)in1.rlen/k)); 
IndexFunction ixFn = op.indexFn; @@ -516,7 +516,7 @@ public class LibMatrixAgg //core multi-threaded grouped aggregate computation //(currently: parallelization over columns to avoid additional memory requirements) try { - ExecutorService pool = Executors.newFixedThreadPool( k ); + ExecutorService pool = CommonThreadPool.get(k); ArrayList<GrpAggTask> tasks = new ArrayList<>(); int blklen = (int)(Math.ceil((double)target.clen/k)); for( int i=0; i<k & i*blklen<target.clen; i++ ) http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDNN.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDNN.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDNN.java index d089521..4cb154b 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDNN.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDNN.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; @@ -31,6 +30,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.sysml.api.DMLScript; import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.util.CommonThreadPool; import org.apache.sysml.runtime.util.ConvolutionUtils; /* @@ -437,7 +437,7 @@ public class LibMatrixDNN { } } else { - ExecutorService pool = Executors.newFixedThreadPool( Math.min(k, params.N) ); + ExecutorService pool = CommonThreadPool.get( Math.min(k, params.N) ); List<Future<Long>> taskret = pool.invokeAll(tasks); pool.shutdown(); for( Future<Long> task : taskret ) 
http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java index 6df49f2..3e77a72 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.stream.LongStream; @@ -35,6 +34,7 @@ import org.apache.commons.math3.random.Well1024a; import org.apache.sysml.hops.DataGenOp; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence; +import org.apache.sysml.runtime.util.CommonThreadPool; import org.apache.sysml.runtime.util.NormalPRNGenerator; import org.apache.sysml.runtime.util.PRNGenerator; import org.apache.sysml.runtime.util.PoissonPRNGenerator; @@ -322,7 +322,7 @@ public class LibMatrixDatagen long[] seeds = generateSeedsForCP(bigrand, nrb, ncb); long nnz = 0; try { - ExecutorService pool = Executors.newFixedThreadPool(k); + ExecutorService pool = CommonThreadPool.get(k); ArrayList<RandTask> tasks = new ArrayList<>(); int blklen = ((int)(Math.ceil((double)parnb/k))); for( int i=0; i<k & i*blklen<parnb; i++ ) { http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java index 9d52f00..214c8eb 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.commons.math3.util.FastMath; @@ -40,6 +39,7 @@ import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.functionobjects.SwapIndex; import org.apache.sysml.runtime.functionobjects.ValueFunction; import org.apache.sysml.runtime.matrix.operators.ReorgOperator; +import org.apache.sysml.runtime.util.CommonThreadPool; import org.apache.sysml.runtime.util.UtilFunctions; /** @@ -205,7 +205,7 @@ public class LibMatrixMult //core multi-threaded matrix mult computation //(currently: always parallelization over number of rows) try { - ExecutorService pool = Executors.newFixedThreadPool( k ); + ExecutorService pool = CommonThreadPool.get(k); ArrayList<MatrixMultTask> tasks = new ArrayList<>(); ArrayList<Integer> blklens = getBalancedBlockSizesDefault(num, k, (pm2r||pm2c)); for( int i=0, lb=0; i<blklens.size(); lb+=blklens.get(i), i++ ) @@ -317,7 +317,7 @@ public class LibMatrixMult //core matrix mult chain computation //(currently: always parallelization over number of rows) try { - ExecutorService pool = Executors.newFixedThreadPool( k ); + ExecutorService pool = CommonThreadPool.get(k); ArrayList<Integer> blklens = getBalancedBlockSizesDefault(mX.rlen, k, true); ArrayList<MatrixMultChainTask> tasks = new ArrayList<>(); for( int i=0, lb=0; i<blklens.size(); lb+=blklens.get(i), i++ ) @@ -398,7 +398,7 @@ public class LibMatrixMult //core multi-threaded matrix mult computation try { - ExecutorService pool = Executors.newFixedThreadPool( k ); 
+ ExecutorService pool = CommonThreadPool.get(k); ArrayList<MatrixMultTransposeTask> tasks = new ArrayList<>(); //load balance via #tasks=2k due to triangular shape int blklen = (int)(Math.ceil((double)ret.rlen/(2*k))); @@ -478,7 +478,7 @@ public class LibMatrixMult try { - ExecutorService pool = Executors.newFixedThreadPool(k); + ExecutorService pool = CommonThreadPool.get(k); ArrayList<MatrixMultPermuteTask> tasks = new ArrayList<>(); int blklen = (int)(Math.ceil((double)pm1.rlen/k)); for( int i=0; i<k & i*blklen<pm1.rlen; i++ ) @@ -553,7 +553,7 @@ public class LibMatrixMult try { - ExecutorService pool = Executors.newFixedThreadPool(k); + ExecutorService pool = CommonThreadPool.get(k); ArrayList<MatrixMultWSLossTask> tasks = new ArrayList<>(); int blklen = (int)(Math.ceil((double)mX.rlen/k)); for( int i=0; i<k & i*blklen<mX.rlen; i++ ) @@ -629,7 +629,7 @@ public class LibMatrixMult try { - ExecutorService pool = Executors.newFixedThreadPool(k); + ExecutorService pool = CommonThreadPool.get(k); ArrayList<MatrixMultWSigmoidTask> tasks = new ArrayList<>(); int blklen = (int)(Math.ceil((double)mW.rlen/k)); for( int i=0; i<k & i*blklen<mW.rlen; i++ ) @@ -744,7 +744,7 @@ public class LibMatrixMult try { - ExecutorService pool = Executors.newFixedThreadPool(k); + ExecutorService pool = CommonThreadPool.get(k); ArrayList<MatrixMultWDivTask> tasks = new ArrayList<>(); //create tasks (for wdivmm-left, parallelization over columns; //for wdivmm-right, parallelization over rows; both ensure disjoint results) @@ -821,7 +821,7 @@ public class LibMatrixMult try { - ExecutorService pool = Executors.newFixedThreadPool(k); + ExecutorService pool = CommonThreadPool.get(k); ArrayList<MatrixMultWCeTask> tasks = new ArrayList<>(); int blklen = (int)(Math.ceil((double)mW.rlen/k)); for( int i=0; i<k & i*blklen<mW.rlen; i++ ) @@ -893,7 +893,7 @@ public class LibMatrixMult try { - ExecutorService pool = Executors.newFixedThreadPool(k); + ExecutorService pool = CommonThreadPool.get(k); 
ArrayList<MatrixMultWuTask> tasks = new ArrayList<>(); int blklen = (int)(Math.ceil((double)mW.rlen/k)); for( int i=0; i<k & i*blklen<mW.rlen; i++ ) http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java index 145a474..c8fedaf 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java @@ -30,7 +30,6 @@ import java.util.List; import java.util.Map.Entry; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.sysml.runtime.DMLRuntimeException; @@ -42,6 +41,7 @@ import org.apache.sysml.runtime.functionobjects.SwapIndex; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue; import org.apache.sysml.runtime.matrix.operators.ReorgOperator; +import org.apache.sysml.runtime.util.CommonThreadPool; import org.apache.sysml.runtime.util.DataConverter; import org.apache.sysml.runtime.util.SortUtils; import org.apache.sysml.runtime.util.UtilFunctions; @@ -185,7 +185,7 @@ public class LibMatrixReorg //core multi-threaded transpose try { - ExecutorService pool = Executors.newFixedThreadPool( k ); + ExecutorService pool = CommonThreadPool.get(k); //pre-processing (compute nnz per column once for sparse) int[] cnt = null; if( in.sparse && out.sparse ) { @@ -2051,7 +2051,7 @@ public class LibMatrixReorg } else { try { - ExecutorService pool = Executors.newFixedThreadPool( k ); + ExecutorService pool = CommonThreadPool.get(k); ArrayList<RExpandColsTask> tasks = new ArrayList<>(); int blklen = 
(int)(Math.ceil((double)rlen/k/8)); for( int i=0; i<8*k & i*blklen<rlen; i++ ) http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/util/CommonThreadPool.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/CommonThreadPool.java b/src/main/java/org/apache/sysml/runtime/util/CommonThreadPool.java new file mode 100644 index 0000000..957ec0b --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/util/CommonThreadPool.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.runtime.util; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; + +/** + * This common thread pool provides an abstraction to obtain a shared + * thread pool, specifically the ForkJoinPool.commonPool, for all requests + * of the maximum degree of parallelism. If pools of different size are + * requested, we create new pool instances of FixedThreadPool. + * + * All ExecutorService methods delegate to the wrapped pool. For the shared + * pool, shutdown() and shutdownNow() are intentionally no-ops so that the + * pool remains usable by other (potentially concurrent) callers; the status + * methods (isShutdown, isTerminated, awaitTermination) then report the state + * of the shared pool itself. + */ +public class CommonThreadPool implements ExecutorService +{ + //shared thread pool used system-wide, potentially by concurrent parfor workers + //we use the ForkJoinPool.commonPool() to avoid explicit cleanup, including + //unnecessary initialization (e.g., problematic in jmlc) and because this commonPool + //resulted in better performance than a dedicated fixed thread pool. + private static final int size = InfrastructureAnalyzer.getLocalParallelism(); + private static final ExecutorService shared = ForkJoinPool.commonPool(); + private final ExecutorService _pool; + + public CommonThreadPool(ExecutorService pool) { + _pool = pool; + } + + /** + * Obtains a thread pool for k-way parallel execution. 
+ * + * @param k requested degree of parallelism + * @return a wrapper around the shared pool if k equals the local + * parallelism (InfrastructureAnalyzer.getLocalParallelism()), otherwise + * a wrapper around a new fixed thread pool of size k; callers should + * always call shutdown(), which is a no-op for the shared pool + */ + public static ExecutorService get(int k) { + return new CommonThreadPool( (size==k) ? + shared : Executors.newFixedThreadPool(k)); + } + + /** + * Requests shutdown of the shared pool. Note that, per the ForkJoinPool + * documentation, shutdownNow() has no effect on ForkJoinPool.commonPool(). + */ + public static void shutdownShared() { + shared.shutdownNow(); + } + + @Override + public void shutdown() { + //never shut down the shared pool, which is reused across calls + if( _pool != shared ) + _pool.shutdown(); + } + + /** + * Attempts to stop all executing tasks of a private (non-shared) pool. + * + * @return the tasks that never commenced execution; an empty list for + * the shared pool, which is never shut down through this wrapper + */ + @Override + public List<Runnable> shutdownNow() { + //return an empty list instead of null (ExecutorService contract) + if( _pool == shared ) + return Collections.emptyList(); + return _pool.shutdownNow(); + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<?
extends Callable<T>> tasks) throws InterruptedException { + return _pool.invokeAll(tasks); + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) + throws InterruptedException { + return _pool.invokeAll(tasks, timeout, unit); + } + + @Override + public void execute(Runnable command) { + _pool.execute(command); + } + + @Override + public <T> Future<T> submit(Callable<T> task) { + return _pool.submit(task); + } + + @Override + public <T> Future<T> submit(Runnable task, T result) { + return _pool.submit(task, result); + } + + @Override + public Future<?> submit(Runnable task) { + return _pool.submit(task); + } + + + //lifecycle and invokeAny methods simply delegate to the wrapped pool + //(for the shared pool, shutdown() is a no-op, hence these methods report + //the state of ForkJoinPool.commonPool(), which is never shut down) + + @Override + public boolean isShutdown() { + return _pool.isShutdown(); + } + + @Override + public boolean isTerminated() { + return _pool.isTerminated(); + } + + /** + * Blocks until the wrapped pool terminates or the timeout elapses. For the + * shared ForkJoinPool.commonPool(), this waits for quiescence and returns + * false, as documented by ForkJoinPool.awaitTermination. + */ + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return _pool.awaitTermination(timeout, unit); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { + return _pool.invokeAny(tasks); + } + + @Override + public <T> T invokeAny(Collection<?
extends Callable<T>> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return _pool.invokeAny(tasks, timeout, unit); + } +} http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java index 993c9b2..e0ac955 100644 --- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java +++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java @@ -38,7 +38,7 @@ public class UtilFunctions //for accurate cast of double values to int and long //IEEE754: binary64 (double precision) eps = 2^(-53) = 1.11 * 10^(-16) //(same epsilon as used for matrix index cast in R) - public static double DOUBLE_EPS = Math.pow(2, -53); + public static final double DOUBLE_EPS = Math.pow(2, -53); //prime numbers for old hash function (divide prime close to max int, //because it determines the max hash domain size
