http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java index efdcc86..6d2ec43 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java +++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java @@ -22,31 +22,29 @@ package org.apache.sysml.runtime.compress; import java.util.Arrays; import java.util.Iterator; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.compress.utils.ConverterUtils; import org.apache.sysml.runtime.compress.utils.LinearAlgebraUtils; import org.apache.sysml.runtime.functionobjects.Builtin; import org.apache.sysml.runtime.functionobjects.KahanFunction; import org.apache.sysml.runtime.functionobjects.KahanPlus; -import org.apache.sysml.runtime.functionobjects.KahanPlusSq; -import org.apache.sysml.runtime.functionobjects.ReduceAll; -import org.apache.sysml.runtime.functionobjects.ReduceCol; -import org.apache.sysml.runtime.functionobjects.ReduceRow; -import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode; import org.apache.sysml.runtime.instructions.cp.KahanObject; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.Pair; -import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator; import org.apache.sysml.runtime.matrix.operators.ScalarOperator; /** A group of columns compressed with a single run-length encoded bitmap. */ -public class ColGroupRLE extends ColGroupBitmap +public class ColGroupRLE extends ColGroupOffset { private static final long serialVersionUID = 7450232907594748177L; + private static final Log LOG = LogFactory.getLog(ColGroupRLE.class.getName()); + public ColGroupRLE() { - super(CompressionType.RLE_BITMAP); + super(); } /** @@ -62,26 +60,37 @@ public class ColGroupRLE extends ColGroupBitmap */ public ColGroupRLE(int[] colIndices, int numRows, UncompressedBitmap ubm) { - super(CompressionType.RLE_BITMAP, colIndices, numRows, ubm); + super(colIndices, numRows, ubm); // compress the bitmaps final int numVals = ubm.getNumValues(); char[][] lbitmaps = new char[numVals][]; int totalLen = 0; for( int k=0; k<numVals; k++ ) { - lbitmaps[k] = BitmapEncoder.genRLEBitmap(ubm.getOffsetsList(k)); + lbitmaps[k] = BitmapEncoder.genRLEBitmap( + ubm.getOffsetsList(k).extractValues(), ubm.getNumOffsets(k)); totalLen += lbitmaps[k].length; } // compact bitmaps to linearized representation createCompressedBitmaps(numVals, totalLen, lbitmaps); + + //debug output + double ucSize = MatrixBlock.estimateSizeDenseInMemory(numRows, colIndices.length); + if( estimateInMemorySize() > ucSize ) + LOG.warn("RLE group larger than UC dense: "+estimateInMemorySize()+" "+ucSize); } public ColGroupRLE(int[] colIndices, int numRows, boolean zeros, double[] values, char[] bitmaps, int[] bitmapOffs) { - super(CompressionType.RLE_BITMAP, colIndices, numRows, zeros, values); + super(colIndices, numRows, zeros, values); _data = bitmaps; _ptr = bitmapOffs; } + + @Override + public CompressionType getCompType() { + return CompressionType.RLE_BITMAP; + } @Override public Iterator<Integer> getDecodeIterator(int k) { @@ -247,7 +256,7 @@ public class ColGroupRLE extends ColGroupBitmap //L3 cache alignment, see comment rightMultByVector OLE column group //core difference of RLE to OLE is that runs are not segment alignment, //which requires care of handling runs crossing cache-buckets - final int blksz = ColGroupBitmap.WRITE_CACHE_BLKSZ; + final int blksz = ColGroupOffset.WRITE_CACHE_BLKSZ; //step 1: prepare position and value arrays @@ -335,7 +344,7 @@ public class ColGroupRLE extends ColGroupBitmap if( LOW_LEVEL_OPT && numVals > 1 && _numRows > BitmapEncoder.BITMAP_BLOCK_SZ ) { - final int blksz = ColGroupBitmap.READ_CACHE_BLKSZ; + final int blksz = ColGroupOffset.READ_CACHE_BLKSZ; //step 1: prepare position and value arrays @@ -423,7 +432,7 @@ public class ColGroupRLE extends ColGroupBitmap } double[] rvalues = applyScalarOp(op, val0, getNumCols()); - char[] lbitmap = BitmapEncoder.genRLEBitmap(loff); + char[] lbitmap = BitmapEncoder.genRLEBitmap(loff, loff.length); char[] rbitmaps = Arrays.copyOf(_data, _data.length+lbitmap.length); System.arraycopy(lbitmap, 0, rbitmaps, _data.length, lbitmap.length); int[] rbitmapOffs = Arrays.copyOf(_ptr, _ptr.length+1); @@ -432,49 +441,9 @@ public class ColGroupRLE extends ColGroupBitmap return new ColGroupRLE(_colIndexes, _numRows, loff.length<_numRows, rvalues, rbitmaps, rbitmapOffs); } - - @Override - public void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock result) - throws DMLRuntimeException - { - unaryAggregateOperations(op, result, 0, getNumRows()); - } - - - @Override - public void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock result, int rl, int ru) - throws DMLRuntimeException - { - //sum and sumsq (reduceall/reducerow over tuples and counts) - if( op.aggOp.increOp.fn instanceof KahanPlus || op.aggOp.increOp.fn instanceof KahanPlusSq ) - { - KahanFunction kplus = (op.aggOp.increOp.fn instanceof KahanPlus) ? - KahanPlus.getKahanPlusFnObject() : KahanPlusSq.getKahanPlusSqFnObject(); - - if( op.indexFn instanceof ReduceAll ) - computeSum(result, kplus); - else if( op.indexFn instanceof ReduceCol ) - computeRowSums(result, kplus, rl, ru); - else if( op.indexFn instanceof ReduceRow ) - computeColSums(result, kplus); - } - //min and max (reduceall/reducerow over tuples only) - else if(op.aggOp.increOp.fn instanceof Builtin - && (((Builtin)op.aggOp.increOp.fn).getBuiltinCode()==BuiltinCode.MAX - || ((Builtin)op.aggOp.increOp.fn).getBuiltinCode()==BuiltinCode.MIN)) - { - Builtin builtin = (Builtin) op.aggOp.increOp.fn; - if( op.indexFn instanceof ReduceAll ) - computeMxx(result, builtin); - else if( op.indexFn instanceof ReduceCol ) - computeRowMxx(result, builtin, rl, ru); - else if( op.indexFn instanceof ReduceRow ) - computeColMxx(result, builtin); - } - } - - private void computeSum(MatrixBlock result, KahanFunction kplus) + @Override + protected final void computeSum(MatrixBlock result, KahanFunction kplus) { KahanObject kbuff = new KahanObject(result.quickGetValue(0, 0), result.quickGetValue(0, 1)); @@ -502,37 +471,93 @@ public class ColGroupRLE extends ColGroupBitmap result.quickSetValue(0, 1, kbuff._correction); } - private void computeRowSums(MatrixBlock result, KahanFunction kplus, int rl, int ru) + @Override + protected final void computeRowSums(MatrixBlock result, KahanFunction kplus, int rl, int ru) { KahanObject kbuff = new KahanObject(0, 0); + KahanPlus kplus2 = KahanPlus.getKahanPlusFnObject(); + final int numVals = getNumValues(); double[] c = result.getDenseBlock(); - for (int k = 0; k < numVals; k++) { - int boff = _ptr[k]; - int blen = len(k); - double val = sumValues(k); + if( ALLOW_CACHE_CONSCIOUS_ROWSUMS + && LOW_LEVEL_OPT && numVals > 1 + && _numRows > BitmapEncoder.BITMAP_BLOCK_SZ ) + { + final int blksz = ColGroupOffset.WRITE_CACHE_BLKSZ/2; + + //step 1: prepare position and value arrays + + //current pos / values per RLE list + int[] astart = new int[numVals]; + int[] apos = skipScan(numVals, rl, astart); + double[] aval = sumAllValues(kplus, kbuff); + + //step 2: cache conscious matrix-vector via horizontal scans + for( int bi=rl; bi<ru; bi+=blksz ) + { + int bimax = Math.min(bi+blksz, ru); + + //horizontal segment scan, incl pos maintenance + for (int k = 0; k < numVals; k++) { + int boff = _ptr[k]; + int blen = len(k); + double val = aval[k]; + int bix = apos[k]; + int start = astart[k]; + + //compute partial results, not aligned + while( bix<blen ) { + int lstart = _data[boff + bix]; + int llen = _data[boff + bix + 1]; + int from = Math.max(bi, start+lstart); + int to = Math.min(start+lstart+llen,bimax); + for (int rix=from; rix<to; rix++) { + kbuff.set(c[2*rix], c[2*rix+1]); + kplus2.execute2(kbuff, val); + c[2*rix] = kbuff._sum; + c[2*rix+1] = kbuff._correction; + } + if(start+lstart+llen >= bimax) + break; + start += lstart + llen; + bix += 2; + } - if (val != 0.0) { - Pair<Integer,Integer> tmp = skipScanVal(k, rl); - int bix = tmp.getKey(); - int curRunStartOff = tmp.getValue(); - int curRunEnd = tmp.getValue(); - for ( ; bix<blen && curRunEnd<ru; bix+=2) { - curRunStartOff = curRunEnd + _data[boff+bix]; - curRunEnd = curRunStartOff + _data[boff+bix+1]; - for (int rix=curRunStartOff; rix<curRunEnd && rix<ru; rix++) { - kbuff.set(c[2*rix], c[2*rix+1]); - kplus.execute2(kbuff, val); - c[2*rix] = kbuff._sum; - c[2*rix+1] = kbuff._correction; + apos[k] = bix; + astart[k] = start; + } + } + } + else + { + for (int k = 0; k < numVals; k++) { + int boff = _ptr[k]; + int blen = len(k); + double val = sumValues(k, kplus, kbuff); + + if (val != 0.0) { + Pair<Integer,Integer> tmp = skipScanVal(k, rl); + int bix = tmp.getKey(); + int curRunStartOff = tmp.getValue(); + int curRunEnd = tmp.getValue(); + for ( ; bix<blen && curRunEnd<ru; bix+=2) { + curRunStartOff = curRunEnd + _data[boff+bix]; + curRunEnd = curRunStartOff + _data[boff+bix+1]; + for (int rix=curRunStartOff; rix<curRunEnd && rix<ru; rix++) { + kbuff.set(c[2*rix], c[2*rix+1]); + kplus2.execute2(kbuff, val); + c[2*rix] = kbuff._sum; + c[2*rix+1] = kbuff._correction; + } } } } } } - private void computeColSums(MatrixBlock result, KahanFunction kplus) + @Override + protected final void computeColSums(MatrixBlock result, KahanFunction kplus) { KahanObject kbuff = new KahanObject(0, 0); @@ -561,7 +586,8 @@ public class ColGroupRLE extends ColGroupBitmap } } - private void computeRowMxx(MatrixBlock result, Builtin builtin, int rl, int ru) + @Override + protected final void computeRowMxx(MatrixBlock result, Builtin builtin, int rl, int ru) { //NOTE: zeros handled once for all column groups outside final int numVals = getNumValues();
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java index 9d06bf8..6445c52 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java +++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java @@ -32,6 +32,7 @@ import org.apache.sysml.runtime.functionobjects.ReduceRow; import org.apache.sysml.runtime.matrix.data.LibMatrixAgg; import org.apache.sysml.runtime.matrix.data.LibMatrixMult; import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.matrix.data.SparseBlock.Type; import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator; import org.apache.sysml.runtime.matrix.operators.ScalarOperator; import org.apache.sysml.runtime.util.SortUtils; @@ -53,7 +54,7 @@ public class ColGroupUncompressed extends ColGroup private MatrixBlock _data; public ColGroupUncompressed() { - super(CompressionType.UNCOMPRESSED, (int[])null, -1); + super((int[])null, -1); } /** @@ -71,7 +72,7 @@ public class ColGroupUncompressed extends ColGroup public ColGroupUncompressed(List<Integer> colIndicesList, MatrixBlock rawblock) throws DMLRuntimeException { - super(CompressionType.UNCOMPRESSED, colIndicesList, + super(colIndicesList, CompressedMatrixBlock.TRANSPOSE_INPUT ? rawblock.getNumColumns() : rawblock.getNumRows()); @@ -97,7 +98,7 @@ public class ColGroupUncompressed extends ColGroup return; } - // dense implementation for dense and sparse matrices to avoid linear search + //dense implementation for dense and sparse matrices to avoid linear search int m = numRows; int n = _colIndexes.length; for( int i = 0; i < m; i++) { @@ -109,6 +110,11 @@ public class ColGroupUncompressed extends ColGroup } } _data.examSparsity(); + + //convert sparse MCSR to read-optimized CSR representation + if( _data.isInSparseFormat() ) { + _data = new MatrixBlock(_data, Type.CSR, false); + } } /** @@ -121,8 +127,7 @@ public class ColGroupUncompressed extends ColGroup */ public ColGroupUncompressed(ArrayList<ColGroup> groupsToDecompress) { - super(CompressionType.UNCOMPRESSED, - mergeColIndices(groupsToDecompress), + super(mergeColIndices(groupsToDecompress), groupsToDecompress.get(0)._numRows); // Invert the list of column indices @@ -152,10 +157,14 @@ public class ColGroupUncompressed extends ColGroup */ public ColGroupUncompressed(int[] colIndices, int numRows, MatrixBlock data) { - super(CompressionType.UNCOMPRESSED, colIndices, numRows); + super(colIndices, numRows); _data = data; } + @Override + public CompressionType getCompType() { + return CompressionType.UNCOMPRESSED; + } /** * Access for superclass @@ -276,6 +285,23 @@ public class ColGroupUncompressed extends ColGroup LibMatrixMult.matrixMult(_data, shortVector, result, rl, ru); } + public void rightMultByVector(MatrixBlock vector, MatrixBlock result, int k) + throws DMLRuntimeException + { + // Pull out the relevant rows of the vector + int clen = _colIndexes.length; + + MatrixBlock shortVector = new MatrixBlock(clen, 1, false); + shortVector.allocateDenseBlock(); + double[] b = shortVector.getDenseBlock(); + for (int colIx = 0; colIx < clen; colIx++) + b[colIx] = vector.quickGetValue(_colIndexes[colIx], 0); + shortVector.recomputeNonZeros(); + + // Multiply the selected columns by the appropriate parts of the vector + LibMatrixMult.matrixMult(_data, shortVector, result, k); + } + @Override public void leftMultByRowVector(MatrixBlock vector, MatrixBlock result) throws DMLRuntimeException @@ -377,8 +403,7 @@ public class ColGroupUncompressed extends ColGroup } @Override - protected void countNonZerosPerRow(int[] rnnz, int rl, int ru) - { + protected void countNonZerosPerRow(int[] rnnz, int rl, int ru) { for( int i=rl; i<ru; i++ ) rnnz[i-rl] += _data.recomputeNonZeros(i, i, 0, _data.getNumColumns()-1); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java new file mode 100644 index 0000000..b3b5e80 --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.compress; + +import java.util.Arrays; + +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.functionobjects.Builtin; +import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode; +import org.apache.sysml.runtime.functionobjects.KahanFunction; +import org.apache.sysml.runtime.functionobjects.KahanPlus; +import org.apache.sysml.runtime.instructions.cp.KahanObject; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator; +import org.apache.sysml.runtime.matrix.operators.ScalarOperator; + + +/** + * Base class for column groups encoded with value dictionary. + * + */ +public abstract class ColGroupValue extends ColGroup +{ + private static final long serialVersionUID = 3786247536054353658L; + + public static boolean LOW_LEVEL_OPT = true; + + //sorting of values by physical length helps by 10-20%, especially for serial, while + //slight performance decrease for parallel incl multi-threaded, hence not applied for + //distributed operations (also because compression time + garbage collection increases) + public static final boolean SORT_VALUES_BY_LENGTH = true; + + + /** Distinct values associated with individual bitmaps. */ + protected double[] _values; //linearized <numcol vals> <numcol vals> + + public ColGroupValue() { + super((int[]) null, -1); + } + + /** + * Main constructor. Stores the headers for the individual bitmaps. + * + * @param colIndices + * indices (within the block) of the columns included in this + * column + * @param numRows + * total number of rows in the parent block + * @param ubm + * Uncompressed bitmap representation of the block + */ + public ColGroupValue(int[] colIndices, int numRows, UncompressedBitmap ubm) + { + super(colIndices, numRows); + + // sort values by frequency, if requested + if( LOW_LEVEL_OPT && SORT_VALUES_BY_LENGTH + && numRows > BitmapEncoder.BITMAP_BLOCK_SZ ) { + ubm.sortValuesByFrequency(); + } + + // extract and store distinct values (bitmaps handled by subclasses) + _values = ubm.getValues(); + } + + /** + * Constructor for subclass methods that need to create shallow copies + * + * @param colIndices + * raw column index information + * @param numRows + * number of rows in the block + * @param values + * set of distinct values for the block (associated bitmaps are + * kept in the subclass) + */ + protected ColGroupValue(int[] colIndices, int numRows, double[] values) { + super(colIndices, numRows); + _values = values; + } + + @Override + public long estimateInMemorySize() { + long size = super.estimateInMemorySize(); + + // adding the size of values + size += 8; //array reference + if (_values != null) { + size += 32 + _values.length * 8; //values + } + + return size; + } + + /** + * Obtain number of distrinct sets of values associated with the bitmaps in this column group. + * + * @return the number of distinct sets of values associated with the bitmaps + * in this column group + */ + public int getNumValues() { + return _values.length / _colIndexes.length; + } + + public double[] getValues() { + return _values; + } + + protected int containsAllZeroValue() { + int numVals = getNumValues(); + int numCols = getNumCols(); + for( int i=0, off=0; i<numVals; i++, off+=numCols ) { + boolean allZeros = true; + for( int j=0; j<numCols; j++ ) + allZeros &= (_values[off+j] == 0); + if( allZeros ) + return i; + } + return -1; + } + + protected final double sumValues(int valIx) { + final int numCols = getNumCols(); + final int valOff = valIx * numCols; + double val = 0.0; + for( int i = 0; i < numCols; i++ ) { + val += _values[valOff+i]; + } + + return val; + } + + protected final double sumValues(int valIx, KahanFunction kplus, KahanObject kbuff) { + final int numCols = getNumCols(); + final int valOff = valIx * numCols; + kbuff.set(0, 0); + for( int i = 0; i < numCols; i++ ) { + kplus.execute2(kbuff, _values[valOff+i]); + } + + return kbuff._sum; + } + + protected final double[] sumAllValues(KahanFunction kplus, KahanObject kbuff) { + //quick path: sum + if( getNumCols()==1 && kplus instanceof KahanPlus ) + return _values; //shallow copy of values + + //pre-aggregate value tuple + final int numVals = getNumValues(); + double[] ret = new double[numVals]; + for( int k=0; k<numVals; k++ ) + ret[k] = sumValues(k, kplus, kbuff); + + return ret; + } + + protected final double sumValues(int valIx, double[] b) { + final int numCols = getNumCols(); + final int valOff = valIx * numCols; + double val = 0; + for( int i = 0; i < numCols; i++ ) { + val += _values[valOff+i] * b[i]; + } + + return val; + } + + protected final double[] preaggValues(int numVals, double[] b) { + double[] ret = new double[numVals]; + for( int k = 0; k < numVals; k++ ) + ret[k] = sumValues(k, b); + + return ret; + } + + /** + * NOTE: Shared across OLE/RLE/DDC because value-only computation. + * + * @param result output matrix block + * @param builtin function object + * @param zeros indicator if column group contains zero values + */ + protected void computeMxx(MatrixBlock result, Builtin builtin, boolean zeros) + { + //init and 0-value handling + double val = Double.MAX_VALUE * ((builtin.getBuiltinCode()==BuiltinCode.MAX)?-1:1); + if( zeros ) + val = builtin.execute2(val, 0); + + //iterate over all values only + final int numVals = getNumValues(); + final int numCols = getNumCols(); + for (int k = 0; k < numVals; k++) + for( int j=0, valOff = k*numCols; j<numCols; j++ ) + val = builtin.execute2(val, _values[ valOff+j ]); + + //compute new partial aggregate + val = builtin.execute2(val, result.quickGetValue(0, 0)); + result.quickSetValue(0, 0, val); + } + + /** + * NOTE: Shared across OLE/RLE/DDC because value-only computation. + * + * @param result output matrix block + * @param builtin function object + * @param zeros indicator if column group contains zero values + */ + protected void computeColMxx(MatrixBlock result, Builtin builtin, boolean zeros) + { + final int numVals = getNumValues(); + final int numCols = getNumCols(); + + //init and 0-value handling + double[] vals = new double[numCols]; + Arrays.fill(vals, Double.MAX_VALUE * ((builtin.getBuiltinCode()==BuiltinCode.MAX)?-1:1)); + if( zeros ) { + for( int j = 0; j < numCols; j++ ) + vals[j] = builtin.execute2(vals[j], 0); + } + + //iterate over all values only + for (int k = 0; k < numVals; k++) + for( int j=0, valOff=k*numCols; j<numCols; j++ ) + vals[j] = builtin.execute2(vals[j], _values[ valOff+j ]); + + //copy results to output + for( int j=0; j<numCols; j++ ) + result.quickSetValue(0, _colIndexes[j], vals[j]); + } + + /** + * Method for use by subclasses. Applies a scalar operation to the value + * metadata stored in the superclass. + * + * @param op + * scalar operation to perform + * @return transformed copy of value metadata for this column group + * @throws DMLRuntimeException if DMLRuntimeException occurs + */ + protected double[] applyScalarOp(ScalarOperator op) + throws DMLRuntimeException + { + //scan over linearized values + double[] ret = new double[_values.length]; + for (int i = 0; i < _values.length; i++) { + ret[i] = op.executeScalar(_values[i]); + } + + return ret; + } + + protected double[] applyScalarOp(ScalarOperator op, double newVal, int numCols) + throws DMLRuntimeException + { + //scan over linearized values + double[] ret = new double[_values.length + numCols]; + for( int i = 0; i < _values.length; i++ ) { + ret[i] = op.executeScalar(_values[i]); + } + + //add new value to the end + Arrays.fill(ret, _values.length, _values.length+numCols, newVal); + + return ret; + } + + @Override + public void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock result) + throws DMLRuntimeException + { + unaryAggregateOperations(op, result, 0, getNumRows()); + } + + /** + * + * @param op aggregation operator + * @param result output matrix block + * @param rl row lower index, inclusive + * @param ru row upper index, exclusive + * @throws DMLRuntimeException + */ + public abstract void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock result, int rl, int ru) + throws DMLRuntimeException; +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java index 48ebcc5..84c4812 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java @@ -46,6 +46,7 @@ import org.apache.sysml.lops.MMTSJ.MMTSJType; import org.apache.sysml.lops.MapMultChain.ChainType; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.compress.ColGroup.CompressionType; +import org.apache.sysml.runtime.compress.cocode.PlanningCoCoder; import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator; import org.apache.sysml.runtime.compress.estim.CompressedSizeInfo; import org.apache.sysml.runtime.compress.estim.SizeEstimatorFactory; @@ -56,12 +57,14 @@ import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType; import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing; import org.apache.sysml.runtime.functionobjects.Builtin; import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode; +import org.apache.sysml.runtime.functionobjects.KahanFunction; import org.apache.sysml.runtime.functionobjects.KahanPlus; import org.apache.sysml.runtime.functionobjects.KahanPlusSq; import org.apache.sysml.runtime.functionobjects.Multiply; import org.apache.sysml.runtime.functionobjects.ReduceAll; import org.apache.sysml.runtime.functionobjects.ReduceCol; import org.apache.sysml.runtime.instructions.cp.CM_COV_Object; +import org.apache.sysml.runtime.instructions.cp.KahanObject; import org.apache.sysml.runtime.instructions.cp.ScalarObject; import org.apache.sysml.runtime.matrix.data.CTableMap; import org.apache.sysml.runtime.matrix.data.LibMatrixBincell; @@ -98,7 +101,9 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable public static final boolean MATERIALIZE_ZEROS = false; public static final long MIN_PAR_AGG_THRESHOLD = 16*1024*1024; //16MB public static final boolean INVESTIGATE_ESTIMATES = false; - private static final boolean LDEBUG = false; //local debug flag + public static boolean ALLOW_DDC_ENCODING = true; + private static final boolean LDEBUG = true; //local debug flag + private static final Level LDEBUG_LEVEL = Level.DEBUG; //DEBUG/TRACE for details private static final Log LOG = LogFactory.getLog(CompressedMatrixBlock.class.getName()); @@ -106,7 +111,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable // for internal debugging only if( LDEBUG ) { Logger.getLogger("org.apache.sysml.runtime.compress") - .setLevel((Level) Level.DEBUG); + .setLevel((Level) LDEBUG_LEVEL); } } @@ -231,7 +236,6 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable final int numRows = getNumRows(); final int numCols = getNumColumns(); final boolean sparse = isInSparseFormat(); - final double sp = OptimizerUtils.getSparsity(numRows, numCols, getNonZeros()); MatrixBlock rawblock = !TRANSPOSE_INPUT ? new MatrixBlock(this) : LibMatrixReorg.transpose(this, new MatrixBlock(numCols, numRows, sparse), k); @@ -239,45 +243,50 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable CompressedSizeEstimator bitmapSizeEstimator = SizeEstimatorFactory.getSizeEstimator(rawblock, numRows); - // The current implementation of this method is written for correctness, - // not for performance or for minimal use of temporary space. - - // We start with a full set of columns. - HashSet<Integer> remainingCols = new HashSet<Integer>(); - for (int i = 0; i < numCols; i++) - remainingCols.add(i); - // PHASE 1: Classify columns by compression type - // We start by determining which columns are amenable to bitmap compression - double uncompressedColumnSize = getUncompressedSize(numRows, 1, sp); - - // information about the bitmap amenable columns - List<Integer> bitmapCols = new ArrayList<Integer>(); - List<Integer> uncompressedCols = new ArrayList<Integer>(); - List<Integer> colsCards = new ArrayList<Integer>(); - List<Long> compressedSizes = new ArrayList<Long>(); - HashMap<Integer, Double> compressionRatios = new HashMap<Integer, Double>(); + // We start by determining which columns are amenable to compression + List<Integer> colsC = new ArrayList<Integer>(); + List<Integer> colsUC = new ArrayList<Integer>(); + HashMap<Integer, Double> compRatios = new HashMap<Integer, Double>(); - // Classify columns according to ration (size uncompressed / size compressed), + // Classify columns according to ratio (size uncompressed / size compressed), // where a column is compressible if ratio > 1. CompressedSizeInfo[] sizeInfos = (k > 1) ? computeCompressedSizeInfos(bitmapSizeEstimator, numCols, k) : - computeCompressedSizeInfos(bitmapSizeEstimator, numCols); + computeCompressedSizeInfos(bitmapSizeEstimator, numCols); + long nnzUC = 0; for (int col = 0; col < numCols; col++) { - long compressedSize = sizeInfos[col].getMinSize(); - double compRatio = uncompressedColumnSize / compressedSize; - if (compRatio > 1) { - bitmapCols.add(col); - compressionRatios.put(col, compRatio); - colsCards.add(sizeInfos[col].getEstCarinality()); - compressedSizes.add(compressedSize); + double uncompSize = getUncompressedSize(numRows, 1, + OptimizerUtils.getSparsity(numRows, 1, sizeInfos[col].getEstNnz())); + double compRatio = uncompSize / sizeInfos[col].getMinSize(); + if( compRatio > 1 ) { + colsC.add(col); + compRatios.put(col, compRatio); + } + else { + colsUC.add(col); + nnzUC += sizeInfos[col].getEstNnz(); } - else - uncompressedCols.add(col); } - - _stats.timePhase1 = time.stop(); + + // correction of column classification (reevaluate dense estimates if necessary) + boolean sparseUC = MatrixBlock.evalSparseFormatInMemory(numRows, colsUC.size(), nnzUC); + if( !sparseUC && !colsUC.isEmpty() ) { + for( int i=0; i<colsUC.size(); i++ ) { + int col = colsUC.get(i); + double uncompSize = getUncompressedSize(numRows, 1, 1.0); + double compRatio = uncompSize / sizeInfos[col].getMinSize(); + if( compRatio > 1 ) { + colsC.add(col); + colsUC.remove(i); i--; + compRatios.put(col, compRatio); + nnzUC -= sizeInfos[col].getEstNnz(); + } + } + } + if( LOG.isDebugEnabled() ) { + _stats.timePhase1 = time.stop(); LOG.debug("Compression statistics:"); LOG.debug("--compression phase 1: "+_stats.timePhase1); } @@ -285,26 +294,28 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable // PHASE 2: Grouping columns // Divide the bitmap columns into column groups. List<int[]> bitmapColGrps = PlanningCoCoder.findCocodesByPartitioning( - bitmapSizeEstimator, bitmapCols, colsCards, compressedSizes, numRows, - isInSparseFormat() ? sp : 1, k); + bitmapSizeEstimator, colsC, sizeInfos, numRows, k); - _stats.timePhase2 = time.stop(); - if( LOG.isDebugEnabled() ) + if( LOG.isDebugEnabled() ) { + _stats.timePhase2 = time.stop(); LOG.debug("--compression phase 2: "+_stats.timePhase2); - + } + if( INVESTIGATE_ESTIMATES ) { double est = 0; for( int[] groupIndices : bitmapColGrps ) est += bitmapSizeEstimator.estimateCompressedColGroupSize(groupIndices).getMinSize(); - est += uncompressedCols.size() * uncompressedColumnSize; + est += MatrixBlock.estimateSizeInMemory(numRows, colsUC.size(), + OptimizerUtils.getSparsity(numRows, colsUC.size(), nnzUC)); _stats.estSize = est; } // PHASE 3: Compress and correct sample-based decisions ColGroup[] colGroups = (k > 1) ? - compressColGroups(rawblock, bitmapSizeEstimator, compressionRatios, numRows, sp, bitmapColGrps, k) : - compressColGroups(rawblock, bitmapSizeEstimator, compressionRatios, numRows, sp, bitmapColGrps); + compressColGroups(rawblock, bitmapSizeEstimator, compRatios, numRows, bitmapColGrps, colsUC.isEmpty(), k) : + compressColGroups(rawblock, bitmapSizeEstimator, compRatios, numRows, bitmapColGrps, colsUC.isEmpty()); allocateColGroupList(); + HashSet<Integer> remainingCols = seq(0, numCols-1, 1); for( int j=0; j<colGroups.length; j++ ) { if( colGroups[j] != null ) { for( int col : colGroups[j].getColIndices() ) @@ -313,10 +324,11 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable } } - _stats.timePhase3 = time.stop(); - if( LOG.isDebugEnabled() ) + if( LOG.isDebugEnabled() ) { + _stats.timePhase3 = time.stop(); LOG.debug("--compression phase 3: "+_stats.timePhase3); - + } + // Phase 4: Cleanup // The remaining columns are stored uncompressed as one big column group if( !remainingCols.isEmpty() ) { @@ -332,10 +344,15 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable rawblock.cleanupBlock(true, true); this.cleanupBlock(true, true); - _stats.timePhase4 = time.stop(); if( LOG.isDebugEnabled() ) { + _stats.timePhase4 = time.stop(); + int[] counts = getColGroupCounts(_colGroups); LOG.debug("--compression phase 4: "+_stats.timePhase4); LOG.debug("--num col groups: "+_colGroups.size()); + LOG.debug("--col groups types (OLE,RLE,DDC1,DDC2,UC): " + +counts[2]+","+counts[1]+","+counts[3]+","+counts[4]+","+counts[0]); + LOG.debug("--col groups sizes (OLE,RLE,DDC1,DDC2,UC): " + +counts[7]+","+counts[6]+","+counts[8]+","+counts[9]+","+counts[5]); LOG.debug("--compressed size: "+_stats.size); LOG.debug("--compression ratio: "+_stats.ratio); } @@ -345,6 +362,22 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable return _stats; } + /** + * Get array of counts regarding col group types. The position + * corresponds with the enum ordinal. + * + * @param colgroups list of column groups + * @return counts + */ + private static int[] getColGroupCounts(ArrayList<ColGroup> colgroups) { + int[] ret = new int[10]; //5 x count, 5 x num_columns + for( ColGroup c : colgroups ) { + ret[c.getCompType().ordinal()] ++; + ret[5+c.getCompType().ordinal()] += c.getNumCols(); + } + return ret; + } + private static CompressedSizeInfo[] computeCompressedSizeInfos(CompressedSizeEstimator estim, int clen) { CompressedSizeInfo[] ret = new CompressedSizeInfo[clen]; for( int col=0; col<clen; col++ ) @@ -372,23 +405,23 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable } } - private static ColGroup[] compressColGroups(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, double sp, List<int[]> groups) + private static ColGroup[] compressColGroups(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, List<int[]> groups, boolean denseEst) { ColGroup[] ret = new ColGroup[groups.size()]; for( int i=0; i<groups.size(); i++ ) - ret[i] = compressColGroup(in, estim, compRatios, rlen, sp, groups.get(i)); + ret[i] = compressColGroup(in, estim, compRatios, rlen, groups.get(i), denseEst); return ret; } - private static ColGroup[] compressColGroups(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, double sp, List<int[]> groups, int k) + private static ColGroup[] compressColGroups(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, List<int[]> groups, boolean denseEst, int k) throws DMLRuntimeException { try { ExecutorService pool = Executors.newFixedThreadPool( k ); ArrayList<CompressTask> tasks = new ArrayList<CompressTask>(); for( int[] colIndexes : groups ) - tasks.add(new CompressTask(in, estim, compRatios, rlen, sp, colIndexes)); + tasks.add(new CompressTask(in, estim, compRatios, rlen, colIndexes, denseEst)); List<Future<ColGroup>> rtask = pool.invokeAll(tasks); ArrayList<ColGroup> ret = new ArrayList<ColGroup>(); for( Future<ColGroup> lrtask : rtask ) @@ -401,7 +434,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable } } - private static ColGroup compressColGroup(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, double sp, int[] colIndexes) + private static ColGroup compressColGroup(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, int[] colIndexes, boolean denseEst) { int[] allGroupIndices = null; int allColsCount = colIndexes.length; @@ -416,12 +449,13 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable //exact big list and observe compression ratio ubm = BitmapEncoder.extractBitmap(colIndexes, in); sizeInfo = estim.estimateCompressedColGroupSize(ubm); - double compRatio = getUncompressedSize(rlen, colIndexes.length, sp) / sizeInfo.getMinSize(); - + double sp2 = denseEst ? 1.0 : OptimizerUtils.getSparsity(rlen, 1, ubm.getNumOffsets()); + double compRatio = getUncompressedSize(rlen, colIndexes.length, sp2) / sizeInfo.getMinSize(); + if( compRatio > 1 ) { break; // we have a good group } - + // modify the group if (compRatioPQ == null) { // first modification @@ -454,9 +488,17 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable //create compressed column group long rleSize = sizeInfo.getRLESize(); long oleSize = sizeInfo.getOLESize(); - if( rleSize < oleSize ) + long ddcSize = sizeInfo.getDDCSize(); + + if( ALLOW_DDC_ENCODING && ddcSize < rleSize && ddcSize < oleSize ) { + if( ubm.getNumValues()<=255 ) + return new ColGroupDDC1(colIndexes, rlen, ubm); + else + return new ColGroupDDC2(colIndexes, rlen, ubm); + } + else if( rleSize < oleSize ) return new ColGroupRLE(colIndexes, rlen, ubm); - else + else return new ColGroupOLE(colIndexes, rlen, ubm); } @@ -469,10 +511,11 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable * @return estimate of uncompressed size of column group */ private static double getUncompressedSize(int rlen, int clen, double sparsity) { - //we estimate the uncompressed size as 8 * nnz in order to cover both - //sparse and dense with moderate underestimation (which is conservative as - //it is biased towards uncompressed columns) - return 8 * rlen * clen * sparsity; + //we estimate the uncompressed size as the minimum of dense representation + //and representation in csr, which moderately overestimates sparse representations + //of single columns but helps avoid anomalies with sparse columns that are + //eventually represented in dense + return Math.min(8d * rlen * clen, 4d * rlen + 12d * rlen * clen * sparsity); } /** @@ -587,8 +630,8 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable } private static class CompressedColumn implements Comparable<CompressedColumn> { - int colIx; - double compRatio; + final int colIx; + final double compRatio; public CompressedColumn(int colIx, double compRatio) { this.colIx = colIx; @@ -613,6 +656,13 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable public CompressionStatistics() { //do nothing } + + public CompressionStatistics(double t1, double t2, double t3, double t4){ + timePhase1 = t1; + timePhase2 = t2; + timePhase3 = t3; + timePhase4 = t4; + } } @Override @@ -681,6 +731,10 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable grp = new ColGroupOLE(); break; case RLE_BITMAP: grp = new ColGroupRLE(); break; + case DDC1: + grp = new ColGroupDDC1(); break; + case DDC2: + grp = new ColGroupDDC2(); break; } //deserialize and add column group @@ -1040,11 +1094,22 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable //aggregate partial results if( op.indexFn instanceof ReduceAll ) { - double val = ret.quickGetValue(0, 0); - for( Future<MatrixBlock> rtask : rtasks ) - val = op.aggOp.increOp.fn.execute(val, - rtask.get().quickGetValue(0, 0)); - ret.quickSetValue(0, 0, val); + if( op.aggOp.increOp.fn instanceof KahanFunction ) { + KahanObject kbuff = new KahanObject(ret.quickGetValue(0, 0), 0); + for( Future<MatrixBlock> rtask : rtasks ) { + double tmp = rtask.get().quickGetValue(0, 0); + ((KahanFunction) op.aggOp.increOp.fn).execute2(kbuff, tmp); + } + ret.quickSetValue(0, 0, kbuff._sum); + } + else { + double val = ret.quickGetValue(0, 0); + for( Future<MatrixBlock> rtask : rtasks ) { + double tmp = rtask.get().quickGetValue(0, 0); + val = op.aggOp.increOp.fn.execute(val, tmp); + } + ret.quickSetValue(0, 0, val); + } } } catch(Exception ex) { @@ -1058,9 +1123,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable grp.unaryAggregateOperations(op, ret); //process OLE/RLE column groups - for (ColGroup grp : _colGroups) - if( !(grp instanceof ColGroupUncompressed) ) - grp.unaryAggregateOperations(op, ret); + aggregateUnaryOperations(op, _colGroups, ret, 0, rlen); } //special handling zeros for rowmins/rowmax @@ -1089,6 +1152,41 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable } @Override + public MatrixValue aggregateUnaryOperations(AggregateUnaryOperator op, + MatrixValue result, int blockingFactorRow, int blockingFactorCol, + MatrixIndexes indexesIn) throws DMLRuntimeException { + return aggregateUnaryOperations(op, result, + blockingFactorRow, blockingFactorCol, indexesIn, false); + } + + private static void aggregateUnaryOperations(AggregateUnaryOperator op, + ArrayList<ColGroup> groups, MatrixBlock ret, int rl, int ru) throws DMLRuntimeException + { + boolean cacheDDC1 = ColGroupValue.LOW_LEVEL_OPT + && op.indexFn instanceof ReduceCol && op.aggOp.increOp.fn instanceof KahanPlus //rowSums + && ColGroupOffset.ALLOW_CACHE_CONSCIOUS_ROWSUMS + && ru-rl > ColGroupOffset.WRITE_CACHE_BLKSZ/2; + + //process cache-conscious DDC1 groups (adds to output) + if( cacheDDC1 ) { + ArrayList<ColGroupDDC1> tmp = new ArrayList<ColGroupDDC1>(); + for( ColGroup grp : groups ) + if( grp instanceof ColGroupDDC1 ) + tmp.add((ColGroupDDC1)grp); + if( !tmp.isEmpty() ) + ColGroupDDC1.computeRowSums(tmp.toArray(new ColGroupDDC1[0]), ret, + KahanPlus.getKahanPlusFnObject(), rl, ru); + } + + //process remaining groups (adds to output) + //note: UC group never passed into this function + for( ColGroup grp : groups ) + if( !(grp instanceof ColGroupUncompressed) + && !(cacheDDC1 && grp instanceof ColGroupDDC1) ) + ((ColGroupValue)grp).unaryAggregateOperations(op, ret, rl, ru); + } + + @Override public MatrixBlock transposeSelfMatrixMultOperations(MatrixBlock out, MMTSJType tstype) throws DMLRuntimeException { @@ -1204,12 +1302,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable result.allocateDenseBlock(); // delegate matrix-vector operation to each column group - for( ColGroup grp : _colGroups ) - if( grp instanceof ColGroupUncompressed ) //overwrites output - grp.rightMultByVector(vector, result, 0, result.getNumRows()); - for( ColGroup grp : _colGroups ) - if( !(grp instanceof ColGroupUncompressed) ) //adds to output - grp.rightMultByVector(vector, result, 0, result.getNumRows()); + rightMultByVector(_colGroups, vector, result, true, 0, result.getNumRows()); // post-processing result.recomputeNonZeros(); @@ -1231,6 +1324,13 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable //multi-threaded execution of all groups try { + ColGroupUncompressed uc = getUncompressedColGroup(); + + //compute uncompressed column group in parallel + if( uc != null ) + uc.rightMultByVector(vector, result, k); + + //compute remaining compressed column groups in parallel ExecutorService pool = Executors.newFixedThreadPool( k ); int rlen = getNumRows(); int seqsz = BitmapEncoder.BITMAP_BLOCK_SZ; @@ -1239,15 +1339,48 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable ArrayList<RightMatrixMultTask> tasks = new ArrayList<RightMatrixMultTask>(); for( int i=0; i<k & i*blklen<getNumRows(); i++ ) tasks.add(new RightMatrixMultTask(_colGroups, vector, result, i*blklen, Math.min((i+1)*blklen,rlen))); - pool.invokeAll(tasks); + List<Future<Long>> ret = pool.invokeAll(tasks); pool.shutdown(); + + //error handling and nnz aggregation + long lnnz = 0; + for( Future<Long> tmp : ret ) + lnnz += tmp.get(); + result.setNonZeros(lnnz); } catch(Exception ex) { throw new DMLRuntimeException(ex); } + } + + private static void rightMultByVector(ArrayList<ColGroup> groups, MatrixBlock vect, MatrixBlock ret, boolean inclUC, int rl, int ru) + throws DMLRuntimeException + { + boolean cacheDDC1 = ColGroupValue.LOW_LEVEL_OPT + && ru-rl > ColGroupOffset.WRITE_CACHE_BLKSZ; - // post-processing - result.recomputeNonZeros(); + // process uncompressed column group (overwrites output) + if( inclUC ) { + for( ColGroup grp : groups ) + if( grp instanceof ColGroupUncompressed ) + grp.rightMultByVector(vect, ret, rl, ru); + } + + //process cache-conscious DDC1 groups (adds to output) + if( cacheDDC1 ) { + ArrayList<ColGroupDDC1> tmp = new ArrayList<ColGroupDDC1>(); + for( ColGroup grp : groups ) + if( grp instanceof ColGroupDDC1 ) + tmp.add((ColGroupDDC1)grp); + if( !tmp.isEmpty() ) + ColGroupDDC1.rightMultByVector(tmp.toArray(new ColGroupDDC1[0]), vect, ret, rl, ru); + } + + //process remaining groups (adds to output) + for( ColGroup grp : groups ) + if( !(grp instanceof ColGroupUncompressed) + && !(cacheDDC1 && grp instanceof ColGroupDDC1) ) + grp.rightMultByVector(vect, ret, rl, ru); } /** @@ -1299,11 +1432,9 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable * @param k number of threads * @throws DMLRuntimeException if DMLRuntimeException occurs */ - private static void leftMultByVectorTranspose(List<ColGroup> colGroups,MatrixBlock vector, MatrixBlock result, boolean doTranspose, int k) + private void leftMultByVectorTranspose(List<ColGroup> colGroups,MatrixBlock vector, MatrixBlock result, boolean doTranspose, int k) throws DMLRuntimeException { - int kuc = Math.max(1, k - colGroups.size() + 1); - //transpose vector if required MatrixBlock rowVector = vector; if (doTranspose) { @@ -1317,12 +1448,21 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable //multi-threaded execution try { - ExecutorService pool = Executors.newFixedThreadPool( Math.min(colGroups.size(), k) ); + //compute uncompressed column group in parallel + ColGroupUncompressed uc = getUncompressedColGroup(); + if( uc != null ) + uc.leftMultByRowVector(vector, result, k); + + //compute remaining compressed column groups in parallel + ExecutorService pool = Executors.newFixedThreadPool( Math.min(colGroups.size()-((uc!=null)?1:0), k) ); ArrayList<LeftMatrixMultTask> tasks = new ArrayList<LeftMatrixMultTask>(); for( ColGroup grp : colGroups ) - tasks.add(new LeftMatrixMultTask(grp, rowVector, result, kuc)); - pool.invokeAll(tasks); + if( !(grp instanceof ColGroupUncompressed) ) + tasks.add(new LeftMatrixMultTask(grp, rowVector, result)); + List<Future<Object>> ret = pool.invokeAll(tasks); pool.shutdown(); + for( Future<Object> tmp : ret ) + tmp.get(); //error handling } catch(Exception ex) { throw new DMLRuntimeException(ex); @@ -1405,37 +1545,32 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable private static class LeftMatrixMultTask implements Callable<Object> { - private ColGroup _group = null; - private MatrixBlock _vect = null; - private MatrixBlock _ret = null; - private int _kuc = 1; + private final ColGroup _group; + private final MatrixBlock _vect; + private final MatrixBlock _ret; - protected LeftMatrixMultTask( ColGroup group, MatrixBlock vect, MatrixBlock ret, int kuc) { + protected LeftMatrixMultTask( ColGroup group, MatrixBlock vect, MatrixBlock ret) { _group = group; _vect = vect; _ret = ret; - _kuc = kuc; } @Override public Object call() throws DMLRuntimeException { // delegate matrix-vector operation to each column group - if( _group instanceof ColGroupUncompressed && _kuc >1 && ColGroupBitmap.LOW_LEVEL_OPT ) - ((ColGroupUncompressed)_group).leftMultByRowVector(_vect, _ret, _kuc); - else - _group.leftMultByRowVector(_vect, _ret); + _group.leftMultByRowVector(_vect, _ret); return null; } } - private static class RightMatrixMultTask implements Callable<Object> + private static class RightMatrixMultTask implements Callable<Long> { - private ArrayList<ColGroup> _groups = null; - private MatrixBlock _vect = null; - private MatrixBlock _ret = null; - private int _rl = -1; - private int _ru = -1; + private final ArrayList<ColGroup> _groups; + private final MatrixBlock _vect; + private final MatrixBlock _ret; + private final int _rl; + private final int _ru; protected RightMatrixMultTask( ArrayList<ColGroup> groups, MatrixBlock vect, MatrixBlock ret, int rl, int ru) { _groups = groups; @@ -1446,25 +1581,18 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable } @Override - public Object call() throws DMLRuntimeException - { - // delegate vector-matrix operation to each column group - for( ColGroup grp : _groups ) - if( grp instanceof ColGroupUncompressed ) //overwrites output - grp.rightMultByVector(_vect, _ret, _rl, _ru); - for( ColGroup grp : _groups ) - if( !(grp instanceof ColGroupUncompressed) ) //adds to output - grp.rightMultByVector(_vect, _ret, _rl, _ru); - return null; + public Long call() throws DMLRuntimeException { + rightMultByVector(_groups, _vect, _ret, false, _rl, _ru); + return _ret.recomputeNonZeros(_rl, _ru-1, 0, 0); } } private static class MatrixMultTransposeTask implements Callable<Object> { - private ArrayList<ColGroup> _groups = null; - private MatrixBlock _ret = null; - private int _gl = -1; - private int _gu = -1; + private final ArrayList<ColGroup> _groups; + private final MatrixBlock _ret; + private final int _gl; + private final int _gu; protected MatrixMultTransposeTask(ArrayList<ColGroup> groups, MatrixBlock ret, int gl, int gu) { _groups = groups; @@ -1482,11 +1610,11 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable private static class UnaryAggregateTask implements Callable<MatrixBlock> { - private ArrayList<ColGroup> _groups = null; - private int _rl = -1; - private int _ru = -1; - private MatrixBlock _ret = null; - private AggregateUnaryOperator _op = null; + private final ArrayList<ColGroup> _groups; + private final int _rl; + private final int _ru; + private final MatrixBlock _ret; + private final AggregateUnaryOperator _op; protected UnaryAggregateTask( ArrayList<ColGroup> groups, MatrixBlock ret, int rl, int ru, AggregateUnaryOperator op) { _groups = groups; @@ -1507,18 +1635,15 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable @Override public MatrixBlock call() throws DMLRuntimeException { - // delegate unary aggregate operation to each column group - // (uncompressed column group handles separately) - for( ColGroup grp : _groups ) - ((ColGroupBitmap)grp).unaryAggregateOperations(_op, _ret, _rl, _ru); + aggregateUnaryOperations(_op, _groups, _ret, _rl, _ru); return _ret; } } private static class SizeEstimTask implements Callable<CompressedSizeInfo> { - private CompressedSizeEstimator _estim = null; - private int _col = -1; + private final CompressedSizeEstimator _estim; + private final int _col; protected SizeEstimTask( CompressedSizeEstimator estim, int col ) { _estim = estim; @@ -1533,34 +1658,34 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable private static class CompressTask implements Callable<ColGroup> { - private MatrixBlock _in = null; - private CompressedSizeEstimator _estim = null; - private HashMap<Integer, Double> _compRatios = null; - private int _rlen = -1; - private double _sp = -1; - private int[] _colIndexes = null; - - protected CompressTask( MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, double sp, int[] colIndexes ) { + private final MatrixBlock _in; + private final CompressedSizeEstimator _estim; + private final HashMap<Integer, Double> _compRatios; + private final int _rlen; + private final int[] _colIndexes; + private final boolean _denseEst; + + protected CompressTask( MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, int[] colIndexes, boolean denseEst ) { _in = in; _estim = estim; _compRatios = compRatios; _rlen = rlen; - _sp = sp; _colIndexes = colIndexes; + _denseEst = denseEst; } @Override public ColGroup call() throws DMLRuntimeException { - return compressColGroup(_in, _estim, _compRatios, _rlen, _sp, _colIndexes); + return compressColGroup(_in, _estim, _compRatios, _rlen, _colIndexes, _denseEst); } } private static class DecompressTask implements Callable<Object> { - private List<ColGroup> _colGroups = null; - private MatrixBlock _ret = null; - private int _rl = -1; - private int _ru = -1; + private final List<ColGroup> _colGroups; + private final MatrixBlock _ret; + private final int _rl; + private final int _ru; protected DecompressTask( List<ColGroup> colGroups, MatrixBlock ret, int rl, int ru ) { _colGroups = colGroups; @@ -1735,15 +1860,6 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable MatrixBlock tmp = isCompressed() ? decompress() : this; return tmp.zeroOutOperations(result, range, complementary); } - - @Override - public MatrixValue aggregateUnaryOperations(AggregateUnaryOperator op, - MatrixValue result, int blockingFactorRow, int blockingFactorCol, - MatrixIndexes indexesIn) throws DMLRuntimeException { - printDecompressWarning("aggregateUnaryOperations"); - MatrixBlock tmp = isCompressed() ? decompress() : this; - return tmp.aggregateUnaryOperations(op, result, blockingFactorRow, blockingFactorCol, indexesIn); - } @Override public CM_COV_Object cmOperations(CMOperator op) throws DMLRuntimeException { @@ -2000,4 +2116,11 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable LOG.warn("Operation '"+operation+"' not supported yet - decompressing for ULA operations."); } } + + private HashSet<Integer> seq(int from, int to, int incr) { + HashSet<Integer> ret = new HashSet<Integer>(); + for (int i = from; i <= to; i+=incr) + ret.add(i); + return ret; + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/PlanningBinPacker.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/PlanningBinPacker.java b/src/main/java/org/apache/sysml/runtime/compress/PlanningBinPacker.java deleted file mode 100644 index 70308bb..0000000 --- a/src/main/java/org/apache/sysml/runtime/compress/PlanningBinPacker.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.runtime.compress; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; - -/** - * Used for the finding columns to co-code - * - */ -public class PlanningBinPacker -{ - private final float _binWeight; - private final List<Integer> _items; - private final List<Float> _itemWeights; - - public PlanningBinPacker(float binWeight, List<Integer> items, List<Float> itemWeights) { - _binWeight = binWeight; - _items = items; - _itemWeights = itemWeights; - } - - /** - * NOTE: upper bound is 17/10 OPT - * - * @return key: available space, value: list of the bins that have that free space - */ - public TreeMap<Float, List<List<Integer>>> packFirstFit() { - return packFirstFit(_items, _itemWeights); - } - - private TreeMap<Float, List<List<Integer>>> packFirstFit(List<Integer> items, List<Float> itemWeights) - { - // when searching for a bin, the first bin in the list is used - TreeMap<Float, List<List<Integer>>> bins = new TreeMap<Float, List<List<Integer>>>(); - // first bin - bins.put(_binWeight, createBinList()); - int numItems = items.size(); - for (int i = 0; i < numItems; i++) { - float itemWeight = itemWeights.get(i); - Map.Entry<Float, List<List<Integer>>> entry = bins - .ceilingEntry(itemWeight); - if (entry == null) { - // new bin - float newBinWeight = _binWeight - itemWeight; - List<List<Integer>> binList = bins.get(newBinWeight); - if (binList == null) { - bins.put(newBinWeight, createBinList(items.get(i))); - } else { - List<Integer> newBin = new ArrayList<Integer>(); - newBin.add(items.get(i)); - binList.add(newBin); - } - } else { - // add to the first bin in the list - List<Integer> assignedBin = entry.getValue().remove(0); - assignedBin.add(items.get(i)); - if (entry.getValue().size() == 0) - bins.remove(entry.getKey()); - float newBinWeight = entry.getKey() - itemWeight; - List<List<Integer>> newBinsList = bins.get(newBinWeight); - if (newBinsList == null) { - // new bin - bins.put(newBinWeight, createBinList(assignedBin)); - } else { - newBinsList.add(assignedBin); - } - } - } - return bins; - } - - private List<List<Integer>> createBinList() { - List<List<Integer>> binList = new ArrayList<List<Integer>>(); - binList.add(new ArrayList<Integer>()); - return binList; - } - - private List<List<Integer>> createBinList(int item) { - List<List<Integer>> binList = new ArrayList<List<Integer>>(); - List<Integer> bin = new ArrayList<Integer>(); - binList.add(bin); - bin.add(item); - return binList; - } - - private List<List<Integer>> createBinList(List<Integer> bin) { - List<List<Integer>> binList = new ArrayList<List<Integer>>(); - binList.add(bin); - return binList; - } -} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java b/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java deleted file mode 100644 index 9313cd9..0000000 --- a/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java +++ /dev/null @@ -1,257 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.runtime.compress; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.PriorityQueue; -import java.util.TreeMap; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -import org.apache.sysml.runtime.DMLRuntimeException; -import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator; - -public class PlanningCoCoder -{ - //constants for weight computation - private final static float GROUPABILITY_THRESHOLD = 0.00064f; - private final static float PARTITION_WEIGHT = 0.05F; //higher values lead to more grouping - private final static float PARTITION_SIZE = PARTITION_WEIGHT * GROUPABILITY_THRESHOLD; - - public static List<int[]> findCocodesByPartitioning(CompressedSizeEstimator sizeEstimator, List<Integer> availCols, - List<Integer> colsCardinalities, List<Long> compressedSize, int numRows, double sparsity, int k) - throws DMLRuntimeException - { - List<int[]> retGroups = new ArrayList<int[]>(); - - // filtering out non-groupable columns as singleton groups - // weighted of each column is the ratio of its cardinality to the number - // of rows scaled by the matrix sparsity - int numCols = availCols.size(); - List<Integer> groupCols = new ArrayList<Integer>(); - List<Float> groupColWeights = new ArrayList<Float>(); - HashMap<Integer, GroupableColInfo> groupColsInfo = new HashMap<Integer, GroupableColInfo>(); - for (int i = 0; i < numCols; i++) { - int colIx = availCols.get(i); - int cardinality = colsCardinalities.get(i); - float weight = ((float) cardinality) / numRows; - if (weight <= GROUPABILITY_THRESHOLD) { - groupCols.add(colIx); - groupColWeights.add(weight); - groupColsInfo.put(colIx, new GroupableColInfo(weight,compressedSize.get(i))); - } else { - retGroups.add(new int[] { colIx }); - } - } - - // bin packing based on PARTITION_WEIGHT and column weights - float weight = computeWeightForCoCoding(numRows, sparsity); - TreeMap<Float, List<List<Integer>>> bins = new PlanningBinPacker( - weight, groupCols, groupColWeights).packFirstFit(); - - // brute force grouping within each partition - retGroups.addAll( (k > 1) ? - getCocodingGroupsBruteForce(bins, groupColsInfo, sizeEstimator, numRows, k) : - getCocodingGroupsBruteForce(bins, groupColsInfo, sizeEstimator, numRows)); - - return retGroups; - } - - private static List<int[]> getCocodingGroupsBruteForce(TreeMap<Float, List<List<Integer>>> bins, HashMap<Integer, GroupableColInfo> groupColsInfo, CompressedSizeEstimator estim, int rlen) - { - List<int[]> retGroups = new ArrayList<int[]>(); - for (List<List<Integer>> binList : bins.values()) { - for (List<Integer> bin : binList) { - // building an array of singleton CoCodingGroup - ArrayList<PlanningCoCodingGroup> sgroups = new ArrayList<PlanningCoCodingGroup>(); - for (Integer col : bin) - sgroups.add(new PlanningCoCodingGroup(col, groupColsInfo.get(col))); - // brute force co-coding - PlanningCoCodingGroup[] outputGroups = findCocodesBruteForce( - estim, rlen, sgroups.toArray(new PlanningCoCodingGroup[0])); - for (PlanningCoCodingGroup grp : outputGroups) - retGroups.add(grp.getColIndices()); - } - } - - return retGroups; - } - - private static List<int[]> getCocodingGroupsBruteForce(TreeMap<Float, List<List<Integer>>> bins, HashMap<Integer, GroupableColInfo> groupColsInfo, CompressedSizeEstimator estim, int rlen, int k) - throws DMLRuntimeException - { - List<int[]> retGroups = new ArrayList<int[]>(); - try { - ExecutorService pool = Executors.newFixedThreadPool( k ); - ArrayList<CocodeTask> tasks = new ArrayList<CocodeTask>(); - for (List<List<Integer>> binList : bins.values()) - for (List<Integer> bin : binList) { - // building an array of singleton CoCodingGroup - ArrayList<PlanningCoCodingGroup> sgroups = new ArrayList<PlanningCoCodingGroup>(); - for (Integer col : bin) - sgroups.add(new PlanningCoCodingGroup(col, groupColsInfo.get(col))); - tasks.add(new CocodeTask(estim, sgroups, rlen)); - } - List<Future<PlanningCoCodingGroup[]>> rtask = pool.invokeAll(tasks); - for( Future<PlanningCoCodingGroup[]> lrtask : rtask ) - for (PlanningCoCodingGroup grp : lrtask.get()) - retGroups.add(grp.getColIndices()); - pool.shutdown(); - } - catch(Exception ex) { - throw new DMLRuntimeException(ex); - } - - return retGroups; - } - - /** - * Identify columns to code together. Uses a greedy approach that merges - * pairs of column groups into larger groups. Each phase of the greedy - * algorithm considers all combinations of pairs to merge. - * - * @param sizeEstimator compressed size estimator - * @param numRowsWeight number of rows weight - * @param singltonGroups planning co-coding groups - * @return - */ - private static PlanningCoCodingGroup[] findCocodesBruteForce( - CompressedSizeEstimator sizeEstimator, float numRowsWeight, - PlanningCoCodingGroup[] singltonGroups) - { - // Populate a priority queue with all available 2-column cocodings. - PriorityQueue<PlanningGroupMergeAction> q = new PriorityQueue<PlanningGroupMergeAction>(); - for (int leftIx = 0; leftIx < singltonGroups.length; leftIx++) { - PlanningCoCodingGroup leftGrp = singltonGroups[leftIx]; - for (int rightIx = leftIx + 1; rightIx < singltonGroups.length; rightIx++) { - PlanningCoCodingGroup rightGrp = singltonGroups[rightIx]; - // at least one of the two groups should be low-cardinality - float cardRatio = leftGrp.getCardinalityRatio() + rightGrp.getCardinalityRatio(); - if ( cardRatio < GROUPABILITY_THRESHOLD) { - PlanningGroupMergeAction potentialMerge = new PlanningGroupMergeAction( - sizeEstimator, numRowsWeight, leftGrp, rightGrp); - if (potentialMerge.getChangeInSize() < 0) { - q.add(potentialMerge); - } - } - } - } - PlanningCoCodingGroup[] colGroups = singltonGroups; - - // Greedily merge groups until we can no longer reduce the number of - // runs by merging groups - while (q.size() > 0) { - PlanningGroupMergeAction merge = q.poll(); - - // The queue can contain merge actions involving column groups that - // have already been merged. - // Filter those actions out. - int leftIx = findInArray(colGroups, merge.getLeftGrp()); - int rightIx = findInArray(colGroups, merge.getRightGrp()); - if (leftIx < 0 || rightIx < 0) { - // One or more of the groups to be merged has already been made - // part of another group. - // Drop the merge action. - } else { - PlanningCoCodingGroup mergedGrp = merge.getMergedGrp(); - - PlanningCoCodingGroup[] newColGroups = new PlanningCoCodingGroup[colGroups.length - 1]; - int targetIx = 0; - for (int i = 0; i < colGroups.length; i++) { - if (i != leftIx && i != rightIx) { - newColGroups[targetIx] = colGroups[i]; - targetIx++; - } - } - - // New group goes at the end to (hopefully) speed up future - // linear search operations - newColGroups[newColGroups.length - 1] = mergedGrp; - - // Consider merging the new group with all the other - // pre-existing groups. - for (int i = 0; i < newColGroups.length - 1; i++) { - PlanningCoCodingGroup newLeftGrp = newColGroups[i]; - PlanningCoCodingGroup newRightGrp = mergedGrp; - if (newLeftGrp.getCardinalityRatio() - + newRightGrp.getCardinalityRatio() < GROUPABILITY_THRESHOLD) { - PlanningGroupMergeAction newPotentialMerge = new PlanningGroupMergeAction( - sizeEstimator, numRowsWeight, newLeftGrp, - newRightGrp); - if (newPotentialMerge.getChangeInSize() < 0) { - q.add(newPotentialMerge); - } - } - } - colGroups = newColGroups; - } - } - return colGroups; - } - - private static float computeWeightForCoCoding(int numRows, double sparsity) { - //we use a constant partition size (independent of the number of rows - //in order to ensure constant compression speed independent of blocking) - return PARTITION_SIZE; - } - - private static int findInArray(Object[] arr, Object val) { - for (int i = 0; i < arr.length; i++) { - if (arr[i].equals(val)) { - return i; - } - } - return -1; - } - - protected static class GroupableColInfo { - float cardRatio; - long size; - - public GroupableColInfo(float lcardRatio, long lsize) { - cardRatio = lcardRatio; - size = lsize; - } - } - - private static class CocodeTask implements Callable<PlanningCoCodingGroup[]> - { - private CompressedSizeEstimator _estim = null; - private ArrayList<PlanningCoCodingGroup> _sgroups = null; - private int _rlen = -1; - - protected CocodeTask( CompressedSizeEstimator estim, ArrayList<PlanningCoCodingGroup> sgroups, int rlen ) { - _estim = estim; - _sgroups = sgroups; - _rlen = rlen; - } - - @Override - public PlanningCoCodingGroup[] call() throws DMLRuntimeException { - // brute force co-coding - return findCocodesBruteForce(_estim, _rlen, - _sgroups.toArray(new PlanningCoCodingGroup[0])); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCodingGroup.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCodingGroup.java b/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCodingGroup.java deleted file mode 100644 index 9ee0d7e..0000000 --- a/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCodingGroup.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.runtime.compress; - -import java.util.Arrays; - -import org.apache.sysml.runtime.compress.PlanningCoCoder.GroupableColInfo; -import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator; -import org.apache.sysml.runtime.compress.estim.CompressedSizeInfo; - -/** - * Class to represent information about co-coding a group of columns. - * - */ -public class PlanningCoCodingGroup -{ - private int[] _colIndexes; - private long _estSize; - private float _cardRatio; - - /** - * Constructor for a one-column group; i.e. do not co-code a given column. - * - * @param col column - * @param info groupable column info - */ - public PlanningCoCodingGroup(int col, GroupableColInfo info) { - _colIndexes = new int[]{col}; - _estSize = info.size; - _cardRatio = info.cardRatio; - } - - /** - * Constructor for merging two disjoint groups of columns - * - * @param grp1 first group of columns to merge - * @param grp2 second group to merge - * @param bitmapSizeEstimator bitmap size estimator - * @param numRowsWeight numRows x sparsity - */ - public PlanningCoCodingGroup(PlanningCoCodingGroup grp1, PlanningCoCodingGroup grp2, - CompressedSizeEstimator bitmapSizeEstimator, float numRowsWeight) - { - // merge sorted non-empty arrays - _colIndexes = new int[grp1._colIndexes.length + grp2._colIndexes.length]; - int grp1Ptr = 0, grp2Ptr = 0; - for (int mergedIx = 0; mergedIx < _colIndexes.length; mergedIx++) { - if (grp1._colIndexes[grp1Ptr] < grp2._colIndexes[grp2Ptr]) { - _colIndexes[mergedIx] = grp1._colIndexes[grp1Ptr++]; - if (grp1Ptr == grp1._colIndexes.length) { - System.arraycopy(grp2._colIndexes, grp2Ptr, _colIndexes, - mergedIx + 1, grp2._colIndexes.length - grp2Ptr); - break; - } - } else { - _colIndexes[mergedIx] = grp2._colIndexes[grp2Ptr++]; - if (grp2Ptr == grp2._colIndexes.length) { - System.arraycopy(grp1._colIndexes, grp1Ptr, _colIndexes, - mergedIx + 1, grp1._colIndexes.length - grp1Ptr); - break; - } - } - } - - // estimating size info - CompressedSizeInfo groupSizeInfo = bitmapSizeEstimator - .estimateCompressedColGroupSize(_colIndexes); - _estSize = groupSizeInfo.getMinSize(); - _cardRatio = groupSizeInfo.getEstCarinality() / numRowsWeight; - } - - public int[] getColIndices() { - return _colIndexes; - } - - /** - * Obtain estimated compressed size of the grouped columns. - * - * @return estimated compressed size of the grouped columns - */ - public long getEstSize() { - return _estSize; - } - - public float getCardinalityRatio() { - return _cardRatio; - } - - @Override - public String toString() { - return Arrays.toString(_colIndexes); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/PlanningGroupMergeAction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/PlanningGroupMergeAction.java b/src/main/java/org/apache/sysml/runtime/compress/PlanningGroupMergeAction.java deleted file mode 100644 index 47d46d5..0000000 --- a/src/main/java/org/apache/sysml/runtime/compress/PlanningGroupMergeAction.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.runtime.compress; - -import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator; - -/** - * Internal data structure for tracking potential merges of column groups in - * co-coding calculations. - * - */ -class PlanningGroupMergeAction implements Comparable<PlanningGroupMergeAction> -{ - private PlanningCoCodingGroup _leftGrp; //left input - private PlanningCoCodingGroup _rightGrp; //right input - private PlanningCoCodingGroup _mergedGrp; //output - private long _changeInSize; - - - public PlanningGroupMergeAction(CompressedSizeEstimator sizeEstimator, - float numRowsWeight, PlanningCoCodingGroup leftGrp, PlanningCoCodingGroup rightGrp) { - _leftGrp = leftGrp; - _rightGrp = rightGrp; - _mergedGrp = new PlanningCoCodingGroup(leftGrp, rightGrp, sizeEstimator, numRowsWeight); - - // Negative size change ==> Decrease in size - _changeInSize = _mergedGrp.getEstSize() - - leftGrp.getEstSize() - rightGrp.getEstSize(); - } - - public int compareTo(PlanningGroupMergeAction o) { - // We only sort by the change in size - return (int) Math.signum(_changeInSize - o._changeInSize); - } - - @Override - public String toString() { - return String.format("Merge %s and %s", _leftGrp, _rightGrp); - } - - public PlanningCoCodingGroup getLeftGrp() { - return _leftGrp; - } - - public PlanningCoCodingGroup getRightGrp() { - return _rightGrp; - } - - public PlanningCoCodingGroup getMergedGrp() { - return _mergedGrp; - } - - public long getChangeInSize() { - return _changeInSize; - } -} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelectionSparse.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelectionSparse.java b/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelectionSparse.java index 63c0467..60d0532 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelectionSparse.java +++ b/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelectionSparse.java @@ -63,7 +63,6 @@ public class ReaderColumnSelectionSparse extends ReaderColumnSelection if( data.getSparseBlock()!=null ) for( int i=0; i<colIndexes.length; i++ ) sparseCols[i] = data.getSparseBlock().get(colIndexes[i]); - Arrays.fill(sparsePos, 0); } @Override http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/UncompressedBitmap.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/UncompressedBitmap.java b/src/main/java/org/apache/sysml/runtime/compress/UncompressedBitmap.java index d62bae9..2f68edf 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/UncompressedBitmap.java +++ b/src/main/java/org/apache/sysml/runtime/compress/UncompressedBitmap.java @@ -21,10 +21,13 @@ package org.apache.sysml.runtime.compress; import java.util.Arrays; +import org.apache.commons.lang.ArrayUtils; import org.apache.sysml.runtime.compress.utils.DblArrayIntListHashMap; import org.apache.sysml.runtime.compress.utils.DoubleIntListHashMap; import org.apache.sysml.runtime.compress.utils.DblArrayIntListHashMap.DArrayIListEntry; import org.apache.sysml.runtime.compress.utils.DoubleIntListHashMap.DIListEntry; +import org.apache.sysml.runtime.compress.utils.IntArrayList; +import org.apache.sysml.runtime.util.SortUtils; /** * Uncompressed representation of one or more columns in bitmap format. @@ -32,13 +35,13 @@ import org.apache.sysml.runtime.compress.utils.DoubleIntListHashMap.DIListEntry; */ public final class UncompressedBitmap { - private int _numCols; + private final int _numCols; /** Distinct values that appear in the column. Linearized as value groups <v11 v12> <v21 v22>.*/ private double[] _values; /** Bitmaps (as lists of offsets) for each of the values. */ - private int[][] _offsetsLists; + private IntArrayList[] _offsetsLists; public UncompressedBitmap( DblArrayIntListHashMap distinctVals, int numColumns ) { @@ -46,11 +49,11 @@ public final class UncompressedBitmap // Convert inputs to arrays int numVals = distinctVals.size(); _values = new double[numVals*numColumns]; - _offsetsLists = new int[numVals][]; + _offsetsLists = new IntArrayList[numVals]; int bitmapIx = 0; for( DArrayIListEntry val : distinctVals.extractValues()) { System.arraycopy(val.key.getData(), 0, _values, bitmapIx*numColumns, numColumns); - _offsetsLists[bitmapIx++] = val.value.extractValues(); + _offsetsLists[bitmapIx++] = val.value; } _numCols = numColumns; } @@ -61,11 +64,11 @@ public final class UncompressedBitmap // Convert inputs to arrays int numVals = distinctVals.size(); _values = new double[numVals]; - _offsetsLists = new int[numVals][]; + _offsetsLists = new IntArrayList[numVals]; int bitmapIx = 0; for(DIListEntry val : distinctVals.extractValues()) { _values[bitmapIx] = val.key; - _offsetsLists[bitmapIx++] = val.value.extractValues(); + _offsetsLists[bitmapIx++] = val.value; } _numCols = 1; } @@ -74,6 +77,15 @@ public final class UncompressedBitmap return _numCols; } + /** + * Get all values without unnecessary allocations and copies. + * + * @return dictionary of value tuples + */ + public double[] getValues() { + return _values; + } + /** * Obtain tuple of column values associated with index. * @@ -94,21 +106,46 @@ public final class UncompressedBitmap return _values.length / _numCols; } - /** - * Obtain array of offsets of the rows containing index value - * - * @param ix index of a particular distinct value - * @return IMMUTABLE array of the offsets of the rows containing the value - * with the indicated index - */ - public int[] getOffsetsList(int ix) { + public IntArrayList getOffsetsList(int ix) { return _offsetsLists[ix]; } - public int getNumOffsets() { - int ret = 0; - for( int[] offlist : _offsetsLists ) - ret += offlist.length; + public long getNumOffsets() { + long ret = 0; + for( IntArrayList offlist : _offsetsLists ) + ret += offlist.size(); return ret; } + + public int getNumOffsets(int ix) { + return _offsetsLists[ix].size(); + } + + public void sortValuesByFrequency() { + int numVals = getNumValues(); + int numCols = getNumColumns(); + + double[] freq = new double[numVals]; + int[] pos = new int[numVals]; + + //populate the temporary arrays + for(int i=0; i<numVals; i++) { + freq[i] = getNumOffsets(i); + pos[i] = i; + } + + //sort ascending and reverse (descending) + SortUtils.sortByValue(0, numVals, freq, pos); + ArrayUtils.reverse(pos); + + //create new value and offset list arrays + double[] lvalues = new double[numVals*numCols]; + IntArrayList[] loffsets = new IntArrayList[numVals]; + for(int i=0; i<numVals; i++) { + System.arraycopy(_values, pos[i]*numCols, lvalues, i*numCols, numCols); + loffsets[i] = _offsetsLists[pos[i]]; + } + _values = lvalues; + _offsetsLists = loffsets; + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitioner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitioner.java b/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitioner.java new file mode 100644 index 0000000..05af19d --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitioner.java @@ -0,0 +1,19 @@ +package org.apache.sysml.runtime.compress.cocode; + +import java.util.HashMap; +import java.util.List; + +import org.apache.sysml.runtime.compress.cocode.PlanningCoCoder.GroupableColInfo; + +public abstract class ColumnGroupPartitioner +{ + /** + * Partitions a list of columns into a list of partitions that contains subsets of columns. + * Note that this call must compute a complete and disjoint partitioning. + * + * @param groupCols list of columns + * @param groupColsInfo list of column infos + * @return list of partitions (where each partition is a list of columns) + */ + public abstract List<List<Integer>> partitionColumns(List<Integer> groupCols, HashMap<Integer, GroupableColInfo> groupColsInfo); +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitionerBinPacking.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitionerBinPacking.java b/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitionerBinPacking.java new file mode 100644 index 0000000..0fb6abe --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitionerBinPacking.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.compress.cocode; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.sysml.runtime.compress.cocode.PlanningCoCoder.GroupableColInfo; +import org.apache.sysml.runtime.util.SortUtils; + +/** + * Column group partitioning with bin packing heuristic. + * + */ +public class ColumnGroupPartitionerBinPacking extends ColumnGroupPartitioner +{ + private static final boolean FIRST_FIT_DEC = true; + private static final int MAX_COL_PER_GROUP = Integer.MAX_VALUE; + + //we use a constant partition size (independent of the number of rows + //in order to ensure constant compression speed independent of blocking) + public static double BIN_CAPACITY = 0.000032; //higher values, more grouping + + @Override + public List<List<Integer>> partitionColumns(List<Integer> groupCols, HashMap<Integer, GroupableColInfo> groupColsInfo) + { + //obtain column weights + int[] items = new int[groupCols.size()]; + double[] itemWeights = new double[groupCols.size()]; + for( int i=0; i<groupCols.size(); i++ ) { + int col = groupCols.get(i); + items[i] = col; + itemWeights[i] = groupColsInfo.get(col).cardRatio; + } + + //sort items (first fit decreasing) + if( FIRST_FIT_DEC ) { + SortUtils.sortByValue(0, items.length, itemWeights, items); + ArrayUtils.reverse(items); + ArrayUtils.reverse(itemWeights); + } + + //partition columns via bin packing + return packFirstFit(items, itemWeights); + } + + /** + * NOTE: upper bound is 17/10 OPT + * + * @param items the items in terms of columns + * @param itemWeights the weights of the items + * @return + */ + private List<List<Integer>> packFirstFit(int[] items, double[] itemWeights) + { + List<List<Integer>> bins = new ArrayList<List<Integer>>(); + List<Double> binWeights = new ArrayList<Double>(); + + for( int i = 0; i < items.length; i++ ) { + //add to existing bin + boolean assigned = false; + for( int j = 0; j < bins.size(); j++ ) { + double newBinWeight = binWeights.get(j)-itemWeights[i]; + if( newBinWeight >= 0 && bins.get(j).size() < MAX_COL_PER_GROUP-1 ){ + bins.get(j).add(items[i]); + binWeights.set(j, newBinWeight); + assigned = true; break; + } + } + + //create new bin at end of list + if( !assigned ) { + bins.add(new ArrayList<Integer>(Arrays.asList(items[i]))); + binWeights.add(BIN_CAPACITY-itemWeights[i]); + } + } + + return bins; + } +}