Repository: systemml Updated Branches: refs/heads/master e0c271fe4 -> 8a5bdba43
[SYSTEMML-2291] Performance layered-graph sparsity estimator This patch significantly improves the build and estimate performance of the sparsity estimator based on layered graphs (by 1000x and 100x, respectively). In particular, this includes (1) build graph with array-based node partitions for O(1) access, (2) reduced node sizes, and (3) proper memoization on propagating r-vectors bottom-up through the graph. On a scenario of estimating the output sparsity of a self matrix product of a 10K x 10K matrix with sparsity 0.01, this patch improved the build/estimate times from 798s/120s to 87ms/1.1s. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/8a5bdba4 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/8a5bdba4 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/8a5bdba4 Branch: refs/heads/master Commit: 8a5bdba432350792e0d78aa96825be92596a58cf Parents: e0c271f Author: Matthias Boehm <[email protected]> Authored: Fri Jul 27 15:44:51 2018 -0700 Committer: Matthias Boehm <[email protected]> Committed: Fri Jul 27 15:44:51 2018 -0700 ---------------------------------------------------------------------- .../sysml/hops/estim/EstimatorLayeredGraph.java | 261 +++++++++---------- .../paramserv/spark/rpc/PSRpcFactory.java | 2 +- 2 files changed, 119 insertions(+), 144 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/8a5bdba4/src/main/java/org/apache/sysml/hops/estim/EstimatorLayeredGraph.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/estim/EstimatorLayeredGraph.java b/src/main/java/org/apache/sysml/hops/estim/EstimatorLayeredGraph.java index 9b25a74..babffe8 100644 --- a/src/main/java/org/apache/sysml/hops/estim/EstimatorLayeredGraph.java +++ b/src/main/java/org/apache/sysml/hops/estim/EstimatorLayeredGraph.java @@ 
-21,11 +21,16 @@ package org.apache.sysml.hops.estim; import org.apache.commons.lang.NotImplementedException; import org.apache.commons.math3.distribution.ExponentialDistribution; import org.apache.commons.math3.random.Well1024a; +import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; +import org.apache.sysml.runtime.matrix.data.DenseBlock; import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.matrix.data.SparseBlock; + import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; /** * This estimator implements an approach based on a so-called layered graph, @@ -59,159 +64,129 @@ public class EstimatorLayeredGraph extends SparsityEstimator { @Override public double estim(MatrixBlock m1, MatrixBlock m2){ - int layer = 3; - LayeredGraph LGraph = new LayeredGraph(m1, m2); - //lambda is not the mean, if lambda is 2 hand in 1/2 - ExponentialDistribution random = new ExponentialDistribution(new Well1024a(), 1); - for (int h = 0; h < LGraph.nodes.size(); h++) { - if (LGraph.nodes.get(h).getY() == 1) { - double[] doubArray = new double[_rounds]; - for (int g = 0; g < _rounds; g++) - doubArray[g] = random.sample(); - LGraph.nodes.get(h).setVector(doubArray); - } - } - // get r for nodes of upper layer - for (int h = 0; h < LGraph.nodes.size(); h++) { - if (LGraph.nodes.get(h).getY() == layer) { - double[] ret = recr(_rounds, LGraph.nodes.get(h)); - if(ret != null) - LGraph.nodes.get(h).setVector(ret); - LGraph.nodes.get(h).setValue( - calcNNZ(LGraph.nodes.get(h).getVector(), _rounds)); - } - } - //calc final sparsity - double nnz = LGraph.nodes.stream().filter(n -> n.getY()==layer) - .mapToDouble(n -> n.getValue()).sum(); - return nnz / m1.getNumRows() / m2.getNumColumns(); + LayeredGraph graph = new LayeredGraph(m1, m2, _rounds); + return OptimizerUtils.getSparsity(m1.getNumRows(), + m2.getNumColumns(), graph.estimateNnz()); } - - -
public double[] recr(int numr, Node tempnode) { - if (tempnode.getInput().isEmpty()) - return (tempnode.getY() == 1) ? tempnode.getVector() : null; - else if (tempnode.getInput().size() == 1) - return recr(numr, tempnode.getInput().get(0)); - else { - return tempnode.getInput().stream() - .map(n -> recr(numr, n)).filter(v -> v != null) - .reduce((v1,v2) -> min(v1,v2)).get(); - } - } - - private double[] min(double[] v1, double[] v2) { - double[] ret = new double[v1.length]; - for(int i=0; i<v1.length; i++) - ret[i] = Math.min(v1[i], v2[i]); - return ret; - } - - public double calcNNZ(double[] inpvec, int numr) { - return (inpvec != null && inpvec.length > 0) ? - (numr - 1) / Arrays.stream(inpvec).sum() : 0; - } - - private class LayeredGraph { - List<Node> nodes = new ArrayList<>(); - public LayeredGraph(MatrixBlock m1, MatrixBlock m2) { - createNodes(m1, 1, nodes); - createNodes(m2, 2, nodes); + private static class LayeredGraph { + private final List<Node[]> _nodes; //nodes partitioned by graph level + private final int _rounds; //length of propagated r-vectors + + public LayeredGraph(MatrixBlock m1, MatrixBlock m2, int r) { + _nodes = new ArrayList<>(); + _rounds = r; + buildNext(m1); + buildNext(m2); } - } - - public void createNodes(MatrixBlock m, int mpos, List<Node> nodes) { - if( m.isEmpty() ) - return; - Node nodeout = null; - Node nodein = null; - //TODO perf: separate handling sparse and dense - //TODO perf: hash lookups for existing nodes - for (int i = 0; i < m.getNumRows(); i++) { - for (int j = 0; j < m.getNumColumns(); j++) { - if (m.getValue(i, j) == 0) continue; - boolean alreadyExists = false; - boolean alreadyExists2 = false; - for (int k = 0; k < nodes.size(); k++) { - if (nodes.get(k).getX() == i && nodes.get(k).getY() == mpos) { - alreadyExists = true; - } - } - if (!alreadyExists) { - nodein = new Node(i, mpos); - nodes.add(nodein); - } else { - for (int k = 0; k < nodes.size(); k++) { - if (nodes.get(k).getX() == i && nodes.get(k).getY() ==
mpos) { - nodein = nodes.get(k); - } - } - } - for (int k = 0; k < nodes.size(); k++) { - if (nodes.get(k).getX() == j && nodes.get(k).getY() == mpos + 1) { - alreadyExists2 = true; - } + public void buildNext(MatrixBlock mb) { + final int m = mb.getNumRows(); + final int n = mb.getNumColumns(); + + //step 1: create node arrays for rows/cols + Node[] rows = null, cols = null; + if( _nodes.size() == 0 ) { + rows = new Node[m]; + for(int i=0; i<m; i++) + rows[i] = new Node(); + _nodes.add(rows); + } + else { + rows = _nodes.get(_nodes.size()-1); + } + cols = new Node[n]; + for(int j=0; j<n; j++) + cols[j] = new Node(); + _nodes.add(cols); + if( mb.isEmpty() ) //keep (edge-less) layers for empty inputs + return; + + //step 2: create edges for non-zero values + if( mb.isInSparseFormat() ) { + SparseBlock a = mb.getSparseBlock(); + for(int i=0; i < m; i++) { + if( a.isEmpty(i) ) continue; + int apos = a.pos(i); + int alen = a.size(i); + int[] aix = a.indexes(i); + for(int k=apos; k<apos+alen; k++) + cols[aix[k]].addInput(rows[i]); } - if (!alreadyExists2) { - nodeout = new Node(j, mpos + 1); - nodes.add(nodeout); - - } else { - for (int k = 0; k < nodes.size(); k++) { - if (nodes.get(k).getX() == j && nodes.get(k).getY() == mpos + 1) { - nodeout = nodes.get(k); - } - } + } + else { //dense + DenseBlock a = mb.getDenseBlock(); + for (int i=0; i<m; i++) { + double[] avals = a.values(i); + int aix = a.pos(i); + for (int j=0; j<n; j++) + if( avals[aix+j] != 0 ) + cols[j].addInput(rows[i]); } - nodeout.addnz(nodein); } } - } - - private static class Node { - int xpos; - int ypos; - double[] r_vector; - List<Node> input = new ArrayList<>(); - double value; - - public Node(int x, int y) { - xpos = x; - ypos = y; - } - - public void setValue(double inp) { - value = inp; - } - - public double getValue() { - return value; - } - - public List<Node> getInput() { - return input; - } - - public int getX() { - return xpos; - } - - public int getY() { - return ypos; - } - - public double[] getVector() { - return r_vector; + +
public long estimateNnz() { + //step 1: assign random vectors ~exp(lambda=1) to all leaf nodes + //(lambda is not the mean, if lambda is 2 mean is 1/2) + ExponentialDistribution random = new ExponentialDistribution(new Well1024a(), 1); + for( Node n : _nodes.get(0) ) { + double[] rvect = new double[_rounds]; + for (int g = 0; g < _rounds; g++) + rvect[g] = random.sample(); + n.setVector(rvect); + } + + //step 2: propagate vectors bottom-up and aggregate nnz + return (long) Arrays.stream(_nodes.get(_nodes.size()-1)) + .mapToDouble(n -> calcNNZ(n.computeVector(_rounds), _rounds)).sum(); } - - public void setVector(double[] r_input) { - r_vector = r_input; + + private static double calcNNZ(double[] inpvec, int rounds) { + return (inpvec != null && inpvec.length > 0) ? + (rounds - 1) / Arrays.stream(inpvec).sum() : 0; } - - public void addnz(Node dest) { - input.add(dest); + + private static class Node { + private List<Node> _input = new ArrayList<>(); + private double[] _rvect; + + public List<Node> getInput() { + return _input; + } + + public double[] getVector() { + return _rvect; + } + + public void setVector(double[] rvect) { + _rvect = rvect; + } + + public void addInput(Node dest) { + _input.add(dest); + } + + private double[] computeVector(int rounds) { + if( _rvect != null || getInput().isEmpty() ) + return _rvect; + //recursively compute input vectors + List<double[]> ltmp = getInput().stream().map(n -> n.computeVector(rounds)) + .filter(v -> v!=null).collect(Collectors.toList()); + if( ltmp.isEmpty() ) + return null; + else if( ltmp.size() == 1 ) + return _rvect = ltmp.get(0); + else { + double[] tmp = ltmp.get(0).clone(); + for(int i=1; i<ltmp.size(); i++) { + double[] v2 = ltmp.get(i); + for(int j=0; j<rounds; j++) + tmp[j] = Math.min(tmp[j], v2[j]); + } + return _rvect = tmp; + } + } } } }
http://git-wip-us.apache.org/repos/asf/systemml/blob/8a5bdba4/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/rpc/PSRpcFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/rpc/PSRpcFactory.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/rpc/PSRpcFactory.java index 2d921de..5e76d23 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/rpc/PSRpcFactory.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/rpc/PSRpcFactory.java @@ -36,7 +36,7 @@ public class PSRpcFactory { private static final String MODULE_NAME = "ps"; private static TransportContext createTransportContext(SparkConf conf, LocalParamServer ps) { - TransportConf tc = SparkTransportConf.fromSparkConf(conf, MODULE_NAME, 0);; + TransportConf tc = SparkTransportConf.fromSparkConf(conf, MODULE_NAME, 0); PSRpcHandler handler = new PSRpcHandler(ps); return new TransportContext(tc, handler); }
