/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.sysml.runtime.compress;

import java.util.Arrays;
import java.util.Iterator;

import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.compress.utils.ConverterUtils;
import org.apache.sysml.runtime.compress.utils.LinearAlgebraUtils;
import org.apache.sysml.runtime.functionobjects.KahanFunction;
import org.apache.sysml.runtime.functionobjects.KahanPlus;
import org.apache.sysml.runtime.functionobjects.KahanPlusSq;
import org.apache.sysml.runtime.functionobjects.ReduceAll;
import org.apache.sysml.runtime.functionobjects.ReduceCol;
import org.apache.sysml.runtime.functionobjects.ReduceRow;
import org.apache.sysml.runtime.instructions.cp.KahanObject;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
import org.apache.sysml.runtime.matrix.operators.ScalarOperator;


/** A group of columns compressed with a single run-length encoded bitmap. */
public class ColGroupRLE extends ColGroupBitmap 
{
	private static final long serialVersionUID = 7450232907594748177L;

	public ColGroupRLE() {
		super(CompressionType.RLE_BITMAP);
	}
	
	/**
	 * Main constructor. Constructs and stores the necessary bitmaps.
	 * 
	 * @param colIndices
	 *            indices (within the block) of the columns included in this
	 *            column group
	 * @param numRows
	 *            total number of rows in the parent block
	 * @param ubm
	 *            Uncompressed bitmap representation of the block
	 */
	public ColGroupRLE(int[] colIndices, int numRows, UncompressedBitmap ubm) 
	{
		super(CompressionType.RLE_BITMAP, colIndices, numRows, ubm);
		
		// compress the bitmaps
		final int numVals = ubm.getNumValues();
		char[][] lbitmaps = new char[numVals][];
		int totalLen = 0;
		for( int i=0; i<numVals; i++ ) {
			lbitmaps[i] = BitmapEncoder.genRLEBitmap(ubm.getOffsetsList(i));
			totalLen += lbitmaps[i].length;
		}
		
		// compact bitmaps to linearized representation
		createCompressedBitmaps(numVals, totalLen, lbitmaps);
	}

	/**
	 * Constructor for internal use. Wraps pre-built compressed bitmap data
	 * without re-encoding (e.g., for scalar-op results).
	 */
	public ColGroupRLE(int[] colIndices, int numRows, double[] values, char[] bitmaps, int[] bitmapOffs) {
		super(CompressionType.RLE_BITMAP, colIndices, numRows, values);
		_data = bitmaps;
		_ptr = bitmapOffs;
	}
	
	@Override
	public Iterator<Integer> getDecodeIterator(int bmpIx) {
		return new BitmapDecoderRLE(_data, _ptr[bmpIx], len(bmpIx));
	}
	
	@Override
	public void decompressToBlock(MatrixBlock target) 
	{
		if( LOW_LEVEL_OPT && getNumValues() > 1 )
		{
			final int blksz = 128 * 1024;
			final int numCols = getNumCols();
			final int numVals = getNumValues();
			final int n = getNumRows();
			
			//position and start offset arrays
			int[] apos = new int[numVals];
			int[] astart = new int[numVals];
			
			//cache conscious append via horizontal scans 
			for( int bi=0; bi<n; bi+=blksz ) {
				int bimax = Math.min(bi+blksz, n);
				for (int k=0, off=0; k < numVals; k++, off+=numCols) {
					int bitmapOff = _ptr[k];
					int bitmapLen = len(k);
					int bufIx = apos[k];
					//skip value lists already fully consumed 
					//(consistent with the colixTargets/colpos variants)
					if( bufIx >= bitmapLen ) 
						continue;
					int start = astart[k];
					for( ; bufIx<bitmapLen && start<bimax; bufIx+=2) {
						start += _data[bitmapOff + bufIx];
						int len = _data[bitmapOff + bufIx+1];
						for( int i=start; i<start+len; i++ )
							for( int j=0; j<numCols; j++ )
								if( _values[off+j]!=0 )
									target.appendValue(i, _colIndexes[j], _values[off+j]);
						start += len;
					}
					apos[k] = bufIx;
					astart[k] = start;
				}
			}
		}
		else
		{
			//call generic decompression with decoder
			super.decompressToBlock(target);
		}
	}

	@Override
	public void decompressToBlock(MatrixBlock target, int[] colixTargets) 
	{
		if( LOW_LEVEL_OPT && getNumValues() > 1 )
		{
			final int blksz = 128 * 1024;
			final int numCols = getNumCols();
			final int numVals = getNumValues();
			final int n = getNumRows();
			
			//position and start offset arrays
			int[] apos = new int[numVals];
			int[] astart = new int[numVals];
			int[] cix = new int[numCols];
			
			//prepare target col indexes
			for( int j=0; j<numCols; j++ )
				cix[j] = colixTargets[_colIndexes[j]];
			
			//cache conscious append via horizontal scans 
			for( int bi=0; bi<n; bi+=blksz ) {
				int bimax = Math.min(bi+blksz, n);
				for (int k=0, off=0; k < numVals; k++, off+=numCols) {
					int bitmapOff = _ptr[k];
					int bitmapLen = len(k);
					int bufIx = apos[k];
					if( bufIx >= bitmapLen ) 
						continue;
					int start = astart[k];
					for( ; bufIx<bitmapLen && start<bimax; bufIx+=2) {
						start += _data[bitmapOff + bufIx];
						int len = _data[bitmapOff + bufIx+1];
						for( int i=start; i<start+len; i++ )
							for( int j=0; j<numCols; j++ )
								if( _values[off+j]!=0 )
									target.appendValue(i, cix[j], _values[off+j]);
						start += len;
					}
					apos[k] = bufIx;
					astart[k] = start;
				}
			}
		}
		else
		{
			//call generic decompression with decoder
			super.decompressToBlock(target, colixTargets);
		}
	}
	
	@Override
	public void decompressToBlock(MatrixBlock target, int colpos) 
	{
		if( LOW_LEVEL_OPT && getNumValues() > 1 )
		{
			final int blksz = 128 * 1024;
			final int numCols = getNumCols();
			final int numVals = getNumValues();
			final int n = getNumRows();
			double[] c = target.getDenseBlock();
			
			//position and start offset arrays
			int[] apos = new int[numVals];
			int[] astart = new int[numVals];
			
			//cache conscious append via horizontal scans 
			for( int bi=0; bi<n; bi+=blksz ) {
				int bimax = Math.min(bi+blksz, n);
				for (int k=0, off=0; k < numVals; k++, off+=numCols) {
					int bitmapOff = _ptr[k];
					int bitmapLen = len(k);
					int bufIx = apos[k];
					if( bufIx >= bitmapLen ) 
						continue;
					int start = astart[k];
					for( ; bufIx<bitmapLen && start<bimax; bufIx+=2) {
						start += _data[bitmapOff + bufIx];
						int len = _data[bitmapOff + bufIx+1];
						for( int i=start; i<start+len; i++ )
							c[i] = _values[off+colpos];
						start += len;
					}
					apos[k] = bufIx;
					astart[k] = start;
				}
			}
			
			target.recomputeNonZeros();
		}
		else
		{
			//call generic decompression with decoder
			super.decompressToBlock(target, colpos);
		}
	}
	
	@Override
	public void rightMultByVector(MatrixBlock vector, MatrixBlock result, int rl, int ru)
		throws DMLRuntimeException 
	{
		double[] b = ConverterUtils.getDenseVector(vector);
		double[] c = result.getDenseBlock();
		final int numCols = getNumCols();
		final int numVals = getNumValues();
		
		//prepare reduced rhs w/ relevant values
		double[] sb = new double[numCols];
		for (int j = 0; j < numCols; j++) {
			sb[j] = b[_colIndexes[j]];
		}
		
		if( LOW_LEVEL_OPT && numVals > 1 
			&& _numRows > BitmapEncoder.BITMAP_BLOCK_SZ )
		{
			//L3 cache alignment, see comment rightMultByVector OLE column group 
			//core difference of RLE to OLE is that runs are not segment alignment,
			//which requires care of handling runs crossing cache-buckets
			final int blksz = ColGroupBitmap.WRITE_CACHE_BLKSZ;
			
			//step 1: prepare position and value arrays
			
			//current pos / values per RLE list
			int[] apos = new int[numVals];
			int[] astart = new int[numVals];
			double[] aval = new double[numVals];
			
			//skip-scan to beginning for all OLs 
			if( rl > 0 ) { //rl aligned with blksz	
				for (int k = 0; k < numVals; k++) {
					int boff = _ptr[k];
					int blen = len(k);
					int bix = 0;
					int start = 0;
					while( bix<blen ) {	
						int lstart = _data[boff + bix]; //start
						int llen = _data[boff + bix + 1]; //len
						if( start+lstart+llen >= rl )
							break;
						start += lstart + llen;
						bix += 2;
					}
					apos[k] = bix;
					astart[k] = start;
				}
			}
			
			//pre-aggregate values per OLs
			for( int k = 0; k < numVals; k++ )
				aval[k] = sumValues(k, sb);
			
			//step 2: cache conscious matrix-vector via horizontal scans 
			for( int bi=rl; bi<ru; bi+=blksz ) 
			{
				int bimax = Math.min(bi+blksz, ru);
				
				//horizontal segment scan, incl pos maintenance
				for (int k = 0; k < numVals; k++) {
					int boff = _ptr[k];
					int blen = len(k);
					double val = aval[k];
					int bix = apos[k];
					int start = astart[k];
					
					//compute partial results, not aligned
					while( bix<blen ) {
						int lstart = _data[boff + bix];
						int llen = _data[boff + bix + 1];
						LinearAlgebraUtils.vectAdd(val, c, Math.max(bi, start+lstart), 
							Math.min(start+lstart+llen,bimax) - Math.max(bi, start+lstart));
						if(start+lstart+llen >= bimax)
							break;
						start += lstart + llen;
						bix += 2;
					}
					
					apos[k] = bix;
					astart[k] = start;
				}
			}
		}
		else
		{
			for (int k = 0; k < numVals; k++) {
				int boff = _ptr[k];
				int blen = len(k);
				double val = sumValues(k, sb);
				int bix = 0;
				int start = 0;
				
				//scan to beginning offset if necessary 
				if( rl > 0 ) { //rl aligned with blksz	
					while( bix<blen ) {	
						int lstart = _data[boff + bix]; //start
						int llen = _data[boff + bix + 1]; //len
						if( start+lstart+llen >= rl )
							break;
						start += lstart + llen;
						bix += 2;
					}
				}
				
				//compute partial results, not aligned
				while( bix<blen ) {
					int lstart = _data[boff + bix];
					int llen = _data[boff + bix + 1];
					LinearAlgebraUtils.vectAdd(val, c, Math.max(rl, start+lstart), 
						Math.min(start+lstart+llen,ru) - Math.max(rl, start+lstart));
					if(start+lstart+llen >= ru)
						break;
					start += lstart + llen;
					bix += 2;
				}
			}
		}
	}

