Repository: systemml
Updated Branches:
  refs/heads/master 5b2815206 -> 1c0efe31a


[SYSTEMML-2193,2194] Common thread pool for multi-threaded operations

So far, all our multi-threaded operations and readers created fixed
thread pools per operation, guarded by minimum parallelization
thresholds. However, on machines with large numbers of cores and small
intermediates (e.g., mini-batch algorithms), this caused a large serial
fraction, which severely limited the speedup.

This patch addresses this issue by establishing a common thread pool
abstraction, which transparently uses a shared pool whenever the maximum
degree of parallelism is requested. In order to avoid cleanup issues and
for performance reasons, we use the ForkJoinPool.commonPool (as used in
all parallel streams) as a shared pool.

On a machine with 80 virtual cores (Xeon Gold 6138), this patch improved
the runtime of 10K dense matrix-vector multiplications of size 5K x 1K
from 76.9s to 3.6s (9.2s with a shared fixed thread pool), which
corresponds to a sustained bandwidth of 98GB/s despite the overhead and
barriers of 10K operations.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/967b7316
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/967b7316
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/967b7316

Branch: refs/heads/master
Commit: 967b731644f6f31da2762bd5e4bfab9a7b619c28
Parents: 5b28152
Author: Matthias Boehm <[email protected]>
Authored: Wed Mar 21 00:12:04 2018 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Wed Mar 21 23:41:02 2018 -0700

----------------------------------------------------------------------
 .../sysml/runtime/codegen/SpoofCellwise.java    |   6 +-
 .../runtime/codegen/SpoofMultiAggregate.java    |   4 +-
 .../runtime/codegen/SpoofOuterProduct.java      |   6 +-
 .../sysml/runtime/codegen/SpoofRowwise.java     |   4 +-
 .../runtime/compress/CompressedMatrixBlock.java |  16 +--
 .../compress/cocode/PlanningCoCoder.java        |   4 +-
 .../io/FrameReaderBinaryBlockParallel.java      |   4 +-
 .../runtime/io/FrameReaderTextCSVParallel.java  |   6 +-
 .../runtime/io/FrameReaderTextCellParallel.java |   4 +-
 .../io/FrameWriterBinaryBlockParallel.java      |   4 +-
 .../runtime/io/FrameWriterTextCSVParallel.java  |   4 +-
 .../runtime/io/FrameWriterTextCellParallel.java |   4 +-
 .../runtime/io/ReaderBinaryBlockParallel.java   |   4 +-
 .../sysml/runtime/io/ReaderTextCSVParallel.java |   6 +-
 .../runtime/io/ReaderTextCellParallel.java      |   4 +-
 .../runtime/io/WriterBinaryBlockParallel.java   |   4 +-
 .../runtime/io/WriterMatrixMarketParallel.java  |   4 +-
 .../sysml/runtime/io/WriterTextCSVParallel.java |   4 +-
 .../runtime/io/WriterTextCellParallel.java      |   4 +-
 .../sysml/runtime/matrix/data/LibMatrixAgg.java |  10 +-
 .../sysml/runtime/matrix/data/LibMatrixDNN.java |   4 +-
 .../runtime/matrix/data/LibMatrixDatagen.java   |   4 +-
 .../runtime/matrix/data/LibMatrixMult.java      |  20 +--
 .../runtime/matrix/data/LibMatrixReorg.java     |   6 +-
 .../sysml/runtime/util/CommonThreadPool.java    | 123 +++++++++++++++++++
 .../sysml/runtime/util/UtilFunctions.java       |   2 +-
 26 files changed, 194 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java 
b/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java
index b7024ed..ea070b3 100644
--- a/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java
+++ b/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java
@@ -27,7 +27,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.sysml.runtime.DMLRuntimeException;
@@ -49,6 +48,7 @@ import org.apache.sysml.runtime.matrix.data.IJV;
 import org.apache.sysml.runtime.matrix.data.LibMatrixMult;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.SparseBlock;
+import org.apache.sysml.runtime.util.CommonThreadPool;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
 public abstract class SpoofCellwise extends SpoofOperator implements 
