Repository: systemml
Updated Branches:
  refs/heads/master 95cbbd656 -> 7a3447a50


[SYSTEMML-2500] Async matrix allocation on Spark RDD collect

This patch introduces a general performance improvement for RDD collect
operations into driver memory: the output matrix is now allocated
asynchronously, overlapping with the collect (and any pending RDD
evaluation). This is generally useful because it reduces the serial
fraction of parallel programs.

For example, for 100 distributed sum(cumsum(X)) operations, it reduced
the total runtime from 1,102s to 1,006s.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/77a7ef15
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/77a7ef15
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/77a7ef15

Branch: refs/heads/master
Commit: 77a7ef155d5f3546d053c7f3d11b1ff3b8021834
Parents: 95cbbd6
Author: Matthias Boehm <mboe...@gmail.com>
Authored: Sat Dec 1 17:08:45 2018 +0100
Committer: Matthias Boehm <mboe...@gmail.com>
Committed: Sat Dec 1 17:08:45 2018 +0100

----------------------------------------------------------------------
 .../controlprogram/caching/LazyWriteBuffer.java       |  4 ++++
 .../controlprogram/context/SparkExecutionContext.java | 14 ++++++++++----
 .../org/apache/sysml/runtime/io/IOUtilFunctions.java  | 10 ++++++++++
 .../apache/sysml/runtime/matrix/data/MatrixBlock.java | 10 ++++++++++
 4 files changed, 34 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/77a7ef15/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java
index 391f21a..d1dc801 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java
@@ -272,6 +272,10 @@ public class LazyWriteBuffer
                }
        }
        
+       public static ExecutorService getUtilThreadPool() {
+               return _fClean != null ? _fClean._pool : null;
+       }
+       
        /**
         * Extended LinkedHashMap with convenience methods for adding and 
removing
         * last/first entries.

http://git-wip-us.apache.org/repos/asf/systemml/blob/77a7ef15/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index 8981c87..b04aad0 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 
@@ -46,7 +47,6 @@ import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
 import org.apache.sysml.api.mlcontext.MLContext;
 import org.apache.sysml.api.mlcontext.MLContextUtil;
-import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.lops.Checkpoint;
 import org.apache.sysml.parser.Expression.ValueType;
@@ -72,6 +72,7 @@ import 
org.apache.sysml.runtime.instructions.spark.functions.CreateSparseBlockFu
 import 
org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils.LongFrameToLongWritableFrameFunction;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
 import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
+import org.apache.sysml.runtime.io.IOUtilFunctions;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
@@ -824,7 +825,7 @@ public class SparkExecutionContext extends ExecutionContext
                long t0 = ConfigurationManager.isStatistics() ? 
System.nanoTime() : 0;
 
                MatrixBlock out = null;
-
+               
                if( rlen <= brlen && clen <= bclen ) //SINGLE BLOCK
                {
                        //special case without copy and nnz maintenance
@@ -846,9 +847,14 @@ public class SparkExecutionContext extends ExecutionContext
 
                        //create output matrix block (w/ lazy allocation)
                        out = new MatrixBlock(rlen, clen, sparse, lnnz);
-
+                       
+                       //kickoff asynchronous allocation
+                       Future<MatrixBlock> fout = out.allocateBlockAsync();
+                       
+                       //trigger pending RDD operations and collect blocks
                        List<Tuple2<MatrixIndexes,MatrixBlock>> list = 
rdd.collect();
-
+                       out = IOUtilFunctions.get(fout); //wait for allocation
+                       
                        //copy blocks one-at-a-time into output matrix block
                        long aNnz = 0;
                        for( Tuple2<MatrixIndexes,MatrixBlock> keyval : list )

http://git-wip-us.apache.org/repos/asf/systemml/blob/77a7ef15/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java 
b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
index 4d7e133..6f76d3f 100644
--- a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
@@ -31,6 +31,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.LinkedList;
+import java.util.concurrent.Future;
 
 import org.apache.commons.io.input.ReaderInputStream;
 import org.apache.commons.lang.StringUtils;
@@ -624,4 +625,13 @@ public class IOUtilFunctions
                buff.get(ret, buff.position(), len);
                return ret;
        }
+       
+       public static <T> T get(Future<T> in) {
+               try {
+                       return in.get();
+               } 
+               catch(Exception e) {
+                       throw new DMLRuntimeException(e);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/77a7ef15/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index ad59e20..ae5ab84 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -31,8 +31,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.stream.IntStream;
 
+import org.apache.commons.lang3.concurrent.ConcurrentUtils;
 import org.apache.commons.math3.random.Well1024a;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.sysml.conf.ConfigurationManager;
@@ -42,6 +45,7 @@ import org.apache.sysml.lops.MapMultChain.ChainType;
 import org.apache.sysml.lops.PartialAggregate.CorrectionLocationType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.CacheBlock;
+import org.apache.sysml.runtime.controlprogram.caching.LazyWriteBuffer;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType;
 import org.apache.sysml.runtime.functionobjects.Builtin;
 import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode;
@@ -332,6 +336,12 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                allocateDenseBlock( true );
                return this;
        }
+       
+       public Future<MatrixBlock> allocateBlockAsync() {
+               ExecutorService pool = LazyWriteBuffer.getUtilThreadPool();
+               return (pool != null) ? pool.submit(() -> allocateBlock()) : 
//async
+                       ConcurrentUtils.constantFuture(allocateBlock()); 
//fallback sync
+       }
 
        public MatrixBlock allocateBlock() {
                if( sparse )

Reply via email to