	@Override
	public void leftMultByRowVector(MatrixBlock vector, MatrixBlock result)
		throws DMLRuntimeException 
	{
		double[] a = ConverterUtils.getDenseVector(vector);
		double[] c = result.getDenseBlock();
		final int numCols = getNumCols();
		final int numVals = getNumValues();
		final int n = getNumRows();
		
		if( LOW_LEVEL_OPT && numVals > 1 
			&& _numRows > BitmapEncoder.BITMAP_BLOCK_SZ )
		{
			final int blksz = ColGroupBitmap.READ_CACHE_BLKSZ;
			
			//step 1: prepare position and value arrays
			
			//current pos per OLs / output values
			int[] apos = new int[numVals];
			int[] astart = new int[numVals];
			double[] cvals = new double[numVals];
			
			//step 2: cache conscious matrix-vector via horizontal scans 
			for( int ai=0; ai<n; ai+=blksz ) 
			{
				int aimax = Math.min(ai+blksz, n);
				
				//horizontal scan, incl pos maintenance
				for (int k = 0; k < numVals; k++) {
					int bitmapOff = _ptr[k];
					int bitmapLen = len(k);						
					int bufIx = apos[k];
					int start = astart[k];
					
					//compute partial results, not aligned
					while( bufIx<bitmapLen && start<aimax ) {
						start += _data[bitmapOff + bufIx];
						int len = _data[bitmapOff + bufIx+1];
						cvals[k] += LinearAlgebraUtils.vectSum(a, start, len);
						start += len;
						bufIx += 2;
					}
					
					apos[k] = bufIx;	
					astart[k] = start;
				}
			}
			
			//step 3: scale partial results by values and write to global output
			for (int k = 0, valOff=0; k < numVals; k++, valOff+=numCols)
				for( int j = 0; j < numCols; j++ )
					c[ _colIndexes[j] ] += cvals[k] * _values[valOff+j];
			
		}
		else
		{
			//iterate over all values and their bitmaps
			for (int bitmapIx=0, valOff=0; bitmapIx<numVals; bitmapIx++, valOff+=numCols) 
			{	
				int bitmapOff = _ptr[bitmapIx];
				int bitmapLen = len(bitmapIx);
				
				double vsum = 0;
				int curRunEnd = 0;
				for (int bufIx = 0; bufIx < bitmapLen; bufIx += 2) {
					int curRunStartOff = curRunEnd + _data[bitmapOff+bufIx];
					int curRunLen = _data[bitmapOff+bufIx + 1];
					vsum += LinearAlgebraUtils.vectSum(a, curRunStartOff, curRunLen);
					curRunEnd = curRunStartOff + curRunLen;
				}
				
				//scale partial results by values and write results
				for( int j = 0; j < numCols; j++ )
					c[ _colIndexes[j] ] += vsum * _values[ valOff+j ];
			}
		}
	}

	@Override
	public ColGroup scalarOperation(ScalarOperator op)
		throws DMLRuntimeException 
	{
		double val0 = op.executeScalar(0);
		
		//fast path: sparse-safe operations
		// Note that bitmaps don't change and are shallow-copied
		if( op.sparseSafe || val0==0 ) {
			return new ColGroupRLE(_colIndexes, _numRows, 
				applyScalarOp(op), _data, _ptr);
		}
		
		//slow path: sparse-unsafe operations (potentially create new bitmap)
		//note: for efficiency, we currently don't drop values that become 0
		boolean[] lind = computeZeroIndicatorVector();
		int[] loff = computeOffsets(lind);
		if( loff.length==0 ) { //empty offset list: go back to fast path
			return new ColGroupRLE(_colIndexes, _numRows, 
				applyScalarOp(op), _data, _ptr);
		}
		
		double[] rvalues = applyScalarOp(op, val0, getNumCols());
		char[] lbitmap = BitmapEncoder.genRLEBitmap(loff);
		char[] rbitmaps = Arrays.copyOf(_data, _data.length+lbitmap.length);
		System.arraycopy(lbitmap, 0, rbitmaps, _data.length, lbitmap.length);
		int[] rbitmapOffs = Arrays.copyOf(_ptr, _ptr.length+1);
		rbitmapOffs[rbitmapOffs.length-1] = rbitmaps.length; 
		
		return new ColGroupRLE(_colIndexes, _numRows, 
			rvalues, rbitmaps, rbitmapOffs);
	}
	