Serializable
@@ -147,7 +147,7 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                else  //MULTI-THREADED
                {
                        try {
-                               ExecutorService pool = 
Executors.newFixedThreadPool( k );
+                               ExecutorService pool = CommonThreadPool.get(k);
                                ArrayList<ParAggTask> tasks = new ArrayList<>();
                                int nk = (a instanceof CompressedMatrixBlock) ? 
k :
                                        
UtilFunctions.roundToNext(Math.min(8*k,m/32), k);
@@ -243,7 +243,7 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                else  //MULTI-THREADED
                {
                        try {
-                               ExecutorService pool = 
Executors.newFixedThreadPool( k );
+                               ExecutorService pool = CommonThreadPool.get(k);
                                ArrayList<ParExecTask> tasks = new 
ArrayList<>();
                                int nk = 
UtilFunctions.roundToNext(Math.min(8*k,m/32), k);
                                int blklen = (int)(Math.ceil((double)m/nk));

http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/codegen/SpoofMultiAggregate.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/codegen/SpoofMultiAggregate.java 
b/src/main/java/org/apache/sysml/runtime/codegen/SpoofMultiAggregate.java
index 72f7764..3801285 100644
--- a/src/main/java/org/apache/sysml/runtime/codegen/SpoofMultiAggregate.java
+++ b/src/main/java/org/apache/sysml/runtime/codegen/SpoofMultiAggregate.java
@@ -26,7 +26,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.sysml.runtime.DMLRuntimeException;
@@ -44,6 +43,7 @@ import org.apache.sysml.runtime.matrix.data.DenseBlock;
 import org.apache.sysml.runtime.matrix.data.IJV;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.SparseBlock;
+import org.apache.sysml.runtime.util.CommonThreadPool;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
 public abstract class SpoofMultiAggregate extends SpoofOperator implements 
Serializable
@@ -117,7 +117,7 @@ public abstract class SpoofMultiAggregate extends 
SpoofOperator implements Seria
                else  //MULTI-THREADED
                {
                        try {
-                               ExecutorService pool = 
Executors.newFixedThreadPool( k );
+                               ExecutorService pool = CommonThreadPool.get(k);
                                ArrayList<ParAggTask> tasks = new ArrayList<>();
                                int nk = 
UtilFunctions.roundToNext(Math.min(8*k,m/32), k);
                                int blklen = (int)(Math.ceil((double)m/nk));

http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/codegen/SpoofOuterProduct.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/codegen/SpoofOuterProduct.java 
b/src/main/java/org/apache/sysml/runtime/codegen/SpoofOuterProduct.java
index e6e93a8..1ec98ce 100644
--- a/src/main/java/org/apache/sysml/runtime/codegen/SpoofOuterProduct.java
+++ b/src/main/java/org/apache/sysml/runtime/codegen/SpoofOuterProduct.java
@@ -25,7 +25,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.sysml.hops.OptimizerUtils;
@@ -37,6 +36,7 @@ import org.apache.sysml.runtime.matrix.data.DenseBlock;
 import org.apache.sysml.runtime.matrix.data.IJV;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.SparseBlock;
+import org.apache.sysml.runtime.util.CommonThreadPool;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
 public abstract class SpoofOuterProduct extends SpoofOperator
@@ -131,7 +131,7 @@ public abstract class SpoofOuterProduct extends 
SpoofOperator
                
                try 
                {
-                       ExecutorService pool = Executors.newFixedThreadPool(k);
+                       ExecutorService pool = CommonThreadPool.get(k);
                        ArrayList<ParOuterProdAggTask> tasks = new 
ArrayList<>();
                        int numThreads2 = getPreferredNumberOfTasks(m, n, nnz, 
k, numThreads);
                        int blklen = (int)(Math.ceil((double)m/numThreads2));
@@ -282,7 +282,7 @@ public abstract class SpoofOuterProduct extends 
SpoofOperator
                
                try 
                {
-                       ExecutorService pool = 
Executors.newFixedThreadPool(numThreads);
+                       ExecutorService pool = CommonThreadPool.get(numThreads);
                        ArrayList<ParExecTask> tasks = new ArrayList<>();
                        //create tasks (for wdivmm-left, parallelization over 
columns;
                        //for wdivmm-right, parallelization over rows; both 
ensure disjoint results)

http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java 
b/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java
index f10f857..7f308e6 100644
--- a/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java
+++ b/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java
@@ -24,7 +24,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.stream.IntStream;
 
@@ -41,6 +40,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.SparseBlock;
 import org.apache.sysml.runtime.matrix.data.SparseRow;
 import org.apache.sysml.runtime.matrix.data.SparseRowVector;
+import org.apache.sysml.runtime.util.CommonThreadPool;
 
 
 public abstract class SpoofRowwise extends SpoofOperator
@@ -205,7 +205,7 @@ public abstract class SpoofRowwise extends SpoofOperator
                double[] scalars = prepInputScalars(scalarObjects);
                
                //core parallel execute
-               ExecutorService pool = Executors.newFixedThreadPool( k );
+               ExecutorService pool = CommonThreadPool.get(k);
                ArrayList<Integer> blklens = (a instanceof 
CompressedMatrixBlock) ?
                        LibMatrixMult.getAlignedBlockSizes(m, k, 
BitmapEncoder.BITMAP_BLOCK_SZ) :
                        LibMatrixMult.getBalancedBlockSizesDefault(m, k, 
(long)m*n<16*PAR_NUMCELL_THRESHOLD);

http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java 
b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
index 782bbd2..f264b48 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
@@ -34,7 +34,6 @@ import java.util.List;
 import java.util.PriorityQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
@@ -92,6 +91,7 @@ import 
org.apache.sysml.runtime.matrix.operators.QuaternaryOperator;
 import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
 import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
 import org.apache.sysml.runtime.matrix.operators.UnaryOperator;
+import org.apache.sysml.runtime.util.CommonThreadPool;
 import org.apache.sysml.runtime.util.IndexRange;
 import org.apache.sysml.runtime.util.SortUtils;
 
@@ -439,7 +439,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                throws DMLRuntimeException 
        {       
                try {
-                       ExecutorService pool = Executors.newFixedThreadPool( k 
);
+                       ExecutorService pool = CommonThreadPool.get(k);
                        ArrayList<SizeEstimTask> tasks = new ArrayList<>();
                        for( int col=0; col<clen; col++ )
                                tasks.add(new SizeEstimTask(estim, col));
@@ -468,7 +468,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                throws DMLRuntimeException
        {
                try {
-                       ExecutorService pool = Executors.newFixedThreadPool( k 
);
+                       ExecutorService pool = CommonThreadPool.get(k);
                        ArrayList<CompressTask> tasks = new ArrayList<>();
                        for( int[] colIndexes : groups )
                                tasks.add(new CompressTask(in, estim, 
compRatios, rlen, colIndexes, denseEst));
@@ -673,7 +673,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                
                //multi-threaded decompression
                try {
-                       ExecutorService pool = Executors.newFixedThreadPool( k 
);
+                       ExecutorService pool = CommonThreadPool.get(k);
                        int rlen = getNumRows();
                        int blklen = BitmapEncoder.getAlignedBlocksize(
                                (int)(Math.ceil((double)rlen/k)));
@@ -1252,7 +1252,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                                if( uc != null )
                                        uc.unaryAggregateOperations(op, ret);
                                //compute all compressed column groups
-                               ExecutorService pool = 
Executors.newFixedThreadPool( op.getNumThreads() );
+                               ExecutorService pool = 
CommonThreadPool.get(op.getNumThreads());
                                ArrayList<UnaryAggregateTask> tasks = new 
ArrayList<>();
                                if( op.indexFn instanceof ReduceCol && 
grpParts.length > 0 ) {
                                        int blklen = 
BitmapEncoder.getAlignedBlocksize(
@@ -1435,7 +1435,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                if( !isEmptyBlock(false) ) {
                        //compute matrix mult
                        try {
-                               ExecutorService pool = 
Executors.newFixedThreadPool( k );
+                               ExecutorService pool = CommonThreadPool.get(k);
                                ArrayList<MatrixMultTransposeTask> tasks = new 
ArrayList<>();
                                int numgrp = _colGroups.size();
                                int blklen = 
(int)(Math.ceil((double)numgrp/(2*k)));
@@ -1507,7 +1507,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                                uc.rightMultByVector(vector, result, k);
                        
                        //compute remaining compressed column groups in parallel
-                       ExecutorService pool = Executors.newFixedThreadPool( k 
);
+                       ExecutorService pool = CommonThreadPool.get(k);
                        int rlen = getNumRows();
                        int blklen = BitmapEncoder.getAlignedBlocksize(
                                (int)(Math.ceil((double)rlen/k)));
@@ -1654,7 +1654,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                                uc.leftMultByRowVector(vector, result, k);
                        
                        //compute remaining compressed column groups in parallel
-                       ExecutorService pool = Executors.newFixedThreadPool( 
Math.min(colGroups.size()-((uc!=null)?1:0), k) );
+                       ExecutorService pool = CommonThreadPool.get( 
Math.min(colGroups.size()-((uc!=null)?1:0), k) );
                        ArrayList<ColGroup>[] grpParts = 
createStaticTaskPartitioning(4*k, false);
                        ArrayList<LeftMatrixMultTask> tasks = new ArrayList<>();
                        for( ArrayList<ColGroup> groups : grpParts )

http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/compress/cocode/PlanningCoCoder.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/cocode/PlanningCoCoder.java 
b/src/main/java/org/apache/sysml/runtime/compress/cocode/PlanningCoCoder.java
index 39a44fb..e00c734 100644
--- 
a/src/main/java/org/apache/sysml/runtime/compress/cocode/PlanningCoCoder.java
+++ 
b/src/main/java/org/apache/sysml/runtime/compress/cocode/PlanningCoCoder.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
@@ -33,6 +32,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator;
 import org.apache.sysml.runtime.compress.estim.CompressedSizeInfo;
+import org.apache.sysml.runtime.util.CommonThreadPool;
 
 public class PlanningCoCoder 
 {
@@ -96,7 +96,7 @@ public class PlanningCoCoder
        {
                List<int[]> retGroups = new ArrayList<>();
                try {
-                       ExecutorService pool = Executors.newFixedThreadPool( k 
);
+                       ExecutorService pool = CommonThreadPool.get(k);
                        ArrayList<CocodeTask> tasks = new ArrayList<>();
                        for (List<Integer> bin : bins) {
                                // building an array of singleton CoCodingGroup

http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java
index 6d162cc..b7e635b 100644
--- 
a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java
+++ 
b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -33,6 +32,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.util.CommonThreadPool;
 
 
 /**
@@ -50,7 +50,7 @@ public class FrameReaderBinaryBlockParallel extends 
FrameReaderBinaryBlock
                try 
                {
                        //create read tasks for all files
-                       ExecutorService pool = 
Executors.newFixedThreadPool(numThreads);
+                       ExecutorService pool = CommonThreadPool.get(numThreads);
                        ArrayList<ReadFileTask> tasks = new ArrayList<>();
                        for( Path lpath : 
IOUtilFunctions.getSequenceFilePaths(fs, path) )
                                tasks.add(new ReadFileTask(lpath, job, fs, 
dest));

http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java
index 3819962..a94d5ef 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -42,6 +41,7 @@ import 
org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.Pair;
 import org.apache.sysml.runtime.transform.TfUtils;
+import org.apache.sysml.runtime.util.CommonThreadPool;
 
 /**
  * Multi-threaded frame text csv reader.
@@ -67,7 +67,7 @@ public class FrameReaderTextCSVParallel extends 
FrameReaderTextCSV
 
                try 
                {
-                       ExecutorService pool = Executors.newFixedThreadPool(
+                       ExecutorService pool = CommonThreadPool.get(
                                Math.min(numThreads, splits.length));
                        
                        //compute num rows per split
@@ -115,7 +115,7 @@ public class FrameReaderTextCSVParallel extends 
FrameReaderTextCSV
                
                //compute number of rows
                int nrow = 0;
-               ExecutorService pool = Executors.newFixedThreadPool(numThreads);
+               ExecutorService pool = CommonThreadPool.get(numThreads);
                try {
                        ArrayList<CountRowsTask> tasks = new ArrayList<>();
                        for( int i=0; i<splits.length; i++ )

http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCellParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCellParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCellParallel.java
index 7e88030..70e3651 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCellParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCellParallel.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -36,6 +35,7 @@ import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.util.CommonThreadPool;
 
 /**
  * Multi-threaded frame textcell reader.
@@ -58,7 +58,7 @@ public class FrameReaderTextCellParallel extends 
FrameReaderTextCell
                try 
                {
                        //create read tasks for all splits
-                       ExecutorService pool = 
Executors.newFixedThreadPool(numThreads);
+                       ExecutorService pool = CommonThreadPool.get(numThreads);
                        InputSplit[] splits = informat.getSplits(job, 
numThreads);
                        ArrayList<ReadTask> tasks = new ArrayList<>();
                        for( InputSplit split : splits )

http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java
index 763d8a8..e9fc039 100644
--- 
a/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java
+++ 
b/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -37,6 +36,7 @@ import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.util.CommonThreadPool;
 import org.apache.sysml.runtime.util.MapReduceTool;
 
 
@@ -72,7 +72,7 @@ public class FrameWriterBinaryBlockParallel extends 
FrameWriterBinaryBlock
                //create and execute write tasks
                try 
                {
-                       ExecutorService pool = 
Executors.newFixedThreadPool(numThreads);
+                       ExecutorService pool = CommonThreadPool.get(numThreads);
                        ArrayList<WriteFileTask> tasks = new ArrayList<>();
                        int blklen = (int)Math.ceil((double)rlen / blen / 
numThreads) * blen;
                        for(int i=0; i<numThreads & i*blklen<rlen; i++) {

http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java
index 4c2f39d..460e41c 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -37,6 +36,7 @@ import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyze
 import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
+import org.apache.sysml.runtime.util.CommonThreadPool;
 import org.apache.sysml.runtime.util.MapReduceTool;
 
 /**
@@ -74,7 +74,7 @@ public class FrameWriterTextCSVParallel extends 
FrameWriterTextCSV
                //create and execute tasks
                try 
                {
-                       ExecutorService pool = 
Executors.newFixedThreadPool(numThreads);
+                       ExecutorService pool = CommonThreadPool.get(numThreads);
                        ArrayList<WriteFileTask> tasks = new ArrayList<>();
                        int blklen = (int)Math.ceil((double)rlen / numThreads);
                        for(int i=0; i<numThreads & i*blklen<rlen; i++) {

http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java
index ba55454..bde3cd4 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -36,6 +35,7 @@ import org.apache.sysml.hops.OptimizerUtils;
 import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
+import org.apache.sysml.runtime.util.CommonThreadPool;
 import org.apache.sysml.runtime.util.MapReduceTool;
 
 /**
@@ -69,7 +69,7 @@ public class FrameWriterTextCellParallel extends 
FrameWriterTextCell
                //create and execute tasks
                try 
                {
-                       ExecutorService pool = 
Executors.newFixedThreadPool(numThreads);
+                       ExecutorService pool = CommonThreadPool.get(numThreads);
                        ArrayList<WriteFileTask> tasks = new ArrayList<>();
                        int blklen = (int)Math.ceil((double)rlen / numThreads);
                        for(int i=0; i<numThreads & i*blklen<rlen; i++) {

http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
index 15d8a70..70e6e21 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -39,6 +38,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.SparseBlock;
 import org.apache.sysml.runtime.matrix.data.SparseBlockMCSR;
 import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
+import org.apache.sysml.runtime.util.CommonThreadPool;
 
 
 public class ReaderBinaryBlockParallel extends ReaderBinaryBlock 
@@ -91,7 +91,7 @@ public class ReaderBinaryBlockParallel extends 
ReaderBinaryBlock
                try 
                {
                        //create read tasks for all files
-                       ExecutorService pool = 
Executors.newFixedThreadPool(_numThreads);
+                       ExecutorService pool = 
CommonThreadPool.get(_numThreads);
                        ArrayList<ReadFileTask> tasks = new ArrayList<>();
                        for( Path lpath : 
IOUtilFunctions.getSequenceFilePaths(fs, path) ){
                                ReadFileTask t = new ReadFileTask(lpath, job, 
dest, rlen, clen, brlen, bclen, syncBlock);

http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
index 1fbc341..440992a 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
@@ -24,7 +24,6 @@ import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
@@ -44,6 +43,7 @@ import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
 import org.apache.sysml.runtime.matrix.data.DenseBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.util.CommonThreadPool;
 
 /**
  * Parallel version of ReaderTextCSV.java. To summarize, we do two passes in
@@ -131,7 +131,7 @@ public class ReaderTextCSVParallel extends MatrixReader
                TextInputFormat informat = new TextInputFormat();
                informat.configure(job);
 
-               ExecutorService pool = 
Executors.newFixedThreadPool(_numThreads);
+               ExecutorService pool = CommonThreadPool.get(_numThreads);
 
                try 
                {
@@ -190,7 +190,7 @@ public class ReaderTextCSVParallel extends MatrixReader
                // count rows in parallel per split
                try 
                {
-                       ExecutorService pool = 
Executors.newFixedThreadPool(_numThreads);
+                       ExecutorService pool = 
CommonThreadPool.get(_numThreads);
                        ArrayList<CountRowsTask> tasks = new ArrayList<>();
                        for (InputSplit split : splits) {
                                tasks.add(new CountRowsTask(split, informat, 
job, hasHeader));

http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
index 6087210..040b43c 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
@@ -25,7 +25,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -44,6 +43,7 @@ import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.matrix.data.DenseBlock;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.util.CommonThreadPool;
 import org.apache.sysml.runtime.util.FastStringTokenizer;
 import org.apache.sysml.runtime.util.MapReduceTool;
 
@@ -131,7 +131,7 @@ public class ReaderTextCellParallel extends MatrixReader
                try 
                {
                        //create read tasks for all splits
-                       ExecutorService pool = 
Executors.newFixedThreadPool(par);
+                       ExecutorService pool = CommonThreadPool.get(par);
                        InputSplit[] splits = informat.getSplits(job, par);
                        ArrayList<ReadTask> tasks = new ArrayList<>();
                        for( InputSplit split : splits ){

http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java
index 7f83abe..028c8d1 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -36,6 +35,7 @@ import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.util.CommonThreadPool;
 import org.apache.sysml.runtime.util.MapReduceTool;
 
 public class WriterBinaryBlockParallel extends WriterBinaryBlock
@@ -69,7 +69,7 @@ public class WriterBinaryBlockParallel extends 
WriterBinaryBlock
                //create and execute write tasks
                try 
                {
-                       ExecutorService pool = 
Executors.newFixedThreadPool(numThreads);
+                       ExecutorService pool = CommonThreadPool.get(numThreads);
                        ArrayList<WriteFileTask> tasks = new ArrayList<>();
                        int blklen = (int)Math.ceil((double)rlen / brlen / 
numThreads) * brlen;
                        for(int i=0; i<numThreads & i*blklen<rlen; i++) {

http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java
index b8f9815..cc377d4 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -36,6 +35,7 @@ import org.apache.sysml.hops.OptimizerUtils;
 import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
+import org.apache.sysml.runtime.util.CommonThreadPool;
 import org.apache.sysml.runtime.util.MapReduceTool;
 
 public class WriterMatrixMarketParallel extends WriterMatrixMarket
@@ -67,7 +67,7 @@ public class WriterMatrixMarketParallel extends 
WriterMatrixMarket
                //create and execute tasks
                try 
                {
-                       ExecutorService pool = 
Executors.newFixedThreadPool(numThreads);
+                       ExecutorService pool = CommonThreadPool.get(numThreads);
                        ArrayList<WriteMMTask> tasks = new ArrayList<>();
                        int blklen = (int)Math.ceil((double)rlen / numThreads);
                        for(int i=0; i<numThreads & i*blklen<rlen; i++) {

http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java
index 2c56220..210b349 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -37,6 +36,7 @@ import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyze
 import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
+import org.apache.sysml.runtime.util.CommonThreadPool;
 import org.apache.sysml.runtime.util.MapReduceTool;
 
 public class WriterTextCSVParallel extends WriterTextCSV
@@ -70,7 +70,7 @@ public class WriterTextCSVParallel extends WriterTextCSV
                //create and execute tasks
                try 
                {
-                       ExecutorService pool = 
Executors.newFixedThreadPool(numThreads);
+                       ExecutorService pool = CommonThreadPool.get(numThreads);
                        ArrayList<WriteCSVTask> tasks = new ArrayList<>();
                        int rlen = src.getNumRows();
                        int blklen = (int)Math.ceil((double)rlen / numThreads);

http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java
index 63f36c2..7e51bc9 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -36,6 +35,7 @@ import org.apache.sysml.hops.OptimizerUtils;
 import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
+import org.apache.sysml.runtime.util.CommonThreadPool;
 import org.apache.sysml.runtime.util.MapReduceTool;
 
 public class WriterTextCellParallel extends WriterTextCell
@@ -66,7 +66,7 @@ public class WriterTextCellParallel extends WriterTextCell
                //create and execute tasks
                try 
                {
-                       ExecutorService pool = 
Executors.newFixedThreadPool(numThreads);
+                       ExecutorService pool = CommonThreadPool.get(numThreads);
                        ArrayList<WriteTextTask> tasks = new ArrayList<>();
                        int blklen = (int)Math.ceil((double)rlen / numThreads);
                        for(int i=0; i<numThreads & i*blklen<rlen; i++) {

http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
index 7fa14bd..85d5654 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
@@ -25,7 +25,6 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.sysml.lops.PartialAggregate.CorrectionLocationType;
@@ -55,6 +54,7 @@ import org.apache.sysml.runtime.matrix.operators.CMOperator;
 import 
org.apache.sysml.runtime.matrix.operators.CMOperator.AggregateOperationTypes;
 import org.apache.sysml.runtime.matrix.operators.Operator;
 import org.apache.sysml.runtime.matrix.operators.UnaryOperator;
+import org.apache.sysml.runtime.util.CommonThreadPool;
 import org.apache.sysml.runtime.util.DataConverter;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
@@ -249,7 +249,7 @@ public class LibMatrixAgg
                //core multi-threaded unary aggregate computation
                //(currently: always parallelization over number of rows)
                try {
-                       ExecutorService pool = Executors.newFixedThreadPool( k 
);
+                       ExecutorService pool = CommonThreadPool.get(k);
                        ArrayList<AggTask> tasks = new ArrayList<>();
                        int blklen = (int)(Math.ceil((double)m/k));
                        for( int i=0; i<k & i*blklen<m; i++ ) {
@@ -343,7 +343,7 @@ public class LibMatrixAgg
                //core multi-threaded unary aggregate computation
                //(currently: always parallelization over number of rows)
                try {
-                       ExecutorService pool = Executors.newFixedThreadPool( k 
);
+                       ExecutorService pool = CommonThreadPool.get(k);
                        int blklen = (int)(Math.ceil((double)m/k));
                        
                        //step 1: compute aggregates per row partition
@@ -438,7 +438,7 @@ public class LibMatrixAgg
                //Timing time = new Timing(true);
                
                try {
-                       ExecutorService pool = Executors.newFixedThreadPool( k 
);
+                       ExecutorService pool = CommonThreadPool.get(k);
                        ArrayList<AggTernaryTask> tasks = new ArrayList<>();
                        int blklen = (int)(Math.ceil((double)in1.rlen/k));
                        IndexFunction ixFn = op.indexFn;
@@ -516,7 +516,7 @@ public class LibMatrixAgg
                //core multi-threaded grouped aggregate computation
                //(currently: parallelization over columns to avoid additional 
memory requirements)
                try {
-                       ExecutorService pool = Executors.newFixedThreadPool( k 
);
+                       ExecutorService pool = CommonThreadPool.get(k);
                        ArrayList<GrpAggTask> tasks = new ArrayList<>();
                        int blklen = (int)(Math.ceil((double)target.clen/k));
                        for( int i=0; i<k & i*blklen<target.clen; i++ )

http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDNN.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDNN.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDNN.java
index d089521..4cb154b 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDNN.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDNN.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -31,6 +30,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.util.CommonThreadPool;
 import org.apache.sysml.runtime.util.ConvolutionUtils;
 
 /*
@@ -437,7 +437,7 @@ public class LibMatrixDNN {
                                }
                        }
                        else {
-                               ExecutorService pool = 
Executors.newFixedThreadPool( Math.min(k, params.N) );
+                               ExecutorService pool = CommonThreadPool.get( 
Math.min(k, params.N) );
                                List<Future<Long>> taskret = 
pool.invokeAll(tasks);
                                pool.shutdown();
                                for( Future<Long> task : taskret )

http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java
index 6df49f2..3e77a72 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.stream.LongStream;
 
@@ -35,6 +34,7 @@ import org.apache.commons.math3.random.Well1024a;
 import org.apache.sysml.hops.DataGenOp;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence;
+import org.apache.sysml.runtime.util.CommonThreadPool;
 import org.apache.sysml.runtime.util.NormalPRNGenerator;
 import org.apache.sysml.runtime.util.PRNGenerator;
 import org.apache.sysml.runtime.util.PoissonPRNGenerator;
@@ -322,7 +322,7 @@ public class LibMatrixDatagen
                long[] seeds = generateSeedsForCP(bigrand, nrb, ncb);
                long nnz = 0;
                try {
-                       ExecutorService pool = Executors.newFixedThreadPool(k);
+                       ExecutorService pool = CommonThreadPool.get(k);
                        ArrayList<RandTask> tasks = new ArrayList<>();
                        int blklen = ((int)(Math.ceil((double)parnb/k)));
                        for( int i=0; i<k & i*blklen<parnb; i++ ) {

http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java
index 9d52f00..214c8eb 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.commons.math3.util.FastMath;
@@ -40,6 +39,7 @@ import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.functionobjects.SwapIndex;
 import org.apache.sysml.runtime.functionobjects.ValueFunction;
 import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
+import org.apache.sysml.runtime.util.CommonThreadPool;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
 /**
@@ -205,7 +205,7 @@ public class LibMatrixMult
                //core multi-threaded matrix mult computation
                //(currently: always parallelization over number of rows)
                try {
-                       ExecutorService pool = Executors.newFixedThreadPool( k 
);
+                       ExecutorService pool = CommonThreadPool.get(k);
                        ArrayList<MatrixMultTask> tasks = new ArrayList<>();
                        ArrayList<Integer> blklens = 
getBalancedBlockSizesDefault(num, k, (pm2r||pm2c));
                        for( int i=0, lb=0; i<blklens.size(); 
lb+=blklens.get(i), i++ )
@@ -317,7 +317,7 @@ public class LibMatrixMult
                //core matrix mult chain computation
                //(currently: always parallelization over number of rows)
                try {
-                       ExecutorService pool = Executors.newFixedThreadPool( k 
);
+                       ExecutorService pool = CommonThreadPool.get(k);
                        ArrayList<Integer> blklens = 
getBalancedBlockSizesDefault(mX.rlen, k, true);
                        ArrayList<MatrixMultChainTask> tasks = new 
ArrayList<>();
                        for( int i=0, lb=0; i<blklens.size(); 
lb+=blklens.get(i), i++ )
@@ -398,7 +398,7 @@ public class LibMatrixMult
        
                //core multi-threaded matrix mult computation
                try {
-                       ExecutorService pool = Executors.newFixedThreadPool( k 
);
+                       ExecutorService pool = CommonThreadPool.get(k);
                        ArrayList<MatrixMultTransposeTask> tasks = new 
ArrayList<>();
                        //load balance via #tasks=2k due to triangular shape 
                        int blklen = (int)(Math.ceil((double)ret.rlen/(2*k)));
@@ -478,7 +478,7 @@ public class LibMatrixMult
                
                try
                {
-                       ExecutorService pool = Executors.newFixedThreadPool(k);
+                       ExecutorService pool = CommonThreadPool.get(k);
                        ArrayList<MatrixMultPermuteTask> tasks = new 
ArrayList<>();
                        int blklen = (int)(Math.ceil((double)pm1.rlen/k));
                        for( int i=0; i<k & i*blklen<pm1.rlen; i++ )
@@ -553,7 +553,7 @@ public class LibMatrixMult
                
                try 
                {                       
-                       ExecutorService pool = Executors.newFixedThreadPool(k);
+                       ExecutorService pool = CommonThreadPool.get(k);
                        ArrayList<MatrixMultWSLossTask> tasks = new 
ArrayList<>();
                        int blklen = (int)(Math.ceil((double)mX.rlen/k));
                        for( int i=0; i<k & i*blklen<mX.rlen; i++ )
@@ -629,7 +629,7 @@ public class LibMatrixMult
                
                try 
                {
-                       ExecutorService pool = Executors.newFixedThreadPool(k);
+                       ExecutorService pool = CommonThreadPool.get(k);
                        ArrayList<MatrixMultWSigmoidTask> tasks = new 
ArrayList<>();
                        int blklen = (int)(Math.ceil((double)mW.rlen/k));
                        for( int i=0; i<k & i*blklen<mW.rlen; i++ )
@@ -744,7 +744,7 @@ public class LibMatrixMult
                
                try 
                {
-                       ExecutorService pool = Executors.newFixedThreadPool(k);
+                       ExecutorService pool = CommonThreadPool.get(k);
                        ArrayList<MatrixMultWDivTask> tasks = new ArrayList<>();
                        //create tasks (for wdivmm-left, parallelization over 
columns;
                        //for wdivmm-right, parallelization over rows; both 
ensure disjoint results)
@@ -821,7 +821,7 @@ public class LibMatrixMult
                
                try 
                {
-                       ExecutorService pool = Executors.newFixedThreadPool(k);
+                       ExecutorService pool = CommonThreadPool.get(k);
                        ArrayList<MatrixMultWCeTask> tasks = new ArrayList<>();
                        int blklen = (int)(Math.ceil((double)mW.rlen/k));
                        for( int i=0; i<k & i*blklen<mW.rlen; i++ )
@@ -893,7 +893,7 @@ public class LibMatrixMult
                
                try 
                {
-                       ExecutorService pool = Executors.newFixedThreadPool(k);
+                       ExecutorService pool = CommonThreadPool.get(k);
                        ArrayList<MatrixMultWuTask> tasks = new ArrayList<>();
                        int blklen = (int)(Math.ceil((double)mW.rlen/k));
                        for( int i=0; i<k & i*blklen<mW.rlen; i++ )

http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
index 145a474..c8fedaf 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
@@ -30,7 +30,6 @@ import java.util.List;
 import java.util.Map.Entry;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.sysml.runtime.DMLRuntimeException;
@@ -42,6 +41,7 @@ import org.apache.sysml.runtime.functionobjects.SwapIndex;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
 import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
+import org.apache.sysml.runtime.util.CommonThreadPool;
 import org.apache.sysml.runtime.util.DataConverter;
 import org.apache.sysml.runtime.util.SortUtils;
 import org.apache.sysml.runtime.util.UtilFunctions;
@@ -185,7 +185,7 @@ public class LibMatrixReorg
                
                //core multi-threaded transpose
                try {
-                       ExecutorService pool = Executors.newFixedThreadPool( k 
);
+                       ExecutorService pool = CommonThreadPool.get(k);
                        //pre-processing (compute nnz per column once for 
sparse)
                        int[] cnt = null;
                        if( in.sparse && out.sparse ) {
@@ -2051,7 +2051,7 @@ public class LibMatrixReorg
                }
                else {
                        try {
-                               ExecutorService pool = 
Executors.newFixedThreadPool( k );
+                               ExecutorService pool = CommonThreadPool.get(k);
                                ArrayList<RExpandColsTask> tasks = new 
ArrayList<>();
                                int blklen = (int)(Math.ceil((double)rlen/k/8));
                                for( int i=0; i<8*k & i*blklen<rlen; i++ )

http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/util/CommonThreadPool.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/CommonThreadPool.java 
b/src/main/java/org/apache/sysml/runtime/util/CommonThreadPool.java
new file mode 100644
index 0000000..957ec0b
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/util/CommonThreadPool.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.util;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+
+/**
+ * This common thread pool provides an abstraction to obtain a shared
+ * thread pool, specifically the ForkJoinPool.commonPool, for all requests
+ * of the maximum degree of parallelism. If pools of different size are
+ * requested, we create new pool instances of FixedThreadPool.
+ *
+ * Each wrapper delegates task submission to the underlying pool but
+ * turns shutdown requests into no-ops when that pool is the shared one,
+ * so callers can keep their create-invoke-shutdown pattern unchanged.
+ */
+public class CommonThreadPool implements ExecutorService
+{
+       //shared thread pool used system-wide, potentially by concurrent parfor workers
+       //we use the ForkJoinPool.commonPool() to avoid explicit cleanup, including
+       //unnecessary initialization (e.g., problematic in jmlc) and because this commonPool
+       //resulted in better performance than a dedicated fixed thread pool.
+       private static final int size = InfrastructureAnalyzer.getLocalParallelism();
+       private static final ExecutorService shared = ForkJoinPool.commonPool();
+       private final ExecutorService _pool;
+
+       /**
+        * Wraps the given executor service.
+        *
+        * @param pool executor that all submitted work is delegated to
+        */
+       public CommonThreadPool(ExecutorService pool) {
+               _pool = pool;
+       }
+
+       /**
+        * Obtains a thread pool for the requested degree of parallelism.
+        *
+        * @param k requested number of threads
+        * @return wrapper around the shared pool if k equals the local
+        *   parallelism, otherwise around a newly created fixed thread pool
+        */
+       public static ExecutorService get(int k) {
+               return new CommonThreadPool( (size==k) ?
+                       shared : Executors.newFixedThreadPool(k));
+       }
+
+       /**
+        * Requests an immediate shutdown of the shared pool.
+        * NOTE: the JDK documents shutdownNow as having no effect on the
+        * execution state of ForkJoinPool.commonPool().
+        */
+       public static void shutdownShared() {
+               shared.shutdownNow();
+       }
+
+       /**
+        * Shuts down the wrapped pool unless it is the shared pool,
+        * which is left running for subsequent operations.
+        */
+       @Override
+       public void shutdown() {
+               if( _pool != shared )
+                       _pool.shutdown();
+       }
+
+       /**
+        * Immediately shuts down the wrapped pool unless it is the shared pool.
+        *
+        * @return tasks that never commenced execution; an empty list (never
+        *   null) if the wrapped pool is the shared pool
+        */
+       @Override
+       public List<Runnable> shutdownNow() {
+               //the ExecutorService contract requires a list, so we never return null
+               return ( _pool != shared ) ?
+                       _pool.shutdownNow() : Collections.<Runnable>emptyList();
+       }
+
+       @Override
+       public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+               return _pool.invokeAll(tasks);
+       }
+
+       @Override
+       public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+                       throws InterruptedException {
+               return _pool.invokeAll(tasks, timeout, unit);
+       }
+
+       @Override
+       public void execute(Runnable command) {
+               _pool.execute(command);
+       }
+
+       @Override
+       public <T> Future<T> submit(Callable<T> task) {
+               return _pool.submit(task);
+       }
+
+       @Override
+       public <T> Future<T> submit(Runnable task, T result) {
+               return _pool.submit(task, result);
+       }
+
+       @Override
+       public Future<?> submit(Runnable task) {
+               return _pool.submit(task);
+       }
+
+       @Override
+       public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+               return _pool.invokeAny(tasks);
+       }
+
+       @Override
+       public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+                       throws InterruptedException, ExecutionException, TimeoutException {
+               return _pool.invokeAny(tasks, timeout, unit);
+       }
+
+       //lifecycle queries required for API compliance but not supported
+
+       @Override
+       public boolean isShutdown() {
+               throw new NotImplementedException();
+       }
+
+       @Override
+       public boolean isTerminated() {
+               throw new NotImplementedException();
+       }
+
+       @Override
+       public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+               throw new NotImplementedException();
+       }
+}

http://git-wip-us.apache.org/repos/asf/systemml/blob/967b7316/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java 
b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
index 993c9b2..e0ac955 100644
--- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
@@ -38,7 +38,7 @@ public class UtilFunctions
        //for accurate cast of double values to int and long 
        //IEEE754: binary64 (double precision) eps = 2^(-53) = 1.11 * 10^(-16)
        //(same epsilon as used for matrix index cast in R)
-       public static double DOUBLE_EPS = Math.pow(2, -53);
+       public static final double DOUBLE_EPS = Math.pow(2, -53);
        
        //prime numbers for old hash function (divide prime close to max int, 
        //because it determines the max hash domain size

Reply via email to