	@Override
	public void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock result) 
		throws DMLRuntimeException 
	{
		KahanFunction kplus = (op.aggOp.increOp.fn instanceof KahanPlus) ?
			KahanPlus.getKahanPlusFnObject() : KahanPlusSq.getKahanPlusSqFnObject();
		
		if( op.indexFn instanceof ReduceAll )
			computeSum(result, kplus);
		else if( op.indexFn instanceof ReduceCol )
			computeRowSums(result, kplus);
		else if( op.indexFn instanceof ReduceRow )
			computeColSums(result, kplus);
	}
	
	/**
	 * Full aggregate: sums all cells of this column group into cell (0,0) of
	 * the result, with the Kahan correction kept in cell (0,1). Counts run
	 * lengths per distinct value tuple, then scales each value by its count.
	 * 
	 * @param result output 1x2 block (sum, correction)
	 * @param kplus Kahan aggregation function (plus or plus-squared)
	 */
	private void computeSum(MatrixBlock result, KahanFunction kplus)
	{
		KahanObject kbuff = new KahanObject(result.quickGetValue(0, 0), result.quickGetValue(0, 1));
		
		final int numCols = getNumCols();
		final int numVals = getNumValues();
		
		for (int bitmapIx = 0; bitmapIx < numVals; bitmapIx++) {
			int bitmapOff = _ptr[bitmapIx];
			int bitmapLen = len(bitmapIx);
			int valOff = bitmapIx * numCols;
			int curRunEnd = 0;
			int count = 0;
			for (int bufIx = 0; bufIx < bitmapLen; bufIx += 2) {
				int curRunStartOff = curRunEnd + _data[bitmapOff+bufIx];
				curRunEnd = curRunStartOff + _data[bitmapOff+bufIx + 1];
				count += curRunEnd-curRunStartOff;
			}
			
			//scale counts by all values
			for( int j = 0; j < numCols; j++ )
				kplus.execute3(kbuff, _values[ valOff+j ], count);
		}
		
		result.quickSetValue(0, 0, kbuff._sum);
		result.quickSetValue(0, 1, kbuff._correction);
	}
	
	/**
	 * Row aggregate: adds the per-row contribution of this column group into
	 * column 0 of the result, with Kahan corrections in column 1.
	 * 
	 * @param result output nx2 block (rowsum, correction)
	 * @param kplus Kahan aggregation function (plus or plus-squared)
	 */
	private void computeRowSums(MatrixBlock result, KahanFunction kplus)
	{
		KahanObject kbuff = new KahanObject(0, 0);
		final int numVals = getNumValues();
		
		for (int bitmapIx = 0; bitmapIx < numVals; bitmapIx++) {
			int bitmapOff = _ptr[bitmapIx];
			int bitmapLen = len(bitmapIx);
			double val = sumValues(bitmapIx);
			
			if (val != 0.0) {
				int curRunStartOff = 0;
				int curRunEnd = 0;
				for (int bufIx = 0; bufIx < bitmapLen; bufIx += 2) {
					curRunStartOff = curRunEnd + _data[bitmapOff+bufIx];
					curRunEnd = curRunStartOff + _data[bitmapOff+bufIx + 1];
					for (int rix = curRunStartOff; rix < curRunEnd; rix++) {
						kbuff.set(result.quickGetValue(rix, 0), result.quickGetValue(rix, 1));
						kplus.execute2(kbuff, val);
						result.quickSetValue(rix, 0, kbuff._sum);
						result.quickSetValue(rix, 1, kbuff._correction);
					}
				}
			}
		}
	}
	
	/**
	 * Column aggregate: adds the per-column sums of this group into row 0 of
	 * the result (at this group's column indexes), with Kahan corrections in
	 * row 1.
	 * 
	 * @param result output 2xn block (colsums in row 0, corrections in row 1)
	 * @param kplus Kahan aggregation function (plus or plus-squared)
	 */
	private void computeColSums(MatrixBlock result, KahanFunction kplus)
	{
		KahanObject kbuff = new KahanObject(0, 0);
		
		final int numCols = getNumCols();
		final int numVals = getNumValues();
		
		for (int bitmapIx = 0; bitmapIx < numVals; bitmapIx++) {
			int bitmapOff = _ptr[bitmapIx];
			int bitmapLen = len(bitmapIx);
			int valOff = bitmapIx * numCols;
			int curRunEnd = 0;
			int count = 0;
			for (int bufIx = 0; bufIx < bitmapLen; bufIx += 2) {
				int curRunStartOff = curRunEnd + _data[bitmapOff+bufIx];
				curRunEnd = curRunStartOff + _data[bitmapOff+bufIx + 1];
				count += curRunEnd-curRunStartOff;
			}
			
			//scale counts by all values
			for( int j = 0; j < numCols; j++ ) {
				kbuff.set(result.quickGetValue(0, _colIndexes[j]),result.quickGetValue(1, _colIndexes[j]));
				kplus.execute3(kbuff, _values[ valOff+j ], count);
				result.quickSetValue(0, _colIndexes[j], kbuff._sum);
				result.quickSetValue(1, _colIndexes[j], kbuff._correction);
			}
		}
	}
	
	/**
	 * Computes a boolean vector with one entry per row, where true indicates
	 * that the row is NOT covered by any run (i.e., all-zero in this group).
	 */
	public boolean[] computeZeroIndicatorVector()
		throws DMLRuntimeException 
	{
		boolean[] ret = new boolean[_numRows];
		final int numVals = getNumValues();
		
		//initialize everything with true (all rows assumed zero until covered by a run)
		Arrays.fill(ret, true);
		
		for (int bitmapIx = 0; bitmapIx < numVals; bitmapIx++) {
			int bitmapOff = _ptr[bitmapIx];
			int bitmapLen = len(bitmapIx);
			
			int curRunStartOff = 0;
			int curRunEnd = 0;
			for (int bufIx = 0; bufIx < bitmapLen; bufIx += 2) {
				curRunStartOff = curRunEnd + _data[bitmapOff+bufIx];
				curRunEnd = curRunStartOff + _data[bitmapOff+bufIx + 1];
				Arrays.fill(ret, curRunStartOff, curRunEnd, false);
			}
		}
		
		return ret;
	}
	
	@Override
	protected void countNonZerosPerRow(int[] rnnz)
	{
		final int numVals = getNumValues();
		final int numCols = getNumCols();
		
		for (int k = 0; k < numVals; k++) {
			int bitmapOff = _ptr[k];
			int bitmapLen = len(k);
			
			int curRunStartOff = 0;
			int curRunEnd = 0;
			for (int bufIx = 0; bufIx < bitmapLen; bufIx += 2) {
				curRunStartOff = curRunEnd + _data[bitmapOff+bufIx];
				curRunEnd = curRunStartOff + _data[bitmapOff+bufIx + 1];
				for( int i=curRunStartOff; i<curRunEnd; i++ )
					rnnz[i] += numCols;
			}
		}
	}
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/16e7b1c8/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java new file mode 100644 index 0000000..d9b75a1 --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.runtime.compress; + + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.matrix.data.LibMatrixAgg; +import org.apache.sysml.runtime.matrix.data.LibMatrixMult; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator; +import org.apache.sysml.runtime.matrix.operators.ScalarOperator; +import org.apache.sysml.runtime.util.SortUtils; + + +/** + * Column group type for columns that are stored as dense arrays of doubles. + * Uses a MatrixBlock internally to store the column contents. + * + */ +public class ColGroupUncompressed extends ColGroup +{ + private static final long serialVersionUID = 4870546053280378891L; + + /** + * We store the contents of the columns as a MatrixBlock to take advantage + * of high-performance routines available for this data structure. + */ + private MatrixBlock _data; + + public ColGroupUncompressed() { + super(CompressionType.UNCOMPRESSED, (int[])null, -1); + } + + /** + * Main constructor. + * + * @param colIndicesList + * indices (relative to the current block) of the columns that + * this column group represents. + * @param rawblock + * the uncompressed block; uncompressed data must be present at + * the time that the constructor is called + * @throws DMLRuntimeException + */ + @SuppressWarnings("unused") + public ColGroupUncompressed(List<Integer> colIndicesList, MatrixBlock rawblock) + throws DMLRuntimeException + { + super(CompressionType.UNCOMPRESSED, colIndicesList, + CompressedMatrixBlock.TRANSPOSE_INPUT ? + rawblock.getNumColumns() : rawblock.getNumRows()); + + //prepare meta data + int numRows = CompressedMatrixBlock.TRANSPOSE_INPUT ? 
+ rawblock.getNumColumns() : rawblock.getNumRows(); + + // Create a matrix with just the requested rows of the original block + _data = new MatrixBlock(numRows, + _colIndexes.length, rawblock.isInSparseFormat()); + + //ensure sorted col indices + if(!SortUtils.isSorted(0, _colIndexes.length, _colIndexes)) + Arrays.sort(_colIndexes); + + //special cases empty blocks + if (rawblock.isEmptyBlock(false)) + return; + //special cases full block + if( !CompressedMatrixBlock.TRANSPOSE_INPUT && + _data.getNumColumns() == rawblock.getNumColumns() ){ + _data.copy(rawblock); + return; + } + + // dense implementation for dense and sparse matrices to avoid linear search + int m = numRows; + int n = _colIndexes.length; + for( int i = 0; i < m; i++) { + for( int j = 0; j < n; j++ ) { + double val = CompressedMatrixBlock.TRANSPOSE_INPUT ? + rawblock.quickGetValue(_colIndexes[j], i) : + rawblock.quickGetValue(i, _colIndexes[j]); + _data.appendValue(i, j, val); + } + } + _data.examSparsity(); + } + + /** + * Constructor for creating temporary decompressed versions of one or more + * compressed column groups. + * + * @param groupsToDecompress + * compressed columns to subsume. Must contain at least one + * element. + */ + public ColGroupUncompressed(ArrayList<ColGroup> groupsToDecompress) + { + super(CompressionType.UNCOMPRESSED, + mergeColIndices(groupsToDecompress), + groupsToDecompress.get(0)._numRows); + + // Invert the list of column indices + int maxColIndex = _colIndexes[_colIndexes.length - 1]; + int[] colIndicesInverted = new int[maxColIndex + 1]; + for (int i = 0; i < _colIndexes.length; i++) { + colIndicesInverted[_colIndexes[i]] = i; + } + + // Create the buffer that holds the uncompressed data, packed together + _data = new MatrixBlock(_numRows, _colIndexes.length, false); + + for (ColGroup colGroup : groupsToDecompress) { + colGroup.decompressToBlock(_data, colIndicesInverted); + } + } + + /** + * Constructor for internal use. 
Used when a method needs to build an + * instance of this class from scratch. + * + * @param colIndices + * column mapping for this column group + * @param numRows + * number of rows in the column, for passing to the superclass + * @param colContents + * uncompressed cell values + */ + public ColGroupUncompressed(int[] colIndices, int numRows, MatrixBlock data) + { + super(CompressionType.UNCOMPRESSED, colIndices, numRows); + _data = data; + } + + + /** + * Access for superclass + * + * @return direct pointer to the internal representation of the columns + */ + public MatrixBlock getData() { + return _data; + } + + /** + * Subroutine of constructor. + * + * @param groupsToDecompress + * input to the constructor that decompresses into a temporary + * UncompressedColGroup + * @return a merged set of column indices across all those groups + */ + private static int[] mergeColIndices(ArrayList<ColGroup> groupsToDecompress) + { + // Pass 1: Determine number of columns + int sz = 0; + for (ColGroup colGroup : groupsToDecompress) { + sz += colGroup.getNumCols(); + } + + // Pass 2: Copy column offsets out + int[] ret = new int[sz]; + int pos = 0; + for (ColGroup colGroup : groupsToDecompress) { + int[] tmp = colGroup.getColIndices(); + System.arraycopy(tmp, 0, ret, pos, tmp.length); + pos += tmp.length; + } + + // Pass 3: Sort and return the list of columns + Arrays.sort(ret); + return ret; + } + + @Override + public long estimateInMemorySize() { + long size = super.estimateInMemorySize(); + // adding the size of colContents + return size + 8 + _data.estimateSizeInMemory(); + } + + @Override + public void decompressToBlock(MatrixBlock target) { + //empty block, nothing to add to output + if( _data.isEmptyBlock(false) ) + return; + for (int row = 0; row < _data.getNumRows(); row++) { + for (int colIx = 0; colIx < _colIndexes.length; colIx++) { + int col = _colIndexes[colIx]; + double cellVal = _data.quickGetValue(row, colIx); + target.quickSetValue(row, col, cellVal); + } + 
} + } + + @Override + public void decompressToBlock(MatrixBlock target, int[] colIndexTargets) { + //empty block, nothing to add to output + if( _data.isEmptyBlock(false) ) { + return; + } + // Run through the rows, putting values into the appropriate locations + for (int row = 0; row < _data.getNumRows(); row++) { + for (int colIx = 0; colIx < _data.getNumColumns(); colIx++) { + int origMatrixColIx = getColIndex(colIx); + int col = colIndexTargets[origMatrixColIx]; + double cellVal = _data.quickGetValue(row, colIx); + target.quickSetValue(row, col, cellVal); + } + } + } + + @Override + public void decompressToBlock(MatrixBlock target, int colpos) { + //empty block, nothing to add to output + if( _data.isEmptyBlock(false) ) { + return; + } + // Run through the rows, putting values into the appropriate locations + for (int row = 0; row < _data.getNumRows(); row++) { + double cellVal = _data.quickGetValue(row, colpos); + target.quickSetValue(row, 0, cellVal); + } + } + + @Override + public void rightMultByVector(MatrixBlock vector, MatrixBlock result, int rl, int ru) + throws DMLRuntimeException + { + // Pull out the relevant rows of the vector + int clen = _colIndexes.length; + + MatrixBlock shortVector = new MatrixBlock(clen, 1, false); + shortVector.allocateDenseBlock(); + double[] b = shortVector.getDenseBlock(); + for (int colIx = 0; colIx < clen; colIx++) + b[colIx] = vector.quickGetValue(_colIndexes[colIx], 0); + shortVector.recomputeNonZeros(); + + // Multiply the selected columns by the appropriate parts of the vector + LibMatrixMult.matrixMult(_data, shortVector, result, rl, ru); + } + + @Override + public void leftMultByRowVector(MatrixBlock vector, MatrixBlock result) + throws DMLRuntimeException + { + MatrixBlock pret = new MatrixBlock(1, _colIndexes.length, false); + LibMatrixMult.matrixMult(vector, _data, pret); + + // copying partialResult to the proper indices of the result + if( !pret.isEmptyBlock(false) ) { + double[] rsltArr = 
result.getDenseBlock(); + for (int colIx = 0; colIx < _colIndexes.length; colIx++) + rsltArr[_colIndexes[colIx]] = pret.quickGetValue(0, colIx); + result.recomputeNonZeros(); + } + } + + public void leftMultByRowVector(MatrixBlock vector, MatrixBlock result, int k) + throws DMLRuntimeException + { + MatrixBlock pret = new MatrixBlock(1, _colIndexes.length, false); + LibMatrixMult.matrixMult(vector, _data, pret, k); + + // copying partialResult to the proper indices of the result + if( !pret.isEmptyBlock(false) ) { + double[] rsltArr = result.getDenseBlock(); + for (int colIx = 0; colIx < _colIndexes.length; colIx++) + rsltArr[_colIndexes[colIx]] = pret.quickGetValue(0, colIx); + result.recomputeNonZeros(); + } + } + + @Override + public ColGroup scalarOperation(ScalarOperator op) + throws DMLRuntimeException + { + //execute scalar operations + MatrixBlock retContent = (MatrixBlock) _data + .scalarOperations(op, new MatrixBlock()); + + //construct new uncompressed column group + return new ColGroupUncompressed(getColIndices(), _data.getNumRows(), retContent); + } + + @Override + public void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock ret) + throws DMLRuntimeException + { + //execute unary aggregate operations + LibMatrixAgg.aggregateUnaryMatrix(_data, ret, op); + } + + @Override + public void readFields(DataInput in) + throws IOException + { + //read col contents (w/ meta data) + _data = new MatrixBlock(); + _data.readFields(in); + _numRows = _data.getNumRows(); + + //read col indices + int numCols = _data.getNumColumns(); + _colIndexes = new int[ numCols ]; + for( int i=0; i<numCols; i++ ) + _colIndexes[i] = in.readInt(); + } + + @Override + public void write(DataOutput out) + throws IOException + { + //write col contents first (w/ meta data) + _data.write(out); + + //write col indices + int len = _data.getNumColumns(); + for( int i=0; i<len; i++ ) + out.writeInt( _colIndexes[i] ); + } + + @Override + public long getExactSizeOnDisk() { + return 
_data.getExactSizeOnDisk() + + 4 * _data.getNumColumns(); + } + + @Override + protected void countNonZerosPerRow(int[] rnnz) + { + for( int i=0; i<_data.getNumRows(); i++ ) + rnnz[i] += _data.recomputeNonZeros(i, i, 0, _data.getNumColumns()-1); + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/16e7b1c8/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java new file mode 100644 index 0000000..6ac26c6 --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java @@ -0,0 +1,1342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.runtime.compress; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.PriorityQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.sysml.hops.OptimizerUtils; +import org.apache.sysml.lops.MMTSJ.MMTSJType; +import org.apache.sysml.lops.MapMultChain.ChainType; +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.compress.ColGroup.CompressionType; +import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator; +import org.apache.sysml.runtime.compress.estim.CompressedSizeInfo; +import org.apache.sysml.runtime.compress.estim.SizeEstimatorFactory; +import org.apache.sysml.runtime.compress.utils.ConverterUtils; +import org.apache.sysml.runtime.compress.utils.LinearAlgebraUtils; +import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing; +import org.apache.sysml.runtime.functionobjects.KahanPlus; +import org.apache.sysml.runtime.functionobjects.KahanPlusSq; +import org.apache.sysml.runtime.functionobjects.Multiply; +import org.apache.sysml.runtime.functionobjects.ReduceRow; +import org.apache.sysml.runtime.instructions.cp.KahanObject; +import org.apache.sysml.runtime.matrix.data.LibMatrixBincell; +import org.apache.sysml.runtime.matrix.data.LibMatrixReorg; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.matrix.data.MatrixIndexes; +import org.apache.sysml.runtime.matrix.data.MatrixValue; 
+import org.apache.sysml.runtime.matrix.data.SparseBlock; +import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator; +import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator; +import org.apache.sysml.runtime.matrix.operators.BinaryOperator; +import org.apache.sysml.runtime.matrix.operators.ScalarOperator; + +/** + * Experimental version of MatrixBlock that allows a compressed internal + * representation. + */ +public class CompressedMatrixBlock extends MatrixBlock implements Externalizable +{ + private static final long serialVersionUID = 7319972089143154057L; + + //internal configuration + public static final int MAX_NUMBER_COCODING_COLUMNS = 1000; + public static final double MIN_COMPRESSION_RATIO = 2.0; + public static final double MIN_RLE_RATIO = 1.0; // Minimum additional compression (non-RLE size / RLE size) before we switch to run-length encoding. + public static final boolean TRANSPOSE_INPUT = true; + public static final boolean MATERIALIZE_ZEROS = false; + public static final long MIN_PAR_AGG_THRESHOLD = 16*1024*1024; //16MB + public static final boolean INVESTIGATE_ESTIMATES = false; + private static final boolean LDEBUG = false; //local debug flag + + private static final Log LOG = LogFactory.getLog(CompressedMatrixBlock.class.getName()); + + static{ + // for internal debugging only + if( LDEBUG ) { + Logger.getLogger("org.apache.sysml.runtime.compress") + .setLevel((Level) Level.DEBUG); + } + } + + protected ArrayList<ColGroup> _colGroups = null; + protected CompressionStatistics _stats = null; + + public CompressedMatrixBlock() { + super(-1, -1, true); + } + + /** + * Main constructor for building a block from scratch. 
+ * + * @param rl + * number of rows in the block + * @param cl + * number of columns + * @param sparse + * true if the UNCOMPRESSED representation of the block should be + * sparse + */ + public CompressedMatrixBlock(int rl, int cl, boolean sparse) { + super(rl, cl, sparse); + } + + /** + * "Copy" constructor to populate this compressed block with the + * uncompressed contents of a conventional block. Does <b>not</b> compress + * the block. + */ + public CompressedMatrixBlock(MatrixBlock mb) { + super(mb.getNumRows(), mb.getNumColumns(), mb.isInSparseFormat()); + + //shallow copy (deep copy on compression, prevents unnecessary copy) + if( isInSparseFormat() ) + sparseBlock = mb.getSparseBlock(); + else + denseBlock = mb.getDenseBlock(); + nonZeros = mb.getNonZeros(); + } + + /** + * + * @return the column groups constructed by the compression process. + * + */ + public ArrayList<ColGroup> getColGroups() { + return _colGroups; + } + + /** + * @return true if this block is in compressed form; false if the block has + * not yet been compressed + */ + public boolean isCompressed() { + return (_colGroups != null); + } + + /** + * + * @return + */ + public boolean isSingleUncompressedGroup(){ + return (_colGroups!=null && _colGroups.size()==1 + && _colGroups.get(0) instanceof ColGroupUncompressed); + } + + private void allocateColGroupList() { + _colGroups = new ArrayList<ColGroup>(); + } + + @Override + public boolean isEmptyBlock(boolean safe) { + if( !isCompressed() ) + return super.isEmptyBlock(safe); + return (_colGroups == null || getNonZeros()==0); + } + + /** + * Compress the contents of this matrix block. After compression, the + * uncompressed data is discarded. Attempts to update this block after + * calling this method currently result in INCORRECT BEHAVIOR, something + * which should be fixed if we move ahead with this compression strategy. 
+ * + * +per column sparsity + */ + public void compress() + throws DMLRuntimeException + { + //check for redundant compression + if( isCompressed() ){ + throw new DMLRuntimeException("Redundant compression, block already compressed."); + } + + Timing time = new Timing(true); + _stats = new CompressionStatistics(); + + // SAMPLE-BASED DECISIONS: + // Decisions such as testing if a column is amenable to bitmap + // compression or evaluating co-coding potentionls are made based on a + // subset of the rows. For large datasets, sampling might take a + // significant amount of time. So, we generate only one sample and use + // it for the entire compression process. + + //prepare basic meta data and deep copy / transpose input + final int numRows = getNumRows(); + final int numCols = getNumColumns(); + final boolean sparse = isInSparseFormat(); + MatrixBlock rawblock = this; + if( TRANSPOSE_INPUT ) + rawblock = LibMatrixReorg.transpose(rawblock, new MatrixBlock(numCols, numRows, sparse)); + else + rawblock = new MatrixBlock(this); + + //construct sample-based size estimator + CompressedSizeEstimator bitmapSizeEstimator = + SizeEstimatorFactory.getSizeEstimator(rawblock, numRows); + + //allocate list of column groups + allocateColGroupList(); + + // The current implementation of this method is written for correctness, + // not for performance or for minimal use of temporary space. + + // We start with a full set of columns. 
+ HashSet<Integer> remainingCols = new HashSet<Integer>(); + for (int i = 0; i < numCols; i++) { + remainingCols.add(i); + } + + // PHASE 1: Classify columns by compression type + // We start by determining which columns are amenable to bitmap + // compression + + // It is correct to use the dense size as the uncompressed size + // FIXME not numRows but nnz / col otherwise too aggressive overestimation + // of uncompressed size and hence overestimation of compression potential + double uncompressedColumnSize = 8 * numRows; + + // information about the bitmap amenable columns + List<Integer> bitmapCols = new ArrayList<Integer>(); + List<Integer> uncompressedCols = new ArrayList<Integer>(); + List<Integer> colsCardinalities = new ArrayList<Integer>(); + List<Long> compressedSizes = new ArrayList<Long>(); + HashMap<Integer, Double> compressionRatios = new HashMap<Integer, Double>(); + + // Minimum ratio (size of uncompressed / size of compressed) that we + // will accept when encoding a field with a bitmap. + for (int col = 0; col < numCols; col++) + { + CompressedSizeInfo compressedSizeInfo = bitmapSizeEstimator + .estimateCompressedColGroupSize(new int[] { col }); + long compressedSize = compressedSizeInfo.getMinSize(); + double compRatio = uncompressedColumnSize / compressedSize; + + //FIXME: compression ratio should be checked against 1 instead of min compression + //ratio; I think this threshold was only required because we overestimated the + //the uncompressed column size with n\alpha instead of z\alpha + if (compRatio >= MIN_COMPRESSION_RATIO) { + bitmapCols.add(col); + compressionRatios.put(col, compRatio); + colsCardinalities.add(compressedSizeInfo.getEstCarinality()); + compressedSizes.add(compressedSize); + } + else + uncompressedCols.add(col); + } + + _stats.timePhase1 = time.stop(); + if( LOG.isDebugEnabled() ) + LOG.debug("compression phase 1: "+_stats.timePhase1); + + // Filters for additional types of compression should be inserted here. 
+ + // PHASE 2: Grouping columns + // Divide the bitmap columns into column groups. + List<int[]> bitmapColGrps = null; + if (bitmapCols.size() > MAX_NUMBER_COCODING_COLUMNS) { + // Too many columns to compute co-coding groups with current methods. + // Generate singleton groups. + bitmapColGrps = new ArrayList<int[]>(bitmapCols.size()); + for (int col : bitmapCols) { + bitmapColGrps.add(new int[] { col }); + } + } + else { + bitmapColGrps = PlanningCoCoder.findCocodesByPartitioning( + bitmapSizeEstimator, bitmapCols, colsCardinalities, + compressedSizes, numRows, isInSparseFormat() ? + OptimizerUtils.getSparsity(numRows, numCols, getNonZeros()): 1); + } + + _stats.timePhase2 = time.stop(); + if( LOG.isDebugEnabled() ) + LOG.debug("compression phase 2: "+_stats.timePhase2); + + if( INVESTIGATE_ESTIMATES ) { + double est = 0; + for( int[] groupIndices : bitmapColGrps ) + est += bitmapSizeEstimator.estimateCompressedColGroupSize(groupIndices).getMinSize(); + est += uncompressedCols.size() * uncompressedColumnSize; + _stats.estSize = est; + } + + // PHASE 3: Compress and correct sample-based decisions + + for (int[] groupIndices : bitmapColGrps) + { + int[] allGroupIndices = null; + int allColsCount = groupIndices.length; + CompressedSizeInfo bitmapSizeInfo; + // The compression type is decided based on a full bitmap since it + // will be reused for the actual compression step. 
+ UncompressedBitmap ubm; + PriorityQueue<CompressedColumn> compRatioPQ = null; + boolean skipGroup = false; + while (true) + { + ubm = BitmapEncoder.extractBitmap(groupIndices, rawblock); + bitmapSizeInfo = bitmapSizeEstimator + .estimateCompressedColGroupSize(ubm); + double compRatio = uncompressedColumnSize * groupIndices.length + / bitmapSizeInfo.getMinSize(); + if (compRatio >= MIN_COMPRESSION_RATIO) { + // we have a good group + for( Integer col : groupIndices ) + remainingCols.remove(col); + break; + } else { + // modify the group + if (compRatioPQ == null) { + // first modification + allGroupIndices = Arrays.copyOf(groupIndices, groupIndices.length); + compRatioPQ = new PriorityQueue<CompressedMatrixBlock.CompressedColumn>(); + for (int i = 0; i < groupIndices.length; i++) + compRatioPQ.add(new CompressedColumn(i, + compressionRatios.get(groupIndices[i]))); + } + + // index in allGroupIndices + int removeIx = compRatioPQ.poll().colIx; + allGroupIndices[removeIx] = -1; + allColsCount--; + if (allColsCount == 0) { + skipGroup = true; + break; + } + groupIndices = new int[allColsCount]; + // copying the values that do not equal -1 + int ix = 0; + for (int col : allGroupIndices) { + if (col != -1) { + groupIndices[ix++] = col; + } + } + + } + } + + if (skipGroup) + continue; + long rleNumBytes = bitmapSizeInfo.getRLESize(); + long offsetNumBytes = bitmapSizeInfo.getOLESize(); + double rleRatio = (double) offsetNumBytes / (double) rleNumBytes; + + if (rleRatio > MIN_RLE_RATIO) { + ColGroupRLE compressedGroup = new ColGroupRLE(groupIndices, + numRows, ubm); + _colGroups.add(compressedGroup); + } + else { + ColGroupOLE compressedGroup = new ColGroupOLE( + groupIndices, numRows, ubm); + _colGroups.add(compressedGroup); + } + + } + + _stats.timePhase3 = time.stop(); + if( LOG.isDebugEnabled() ) + LOG.debug("compression phase 3: "+_stats.timePhase3); + + // Phase 4: Cleanup + // The remaining columns are stored uncompressed as one big column group + if 
(remainingCols.size() > 0) {
		//Phase 4 continued: all columns rejected for bitmap compression
		//are stored together as a single uncompressed column group
		ArrayList<Integer> list = new ArrayList<Integer>(remainingCols);
		ColGroupUncompressed ucgroup = new ColGroupUncompressed(list, rawblock);
		_colGroups.add(ucgroup);
	}

	//final cleanup (discard uncompressed block)
	rawblock.cleanupBlock(true, true);
	this.cleanupBlock(true, true);

	_stats.timePhase4 = time.stop();
	if( LOG.isDebugEnabled() )
		LOG.debug("compression phase 4: "+_stats.timePhase4);
	}

	/**
	 * Decompresses this block into a conventional (uncompressed) MatrixBlock.
	 * This block itself is left unchanged.
	 *
	 * @return a new uncompressed matrix block containing the contents of this
	 *         block
	 */
	public MatrixBlock decompress() throws DMLRuntimeException
	{
		//early abort for not yet compressed blocks
		if( !isCompressed() )
			return new MatrixBlock(this);

		//preallocation sparse rows to avoid repeated reallocations;
		//each group contributes disjoint columns, so per-row nnz counts
		//can simply be accumulated across groups
		MatrixBlock ret = new MatrixBlock(getNumRows(), getNumColumns(), isInSparseFormat(), getNonZeros());
		if( ret.isInSparseFormat() ) {
			int[] rnnz = new int[rlen];
			for (ColGroup grp : _colGroups)
				grp.countNonZerosPerRow(rnnz);
			ret.allocateSparseRowsBlock();
			SparseBlock rows = ret.getSparseBlock();
			for( int i=0; i<rlen; i++ )
				rows.allocate(i, rnnz[i]);
		}

		//core decompression (append if sparse)
		for (ColGroup grp : _colGroups)
			grp.decompressToBlock(ret);

		//post-processing (for append in decompress): column groups append
		//out of column order, so sparse rows must be sorted afterwards
		//NOTE(review): guard uses this.isInSparseFormat() while the
		//preallocation above uses ret.isInSparseFormat() — presumably these
		//always agree since ret was created with this block's flag; confirm
		if( isInSparseFormat() )
			ret.sortSparseRows();

		return ret;
	}

	/** @return timing/size statistics of the last compress() call, or null. */
	public CompressionStatistics getCompressionStatistics(){
		return _stats;
	}

	/**
	 *
	 * @return an upper bound on the memory used to store this compressed block
	 *         considering class overhead.
+ */ + public long estimateCompressedSizeInMemory() { + if (!isCompressed()) + return 0; + // basic data inherited from MatrixBlock + long total = MatrixBlock.estimateSizeInMemory(0, 0, 0); + // adding the size of colGroups ArrayList overhead + // object overhead (32B) + int size (4B) + int modCount (4B) + Object[] + // elementData overhead + reference (32+8)B +reference ofr each Object (8B) + total += 80 + 8 * _colGroups.size(); + for (ColGroup grp : _colGroups) + total += grp.estimateInMemorySize(); + return total; + } + + private class CompressedColumn implements Comparable<CompressedColumn> { + int colIx; + double compRatio; + + public CompressedColumn(int colIx, double compRatio) { + this.colIx = colIx; + this.compRatio = compRatio; + } + + @Override + public int compareTo(CompressedColumn o) { + return (int) Math.signum(compRatio - o.compRatio); + } + } + + public static class CompressionStatistics { + public double timePhase1 = -1; + public double timePhase2 = -1; + public double timePhase3 = -1; + public double timePhase4 = -1; + public double estSize = -1; + + public CompressionStatistics() { + //do nothing + } + + public CompressionStatistics(double t1, double t2, double t3, double t4){ + timePhase1 = t1; + timePhase2 = t2; + timePhase3 = t3; + timePhase4 = t4; + } + } + + + ////////////////////////////////////////// + // Serialization / Deserialization + + @Override + public long getExactSizeOnDisk() + { + //header information + long ret = 12; + + for( ColGroup grp : _colGroups ) { + ret += 1; //type info + ret += grp.getExactSizeOnDisk(); + } + + return ret; + } + + @Override + public void readFields(DataInput in) + throws IOException + { + boolean compressed = in.readBoolean(); + + //deserialize uncompressed block + if( !compressed ) { + super.readFields(in); + return; + } + + //deserialize compressed block + rlen = in.readInt(); + clen = in.readInt(); + nonZeros = in.readLong(); + int ncolGroups = in.readInt(); + + _colGroups = new 
ArrayList<ColGroup>(ncolGroups); + for( int i=0; i<ncolGroups; i++ ) + { + CompressionType ctype = CompressionType.values()[in.readByte()]; + ColGroup grp = null; + + //create instance of column group + switch( ctype ) { + case UNCOMPRESSED: + grp = new ColGroupUncompressed(); break; + case OLE_BITMAP: + grp = new ColGroupOLE(); break; + case RLE_BITMAP: + grp = new ColGroupRLE(); break; + } + + //deserialize and add column group + grp.readFields(in); + _colGroups.add(grp); + } + } + + @Override + public void write(DataOutput out) + throws IOException + { + out.writeBoolean( isCompressed() ); + + //serialize uncompressed block + if( !isCompressed() ) { + super.write(out); + return; + } + + //serialize compressed matrix block + out.writeInt(rlen); + out.writeInt(clen); + out.writeLong(nonZeros); + out.writeInt(_colGroups.size()); + + for( ColGroup grp : _colGroups ) { + out.writeByte( grp.getCompType().ordinal() ); + grp.write(out); //delegate serialization + } + } + + + /** + * Redirects the default java serialization via externalizable to our default + * hadoop writable serialization for efficient broadcast/rdd deserialization. + * + * @param is + * @throws IOException + */ + @Override + public void readExternal(ObjectInput is) + throws IOException + { + readFields(is); + } + + /** + * Redirects the default java serialization via externalizable to our default + * hadoop writable serialization for efficient broadcast/rdd serialization. 
+ * + * @param is + * @throws IOException + */ + @Override + public void writeExternal(ObjectOutput os) + throws IOException + { + write(os); + } + + + ////////////////////////////////////////// + // Operations (overwrite existing ops for seamless integration) + + @Override + public MatrixValue scalarOperations(ScalarOperator sop, MatrixValue result) + throws DMLRuntimeException + { + //call uncompressed matrix scalar if necessary + if( !isCompressed() ) { + return super.scalarOperations(sop, result); + } + + //allocate the output matrix block + CompressedMatrixBlock ret = null; + if( result==null || !(result instanceof CompressedMatrixBlock) ) + ret = new CompressedMatrixBlock(getNumRows(), getNumColumns(), sparse); + else { + ret = (CompressedMatrixBlock) result; + ret.reset(rlen, clen); + } + + // Apply the operation recursively to each of the column groups. + // Most implementations will only modify metadata. + ArrayList<ColGroup> newColGroups = new ArrayList<ColGroup>(); + for (ColGroup grp : _colGroups) { + newColGroups.add(grp.scalarOperation(sop)); + } + ret._colGroups = newColGroups; + ret.setNonZeros(rlen*clen); + + return ret; + } + + @Override + public MatrixBlock appendOperations(MatrixBlock that, MatrixBlock ret) + throws DMLRuntimeException + { + //call uncompressed matrix append if necessary + if( !isCompressed() ) { + if( that instanceof CompressedMatrixBlock ) + that = ((CompressedMatrixBlock) that).decompress(); + return super.appendOperations(that, ret); + } + + final int m = rlen; + final int n = clen+that.getNumColumns(); + final long nnz = nonZeros+that.getNonZeros(); + + //init result matrix + CompressedMatrixBlock ret2 = null; + if( ret == null || !(ret instanceof CompressedMatrixBlock) ) { + ret2 = new CompressedMatrixBlock(m, n, isInSparseFormat()); + } + else { + ret2 = (CompressedMatrixBlock) ret; + ret2.reset(m, n); + } + + //shallow copy of lhs column groups + ret2.allocateColGroupList(); + ret2._colGroups.addAll(_colGroups); + + 
//copy of rhs column groups w/ col index shifting + if( !(that instanceof CompressedMatrixBlock) ) { + that = new CompressedMatrixBlock(that); + ((CompressedMatrixBlock)that).compress(); + } + ArrayList<ColGroup> inColGroups = ((CompressedMatrixBlock) that)._colGroups; + for( ColGroup group : inColGroups ) { + ColGroup tmp = ConverterUtils.copyColGroup(group); + tmp.shiftColIndices(clen); + ret2._colGroups.add(tmp); + } + + //meta data maintenance + ret2.setNonZeros(nnz); + return ret2; + } + + @Override + public MatrixBlock chainMatrixMultOperations(MatrixBlock v, MatrixBlock w, MatrixBlock out, ChainType ctype) + throws DMLRuntimeException + { + //call uncompressed matrix mult if necessary + if( !isCompressed() ) { + return super.chainMatrixMultOperations(v, w, out, ctype); + } + + //single-threaded mmchain of single uncompressed colgroup + if( isSingleUncompressedGroup() ){ + return ((ColGroupUncompressed)_colGroups.get(0)) + .getData().chainMatrixMultOperations(v, w, out, ctype); + } + + //Timing time = new Timing(true); + + //prepare result + if( out != null ) + out.reset(clen, 1, false); + else + out = new MatrixBlock(clen, 1, false); + + //empty block handling + if( isEmptyBlock(false) ) + return out; + + //compute matrix mult + MatrixBlock tmp = new MatrixBlock(rlen, 1, false); + rightMultByVector(v, tmp); + if( ctype == ChainType.XtwXv ) { + BinaryOperator bop = new BinaryOperator(Multiply.getMultiplyFnObject()); + LibMatrixBincell.bincellOpInPlace(tmp, w, bop); + } + leftMultByVectorTranspose(_colGroups, tmp, out, true); + + //System.out.println("Compressed MMChain in "+time.stop()); + + return out; + } + + @Override + public MatrixBlock chainMatrixMultOperations(MatrixBlock v, MatrixBlock w, MatrixBlock out, ChainType ctype, int k) + throws DMLRuntimeException + { + //call uncompressed matrix mult if necessary + if( !isCompressed() ){ + return super.chainMatrixMultOperations(v, w, out, ctype, k); + } + + //multi-threaded mmchain of single uncompressed 
colgroup + if( isSingleUncompressedGroup() ){ + return ((ColGroupUncompressed)_colGroups.get(0)) + .getData().chainMatrixMultOperations(v, w, out, ctype, k); + } + + Timing time = LOG.isDebugEnabled() ? new Timing(true) : null; + + //prepare result + if( out != null ) + out.reset(clen, 1, false); + else + out = new MatrixBlock(clen, 1, false); + + //empty block handling + if( isEmptyBlock(false) ) + return out; + + //compute matrix mult + MatrixBlock tmp = new MatrixBlock(rlen, 1, false); + rightMultByVector(v, tmp, k); + if( ctype == ChainType.XtwXv ) { + BinaryOperator bop = new BinaryOperator(Multiply.getMultiplyFnObject()); + LibMatrixBincell.bincellOpInPlace(tmp, w, bop); + } + leftMultByVectorTranspose(_colGroups, tmp, out, true, k); + + if( LOG.isDebugEnabled() ) + LOG.debug("Compressed MMChain k="+k+" in "+time.stop()); + + return out; + } + + @Override + public MatrixValue aggregateBinaryOperations(MatrixValue mv1, MatrixValue mv2, MatrixValue result, AggregateBinaryOperator op) + throws DMLRuntimeException + { + //call uncompressed matrix mult if necessary + if( !isCompressed() ) { + return super.aggregateBinaryOperations(mv1, mv2, result, op); + } + + //multi-threaded mm of single uncompressed colgroup + if( isSingleUncompressedGroup() ){ + MatrixBlock tmp = ((ColGroupUncompressed)_colGroups.get(0)).getData(); + return tmp.aggregateBinaryOperations(this==mv1?tmp:mv1, this==mv2?tmp:mv2, result, op); + } + + Timing time = LOG.isDebugEnabled() ? 
new Timing(true) : null; + + //setup meta data (dimensions, sparsity) + int rl = mv1.getNumRows(); + int cl = mv2.getNumColumns(); + + //create output matrix block + MatrixBlock ret = (MatrixBlock) result; + if( ret==null ) + ret = new MatrixBlock(rl, cl, false, rl*cl); + else + ret.reset(rl, cl, false, rl*cl); + + //compute matrix mult + if( mv1.getNumRows()>1 && mv2.getNumColumns()==1 ) { //MV right + CompressedMatrixBlock cmb = (CompressedMatrixBlock)mv1; + MatrixBlock mb = (MatrixBlock) mv2; + if( op.getNumThreads()>1 ) + cmb.rightMultByVector(mb, ret, op.getNumThreads()); + else + cmb.rightMultByVector(mb, ret); + } + else if( mv1.getNumRows()==1 && mv2.getNumColumns()>1 ) { //MV left + MatrixBlock mb = (MatrixBlock) mv1; + if( op.getNumThreads()>1 ) + leftMultByVectorTranspose(_colGroups, mb, ret, false, op.getNumThreads()); + else + leftMultByVectorTranspose(_colGroups, mb, ret, false); + } + else { + //NOTE: we could decompress and invoke super.aggregateBinary but for now + //we want to have an eager fail if this happens + throw new DMLRuntimeException("Unsupported matrix-matrix multiplication over compressed matrix block."); + } + + if( LOG.isDebugEnabled() ) + LOG.debug("Compressed MM in "+time.stop()); + + return ret; + } + + @Override + public MatrixValue aggregateUnaryOperations(AggregateUnaryOperator op, MatrixValue result, + int blockingFactorRow, int blockingFactorCol, MatrixIndexes indexesIn, boolean inCP) + throws DMLRuntimeException + { + //call uncompressed matrix mult if necessary + if( !isCompressed() ) { + return super.aggregateUnaryOperations(op, result, blockingFactorRow, blockingFactorCol, indexesIn, inCP); + } + + //check for supported operations + if( !(op.aggOp.increOp.fn instanceof KahanPlus || op.aggOp.increOp.fn instanceof KahanPlusSq) ){ + throw new DMLRuntimeException("Unary aggregates other than sums not supported yet."); + } + + //prepare output dimensions + CellIndex tempCellIndex = new CellIndex(-1,-1); + 
op.indexFn.computeDimension(rlen, clen, tempCellIndex); + if(op.aggOp.correctionExists) { + switch(op.aggOp.correctionLocation) + { + case LASTROW: tempCellIndex.row++; break; + case LASTCOLUMN: tempCellIndex.column++; break; + case LASTTWOROWS: tempCellIndex.row+=2; break; + case LASTTWOCOLUMNS: tempCellIndex.column+=2; break; + default: + throw new DMLRuntimeException("unrecognized correctionLocation: "+op.aggOp.correctionLocation); + } + } + + //prepare output + if(result==null) + result=new MatrixBlock(tempCellIndex.row, tempCellIndex.column, false); + else + result.reset(tempCellIndex.row, tempCellIndex.column, false); + + MatrixBlock ret = (MatrixBlock) result; + + //core unary aggregate + if( op.getNumThreads() > 1 + && getExactSizeOnDisk() > MIN_PAR_AGG_THRESHOLD ) + { + // initialize and allocate the result + ret.allocateDenseBlock(); + + //multi-threaded execution of all groups + ArrayList<ColGroup>[] grpParts = createStaticTaskPartitioning(op.getNumThreads(), false); + ColGroupUncompressed uc = getUncompressedColGroup(); + try { + //compute all compressed column groups + ExecutorService pool = Executors.newFixedThreadPool( op.getNumThreads() ); + ArrayList<UnaryAggregateTask> tasks = new ArrayList<UnaryAggregateTask>(); + for( ArrayList<ColGroup> grp : grpParts ) + tasks.add(new UnaryAggregateTask(grp, ret, op)); + pool.invokeAll(tasks); + pool.shutdown(); + //compute uncompressed column group in parallel (otherwise bottleneck) + if( uc != null ) + ret = (MatrixBlock)uc.getData().aggregateUnaryOperations(op, ret, blockingFactorRow, blockingFactorCol, indexesIn, false); + //aggregate partial results + if( !(op.indexFn instanceof ReduceRow) ){ + KahanObject kbuff = new KahanObject(0,0); + KahanPlus kplus = KahanPlus.getKahanPlusFnObject(); + for( int i=0; i<ret.getNumRows(); i++ ) { + kbuff.set(ret.quickGetValue(i, 0), ret.quickGetValue(i, 0)); + for( UnaryAggregateTask task : tasks ) + kplus.execute2(kbuff, task.getResult().quickGetValue(i, 0)); + 
ret.quickSetValue(i, 0, kbuff._sum); + ret.quickSetValue(i, 1, kbuff._correction); + } + } + } + catch(Exception ex) { + throw new DMLRuntimeException(ex); + } + } + else { + for (ColGroup grp : _colGroups) { + grp.unaryAggregateOperations(op, ret); + } + } + + //drop correction if necessary + if(op.aggOp.correctionExists && inCP) + ret.dropLastRowsOrColums(op.aggOp.correctionLocation); + + //post-processing + ret.recomputeNonZeros(); + + return ret; + } + + @Override + public MatrixBlock transposeSelfMatrixMultOperations(MatrixBlock out, MMTSJType tstype) + throws DMLRuntimeException + { + //call uncompressed matrix mult if necessary + if( !isCompressed() ) { + return super.transposeSelfMatrixMultOperations(out, tstype); + } + + //single-threaded tsmm of single uncompressed colgroup + if( isSingleUncompressedGroup() ){ + return ((ColGroupUncompressed)_colGroups.get(0)) + .getData().transposeSelfMatrixMultOperations(out, tstype); + } + + Timing time = LOG.isDebugEnabled() ? new Timing(true) : null; + + //check for transpose type + if( tstype != MMTSJType.LEFT ) //right not supported yet + throw new DMLRuntimeException("Invalid MMTSJ type '"+tstype.toString()+"'."); + + //create output matrix block + if( out == null ) + out = new MatrixBlock(clen, clen, false); + else + out.reset(clen, clen, false); + out.allocateDenseBlock(); + + if( !isEmptyBlock(false) ) { + //compute matrix mult + leftMultByTransposeSelf(_colGroups, out, 0, _colGroups.size()); + + // post-processing + out.recomputeNonZeros(); + } + + if( LOG.isDebugEnabled() ) + LOG.debug("Compressed TSMM in "+time.stop()); + + return out; + } + + + @Override + public MatrixBlock transposeSelfMatrixMultOperations(MatrixBlock out, MMTSJType tstype, int k) + throws DMLRuntimeException + { + //call uncompressed matrix mult if necessary + if( !isCompressed() ){ + return super.transposeSelfMatrixMultOperations(out, tstype, k); + } + + //multi-threaded tsmm of single uncompressed colgroup + if( 
isSingleUncompressedGroup() ){ + return ((ColGroupUncompressed)_colGroups.get(0)) + .getData().transposeSelfMatrixMultOperations(out, tstype, k); + } + + Timing time = LOG.isDebugEnabled() ? new Timing(true) : null; + + //check for transpose type + if( tstype != MMTSJType.LEFT ) //right not supported yet + throw new DMLRuntimeException("Invalid MMTSJ type '"+tstype.toString()+"'."); + + //create output matrix block + if( out == null ) + out = new MatrixBlock(clen, clen, false); + else + out.reset(clen, clen, false); + out.allocateDenseBlock(); + + if( !isEmptyBlock(false) ) { + //compute matrix mult + try { + ExecutorService pool = Executors.newFixedThreadPool( k ); + ArrayList<MatrixMultTransposeTask> tasks = new ArrayList<MatrixMultTransposeTask>(); + int blklen = (int)(Math.ceil((double)clen/(2*k))); + for( int i=0; i<2*k & i*blklen<clen; i++ ) + tasks.add(new MatrixMultTransposeTask(_colGroups, out, i*blklen, Math.min((i+1)*blklen, clen))); + List<Future<Object>> ret = pool.invokeAll(tasks); + for( Future<Object> tret : ret ) + tret.get(); //check for errors + pool.shutdown(); + } + catch(Exception ex) { + throw new DMLRuntimeException(ex); + } + + // post-processing + out.recomputeNonZeros(); + } + + if( LOG.isDebugEnabled() ) + LOG.debug("Compressed TSMM k="+k+" in "+time.stop()); + + return out; + } + + + /** + * Multiply this matrix block by a column vector on the right. 
+ * + * @param vector + * right-hand operand of the multiplication + * @param result + * buffer to hold the result; must have the appropriate size + * already + */ + private void rightMultByVector(MatrixBlock vector, MatrixBlock result) + throws DMLRuntimeException + { + // initialize and allocate the result + result.allocateDenseBlock(); + + // delegate matrix-vector operation to each column group + for( ColGroup grp : _colGroups ) + if( grp instanceof ColGroupUncompressed ) //overwrites output + grp.rightMultByVector(vector, result, 0, result.getNumRows()); + for( ColGroup grp : _colGroups ) + if( !(grp instanceof ColGroupUncompressed) ) //adds to output + grp.rightMultByVector(vector, result, 0, result.getNumRows()); + + // post-processing + result.recomputeNonZeros(); + } + + /** + * Multi-threaded version of rightMultByVector. + * + * @param vector + * @param result + * @param k + * @throws DMLRuntimeException + */ + private void rightMultByVector(MatrixBlock vector, MatrixBlock result, int k) + throws DMLRuntimeException + { + // initialize and allocate the result + result.allocateDenseBlock(); + + //multi-threaded execution of all groups + try { + ExecutorService pool = Executors.newFixedThreadPool( k ); + int rlen = getNumRows(); + int seqsz = BitmapEncoder.BITMAP_BLOCK_SZ; + int blklen = (int)(Math.ceil((double)rlen/k)); + blklen += (blklen%seqsz != 0)?seqsz-blklen%seqsz:0; + ArrayList<RightMatrixMultTask> tasks = new ArrayList<RightMatrixMultTask>(); + for( int i=0; i<k & i*blklen<getNumRows(); i++ ) + tasks.add(new RightMatrixMultTask(_colGroups, vector, result, i*blklen, Math.min((i+1)*blklen,rlen))); + pool.invokeAll(tasks); + pool.shutdown(); + } + catch(Exception ex) { + throw new DMLRuntimeException(ex); + } + + // post-processing + result.recomputeNonZeros(); + } + + /** + * Multiply this matrix block by the transpose of a column vector (i.e. 
+ * t(v)%*%X) + * + * @param vector + * left-hand operand of the multiplication + * @param result + * buffer to hold the result; must have the appropriate size + * already + */ + private static void leftMultByVectorTranspose(List<ColGroup> colGroups, MatrixBlock vector, MatrixBlock result, boolean doTranspose) + throws DMLRuntimeException + { + //transpose vector if required + MatrixBlock rowVector = vector; + if (doTranspose) { + rowVector = new MatrixBlock(1, vector.getNumRows(), false); + LibMatrixReorg.transpose(vector, rowVector); + } + + // initialize and allocate the result + result.reset(); + result.allocateDenseBlock(); + + // delegate matrix-vector operation to each column group + for (ColGroup grp : colGroups) { + grp.leftMultByRowVector(rowVector, result); + } + + // post-processing + result.recomputeNonZeros(); + } + + /** + * Multi-thread version of leftMultByVectorTranspose. + * + * @param vector + * @param result + * @param doTranspose + * @param k + * @throws DMLRuntimeException + */ + private static void leftMultByVectorTranspose(List<ColGroup> colGroups,MatrixBlock vector, MatrixBlock result, boolean doTranspose, int k) + throws DMLRuntimeException + { + int kuc = Math.max(1, k - colGroups.size() + 1); + + //transpose vector if required + MatrixBlock rowVector = vector; + if (doTranspose) { + rowVector = new MatrixBlock(1, vector.getNumRows(), false); + LibMatrixReorg.transpose(vector, rowVector); + } + + // initialize and allocate the result + result.reset(); + result.allocateDenseBlock(); + + //multi-threaded execution + try { + ExecutorService pool = Executors.newFixedThreadPool( Math.min(colGroups.size(), k) ); + ArrayList<LeftMatrixMultTask> tasks = new ArrayList<LeftMatrixMultTask>(); + for( ColGroup grp : colGroups ) + tasks.add(new LeftMatrixMultTask(grp, rowVector, result, kuc)); + pool.invokeAll(tasks); + pool.shutdown(); + } + catch(Exception ex) { + throw new DMLRuntimeException(ex); + } + + // post-processing + 
result.recomputeNonZeros(); + } + + /** + * + * @param result + * @throws DMLRuntimeException + */ + private static void leftMultByTransposeSelf(ArrayList<ColGroup> groups, MatrixBlock result, int cl, int cu) + throws DMLRuntimeException + { + final int numRows = groups.get(0).getNumRows(); + final int numGroups = groups.size(); + + //preallocated dense matrix block + MatrixBlock lhs = new MatrixBlock(numRows, 1, false); + lhs.allocateDenseBlock(); + + //approach: for each colgroup, extract uncompressed columns one at-a-time + //vector-matrix multiplies against remaining col groups + for( int i=cl; i<cu; i++ ) + { + //get current group and relevant col groups + ColGroup group = groups.get(i); + int[] ixgroup = group.getColIndices(); + List<ColGroup> tmpList = groups.subList(i, numGroups); + + //for all uncompressed lhs columns vectors + for( int j=0; j<ixgroup.length; j++ ) { + //decompress single column + lhs.reset(numRows, 1, false); + group.decompressToBlock(lhs, j); + + if( !lhs.isEmptyBlock(false) ) { + //compute vector-matrix partial result + MatrixBlock tmpret = new MatrixBlock(1,result.getNumColumns(),false); + leftMultByVectorTranspose(tmpList, lhs, tmpret, true); + + //write partial results (disjoint non-zeros) + LinearAlgebraUtils.copyNonZerosToRowCol(result, tmpret, ixgroup[j]); + } + } + } + } + + /** + * + * @param k + * @return + */ + @SuppressWarnings("unchecked") + private ArrayList<ColGroup>[] createStaticTaskPartitioning(int k, boolean inclUncompressed) + { + // special case: single uncompressed col group + if( _colGroups.size()==1 && _colGroups.get(0) instanceof ColGroupUncompressed ){ + return new ArrayList[0]; + } + + // initialize round robin col group distribution + // (static task partitioning to reduce mem requirements/final agg) + int numTasks = Math.min(k, _colGroups.size()); + ArrayList<ColGroup>[] grpParts = new ArrayList[numTasks]; + int pos = 0; + for( ColGroup grp : _colGroups ){ + if( grpParts[pos]==null ) + grpParts[pos] = new 
ArrayList<ColGroup>(); + if( inclUncompressed || !(grp instanceof ColGroupUncompressed) ) { + grpParts[pos].add(grp); + pos = (pos==numTasks-1) ? 0 : pos+1; + } + } + + return grpParts; + } + + /** + * + * @return + */ + private ColGroupUncompressed getUncompressedColGroup() + { + for( ColGroup grp : _colGroups ) + if( grp instanceof ColGroupUncompressed ) + return (ColGroupUncompressed)grp; + + return null; + } + + /** + * + */ + private static class LeftMatrixMultTask implements Callable<Object> + { + private ColGroup _group = null; + private MatrixBlock _vect = null; + private MatrixBlock _ret = null; + private int _kuc = 1; + + protected LeftMatrixMultTask( ColGroup group, MatrixBlock vect, MatrixBlock ret, int kuc) { + _group = group; + _vect = vect; + _ret = ret; + _kuc = kuc; + } + + @Override + public Object call() throws DMLRuntimeException + { + // delegate matrix-vector operation to each column group + if( _group instanceof ColGroupUncompressed && _kuc >1 && ColGroupBitmap.LOW_LEVEL_OPT ) + ((ColGroupUncompressed)_group).leftMultByRowVector(_vect, _ret, _kuc); + else + _group.leftMultByRowVector(_vect, _ret); + return null; + } + } + + /** + * + */ + private static class RightMatrixMultTask implements Callable<Object> + { + private ArrayList<ColGroup> _groups = null; + private MatrixBlock _vect = null; + private MatrixBlock _ret = null; + private int _rl = -1; + private int _ru = -1; + + protected RightMatrixMultTask( ArrayList<ColGroup> groups, MatrixBlock vect, MatrixBlock ret, int rl, int ru) { + _groups = groups; + _vect = vect; + _ret = ret; + _rl = rl; + _ru = ru; + } + + @Override + public Object call() throws DMLRuntimeException + { + // delegate vector-matrix operation to each column group + for( ColGroup grp : _groups ) + if( grp instanceof ColGroupUncompressed ) //overwrites output + grp.rightMultByVector(_vect, _ret, _rl, _ru); + for( ColGroup grp : _groups ) + if( !(grp instanceof ColGroupUncompressed) ) //adds to output + 
grp.rightMultByVector(_vect, _ret, _rl, _ru); + return null; + } + } + + private static class MatrixMultTransposeTask implements Callable<Object> + { + private ArrayList<ColGroup> _groups = null; + private MatrixBlock _ret = null; + private int _cl = -1; + private int _cu = -1; + + protected MatrixMultTransposeTask(ArrayList<ColGroup> groups, MatrixBlock ret, int cl, int cu) { + _groups = groups; + _ret = ret; + _cl = cl; + _cu = cu; + } + + @Override + public Object call() throws DMLRuntimeException { + leftMultByTransposeSelf(_groups, _ret, _cl, _cu); + return null; + } + } + + private static class UnaryAggregateTask implements Callable<Object> + { + private ArrayList<ColGroup> _groups = null; + private MatrixBlock _ret = null; + private AggregateUnaryOperator _op = null; + + protected UnaryAggregateTask( ArrayList<ColGroup> groups, MatrixBlock ret, AggregateUnaryOperator op) { + _groups = groups; + _op = op; + + if( !(_op.indexFn instanceof ReduceRow) ) { //sum/rowSums + _ret = new MatrixBlock(ret.getNumRows(), ret.getNumColumns(), false); + _ret.allocateDenseBlock(); + } + else { //colSums + _ret = ret; + } + } + + @Override + public Object call() throws DMLRuntimeException + { + // delegate vector-matrix operation to each column group + for( ColGroup grp : _groups ) + grp.unaryAggregateOperations(_op, _ret); + return null; + } + + public MatrixBlock getResult(){ + return _ret; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/16e7b1c8/src/main/java/org/apache/sysml/runtime/compress/PlanningBinPacker.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/PlanningBinPacker.java b/src/main/java/org/apache/sysml/runtime/compress/PlanningBinPacker.java new file mode 100644 index 0000000..a76223c --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/compress/PlanningBinPacker.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.sysml.runtime.compress;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.commons.math3.random.RandomDataGenerator;

/**
 * First-fit bin packing used for finding columns to co-code: items are
 * column ids, item weights their (estimated) weights, and each bin a
 * candidate group of co-coded columns.
 */
public class PlanningBinPacker
{
	private final float _binWeight;             //capacity of every bin
	private final List<Integer> _items;         //item ids (column indices)
	private final List<Float> _itemWeights;     //item weights, aligned with _items

	public PlanningBinPacker(float binWeight, List<Integer> items, List<Float> itemWeights) {
		_binWeight = binWeight;
		_items = items;
		_itemWeights = itemWeights;
	}

	/**
	 * Packs the items in their given order.
	 * NOTE: upper bound is 17/10 OPT
	 *
	 * @return key: available space, value: list of the bins that have that free space
	 */
	public TreeMap<Float, List<List<Integer>>> packFirstFit() {
		return packFirstFit(_items, _itemWeights);
	}

	/**
	 * shuffling the items to make some potential for having bins of different
	 * sizes when consecutive columns are of close cardinalities
	 *
	 * @return key: available space, value: list of the bins that have that free
	 *         space
	 */
	public TreeMap<Float, List<List<Integer>>> packFirstFitShuffled() {
		RandomDataGenerator rnd = new RandomDataGenerator();
		int[] permutation = rnd.nextPermutation(_items.size(), _items.size());
		List<Integer> shuffledItems = new ArrayList<Integer>(_items.size());
		List<Float> shuffledWeights = new ArrayList<Float>(_items.size());
		for (int ix : permutation) {
			shuffledItems.add(_items.get(ix));
			shuffledWeights.add(_itemWeights.get(ix));
		}

		return packFirstFit(shuffledItems, shuffledWeights);
	}

	/**
	 * First-fit core: each item goes into the first existing bin with
	 * sufficient free space, otherwise into a fresh bin. Bins are indexed
	 * by their remaining free space.
	 *
	 * @param items item ids in packing order
	 * @param itemWeights weight per item, aligned with items
	 * @return key: available space, value: list of the bins that have that free space
	 */
	private TreeMap<Float, List<List<Integer>>> packFirstFit(List<Integer> items, List<Float> itemWeights)
	{
		// when searching for a bin, the first bin in the list is used
		TreeMap<Float, List<List<Integer>>> bins = new TreeMap<Float, List<List<Integer>>>();
		// first bin
		bins.put(_binWeight, createBinList());
		int numItems = items.size();
		for (int i = 0; i < numItems; i++) {
			float itemWeight = itemWeights.get(i);
			// smallest remaining free space that still fits the item
			Map.Entry<Float, List<List<Integer>>> entry = bins
					.ceilingEntry(itemWeight);
			if (entry == null) {
				// item fits no existing bin: open a new bin
				float newBinWeight = _binWeight - itemWeight;
				List<List<Integer>> binList = bins.get(newBinWeight);
				if (binList == null) {
					bins.put(newBinWeight, createBinList(items.get(i)));
				} else {
					List<Integer> newBin = new ArrayList<Integer>();
					newBin.add(items.get(i));
					binList.add(newBin);
				}
			} else {
				// add to the first bin in the list and re-file it under
				// its reduced free space
				List<Integer> assignedBin = entry.getValue().remove(0);
				assignedBin.add(items.get(i));
				if (entry.getValue().size() == 0)
					bins.remove(entry.getKey());
				float newBinWeight = entry.getKey() - itemWeight;
				List<List<Integer>> newBinsList = bins.get(newBinWeight);
				if (newBinsList == null) {
					// new bin
					bins.put(newBinWeight, createBinList(assignedBin));
				} else {
					newBinsList.add(assignedBin);
				}
			}
		}
		return bins;
	}

	/**
	 * Packs items in descending weight order.
	 * NOTE: upper bound is 11/9 OPT + 6/9 (~1.22 OPT)
	 *
	 * @return key: available space, value: list of the bins that have that free space
	 */
	public TreeMap<Float, List<List<Integer>>> packFirstFitDescending() {
		// sort item positions ascending by weight
		Integer[] indexes = new Integer[_items.size()];
		for (int i = 0; i < indexes.length; i++)
			indexes[i] = i;
		Arrays.sort(indexes, new Comparator<Integer>() {

			@Override
			public int compare(Integer o1, Integer o2) {
				return _itemWeights.get(o1).compareTo(_itemWeights.get(o2));
			}
		});
		// materialize items/weights in descending weight order
		// (bugfix: previously indexed with the raw loop counter i instead
		// of the sorted permutation indexes[i], which merely reversed the
		// original input order and ignored the sort above)
		List<Integer> sortedItems = new ArrayList<Integer>();
		List<Float> sortedItemWeights = new ArrayList<Float>();
		for (int i = indexes.length - 1; i >= 0; i--) {
			sortedItems.add(_items.get(indexes[i]));
			sortedItemWeights.add(_itemWeights.get(indexes[i]));
		}
		return packFirstFit(sortedItems, sortedItemWeights);
	}

	/**
	 * NOTE: upper bound is 71/60 OPT + 6/9 (~1.18 OPT)
	 *
	 * @return nothing; this variant is not implemented
	 * @throws UnsupportedOperationException always
	 */
	public TreeMap<Float, List<List<Integer>>> packModifiedFirstFitDescending() {
		throw new UnsupportedOperationException("Not implemented yet!");
	}

	/** @return a bin list containing a single empty bin */
	private List<List<Integer>> createBinList() {
		List<List<Integer>> binList = new ArrayList<List<Integer>>();
		binList.add(new ArrayList<Integer>());
		return binList;
	}

	/**
	 * @param item item placed into the newly created bin
	 * @return a bin list containing a single bin holding the given item
	 */
	private List<List<Integer>> createBinList(int item) {
		List<List<Integer>> binList = new ArrayList<List<Integer>>();
		List<Integer> bin = new ArrayList<Integer>();
		binList.add(bin);
		bin.add(item);
		return binList;
	}

	/**
	 * @param bin existing bin to wrap
	 * @return a bin list containing only the given bin
	 */
	private List<List<Integer>> createBinList(List<Integer> bin) {
		List<List<Integer>> binList = new ArrayList<List<Integer>>();
		binList.add(bin);
		return binList;
	}
}
