[SYSTEMML-449] Compressed linear algebra v2

This patch bundles various improvements for the experimental 'compressed linear algebra' (CLA) feature. In detail, it includes the following extensions:
* [SYSTEMML-820] New column encoding format DDC (dense dictionary coding) with DDC1 and DDC2 for 1- and 2-byte codes as well as efficient operations. * [SYSTEMML-815] Hardened sample-based estimators (e.g., uncompressed size, empty segments, reduced population size, and stabilization parameter as well as numerically stable implementations), including an increased sample fraction and the removal of unnecessary parameters. * [SYSTEMML-814] Debugging tools for compression plans, compression tracing, and compression statistics. * New greedy column grouping algorithm with pruning and memoization. * New static column partitioning and changed bin packing heuristics. * Additional operations (e.g., cache-conscious rowSums). * Various fixes and performance improvements throughout all CLA components. * Extended test cases to cover OLE, RLE, DDC, and UC groups as well as combinations thereof. * Various internal refactorings to simplify the extension and maintenance of CLA. Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/37a215bc Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/37a215bc Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/37a215bc Branch: refs/heads/master Commit: 37a215bc3be26495c351eae6be4b85eaf22daedc Parents: 390b81c Author: Matthias Boehm <[email protected]> Authored: Sun Feb 5 16:22:01 2017 +0100 Committer: Matthias Boehm <[email protected]> Committed: Wed Feb 8 03:12:18 2017 +0100 ---------------------------------------------------------------------- .../sysml/runtime/compress/BitmapEncoder.java | 22 +- .../apache/sysml/runtime/compress/ColGroup.java | 39 +- .../sysml/runtime/compress/ColGroupBitmap.java | 580 ------------------ .../sysml/runtime/compress/ColGroupDDC.java | 227 +++++++ .../sysml/runtime/compress/ColGroupDDC1.java | 358 +++++++++++ .../sysml/runtime/compress/ColGroupDDC2.java | 312 ++++++++++++ 
.../sysml/runtime/compress/ColGroupOLE.java | 173 +++--- .../sysml/runtime/compress/ColGroupOffset.java | 424 +++++++++++++ .../sysml/runtime/compress/ColGroupRLE.java | 178 +++--- .../runtime/compress/ColGroupUncompressed.java | 41 +- .../sysml/runtime/compress/ColGroupValue.java | 303 ++++++++++ .../runtime/compress/CompressedMatrixBlock.java | 419 ++++++++----- .../runtime/compress/PlanningBinPacker.java | 112 ---- .../sysml/runtime/compress/PlanningCoCoder.java | 257 -------- .../runtime/compress/PlanningCoCodingGroup.java | 110 ---- .../compress/PlanningGroupMergeAction.java | 73 --- .../compress/ReaderColumnSelectionSparse.java | 1 - .../runtime/compress/UncompressedBitmap.java | 73 ++- .../compress/cocode/ColumnGroupPartitioner.java | 19 + .../ColumnGroupPartitionerBinPacking.java | 100 +++ .../cocode/ColumnGroupPartitionerStatic.java | 52 ++ .../compress/cocode/PlanningCoCoder.java | 236 ++++++++ .../compress/cocode/PlanningCoCodingGroup.java | 175 ++++++ .../compress/cocode/PlanningMemoTable.java | 75 +++ .../compress/estim/CompressedSizeEstimator.java | 47 +- .../estim/CompressedSizeEstimatorExact.java | 5 +- .../estim/CompressedSizeEstimatorSample.java | 605 ++++++++++--------- .../compress/estim/CompressedSizeInfo.java | 46 +- .../compress/estim/SizeEstimatorFactory.java | 6 +- .../runtime/compress/utils/ConverterUtils.java | 16 + .../runtime/compress/utils/IntArrayList.java | 13 +- .../compress/utils/LinearAlgebraUtils.java | 164 +++++ .../compress/BasicCompressionTest.java | 40 +- .../functions/compress/BasicGetValueTest.java | 40 +- .../compress/BasicMatrixAppendTest.java | 40 +- .../compress/BasicMatrixMultChainTest.java | 76 ++- .../BasicMatrixTransposeSelfMultTest.java | 40 +- .../compress/BasicMatrixVectorMultTest.java | 40 +- .../BasicScalarOperationsSparseUnsafeTest.java | 40 +- .../compress/BasicScalarOperationsTest.java | 40 +- .../BasicTransposeSelfLeftMatrixMultTest.java | 40 +- .../compress/BasicUnaryAggregateTest.java | 326 +++++++--- 
.../compress/BasicVectorMatrixMultTest.java | 40 +- .../functions/compress/CompressedLinregCG.java | 5 +- .../compress/CompressedSerializationTest.java | 40 +- .../compress/LargeCompressionTest.java | 40 +- .../compress/LargeMatrixVectorMultTest.java | 40 +- .../compress/LargeParMatrixVectorMultTest.java | 40 +- .../compress/LargeParUnaryAggregateTest.java | 337 +++++++---- .../compress/LargeVectorMatrixMultTest.java | 40 +- .../functions/compress/ParCompressionTest.java | 40 +- .../compress/ParMatrixMultChainTest.java | 66 +- .../compress/ParMatrixVectorMultTest.java | 40 +- .../ParTransposeSelfLeftMatrixMultTest.java | 40 +- .../compress/ParUnaryAggregateTest.java | 327 ++++++---- .../compress/ParVectorMatrixMultTest.java | 40 +- 56 files changed, 4733 insertions(+), 2385 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/BitmapEncoder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/BitmapEncoder.java b/src/main/java/org/apache/sysml/runtime/compress/BitmapEncoder.java index 7fd2c69..b27112f 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/BitmapEncoder.java +++ b/src/main/java/org/apache/sysml/runtime/compress/BitmapEncoder.java @@ -20,7 +20,6 @@ package org.apache.sysml.runtime.compress; import java.util.ArrayList; -import java.util.Arrays; import org.apache.sysml.runtime.compress.utils.DblArray; import org.apache.sysml.runtime.compress.utils.DblArrayIntListHashMap; @@ -100,8 +99,8 @@ public class BitmapEncoder * the offsets of different bits * @return compressed version of said bitmap */ - public static char[] genRLEBitmap(int[] offsets) { - if( offsets.length == 0 ) + public static char[] genRLEBitmap(int[] offsets, int len) { + if( len == 0 ) return new char[0]; //empty list // Use an ArrayList for 
correctness at the expense of temp space @@ -139,7 +138,7 @@ public class BitmapEncoder curRunLen = 1; // Process the remaining offsets - for (int i = 1; i < offsets.length; i++) { + for (int i = 1; i < len; i++) { int absOffset = offsets[i]; @@ -179,9 +178,8 @@ public class BitmapEncoder // Convert wasteful ArrayList to packed array. char[] ret = new char[buf.size()]; - for (int i = 0; i < buf.size(); i++) { + for(int i = 0; i < buf.size(); i++ ) ret[i] = buf.get(i); - } return ret; } @@ -194,21 +192,19 @@ public class BitmapEncoder * the offsets of different bits * @return compressed version of said bitmap */ - public static char[] genOffsetBitmap(int[] offsets) - { - int lastOffset = offsets[offsets.length - 1]; + public static char[] genOffsetBitmap(int[] offsets, int len) + { + int lastOffset = offsets[len - 1]; // Build up the blocks int numBlocks = (lastOffset / BITMAP_BLOCK_SZ) + 1; // To simplify the logic, we make two passes. // The first pass divides the offsets by block. int[] blockLengths = new int[numBlocks]; - Arrays.fill(blockLengths, 0); - for (int ix = 0; ix < offsets.length; ix++) { + for (int ix = 0; ix < len; ix++) { int val = offsets[ix]; int blockForVal = val / BITMAP_BLOCK_SZ; - blockLengths[blockForVal]++; } @@ -238,7 +234,7 @@ public class BitmapEncoder return encodedBlocks; } - + private static UncompressedBitmap extractBitmap(int colIndex, MatrixBlock rawblock, boolean skipZeros) { //probe map for distinct items (for value or value groups) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java index 586690c..bf1b822 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java +++ 
b/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java @@ -40,9 +40,11 @@ public abstract class ColGroup implements Serializable private static final long serialVersionUID = 2439785418908671481L; public enum CompressionType { - UNCOMPRESSED, //uncompressed sparse/dense - RLE_BITMAP, //RLE bitmap - OLE_BITMAP; //OLE bitmap + UNCOMPRESSED, //uncompressed sparse/dense + RLE_BITMAP, //RLE bitmap + OLE_BITMAP, //OLE bitmap + DDC1, //DDC 1 byte + DDC2; //DDC 2 byte } /** @@ -53,23 +55,17 @@ public abstract class ColGroup implements Serializable /** Number of rows in the matrix, for use by child classes. */ protected int _numRows; - - /** How the elements of the column group are compressed. */ - private CompressionType _compType; - /** * Main constructor. * - * @param type compression type * @param colIndices * offsets of the columns in the matrix block that make up the * group * @param numRows * total number of rows in the parent block */ - protected ColGroup(CompressionType type, int[] colIndices, int numRows) { - _compType = type; + protected ColGroup(int[] colIndices, int numRows) { _colIndexes = colIndices; _numRows = numRows; } @@ -77,16 +73,15 @@ public abstract class ColGroup implements Serializable /** * Convenience constructor for converting indices to a more compact format. * - * @param type compression type * @param colIndicesList list of column indices * @param numRows total number of rows in the parent block */ - protected ColGroup(CompressionType type, List<Integer> colIndicesList, int numRows) { - _compType = type; + protected ColGroup(List<Integer> colIndicesList, int numRows) { _colIndexes = new int[colIndicesList.size()]; int i = 0; for (Integer index : colIndicesList) _colIndexes[i++] = index; + _numRows = numRows; } /** @@ -126,9 +121,7 @@ public abstract class ColGroup implements Serializable * * @return How the elements of the column group are compressed. 
*/ - public CompressionType getCompType() { - return _compType; - } + public abstract CompressionType getCompType(); public void shiftColIndices(int offset) { for( int i=0; i<_colIndexes.length; i++ ) @@ -143,14 +136,12 @@ public abstract class ColGroup implements Serializable * in memory. */ public long estimateInMemorySize() { - // int numRows (4B) , array reference colIndices (8B) + array object - // overhead if exists (32B) + 4B per element, CompressionType compType - // (2 booleans 2B + enum overhead 32B + reference to enum 8B) - long size = 54; - if (_colIndexes == null) - return size; - else - return size + 32 + 4 * _colIndexes.length; + // object (12B padded to factors of 8), int numRows (4B), + // array reference colIndices (8B) + //+ array object overhead if exists (32B) + 4B per element + long size = 24; + return (_colIndexes == null) ? size : + size + 32 + 4 * _colIndexes.length; } /** http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ColGroupBitmap.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupBitmap.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupBitmap.java deleted file mode 100644 index dac18ef..0000000 --- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupBitmap.java +++ /dev/null @@ -1,580 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.runtime.compress; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.Map.Entry; -import java.util.TreeMap; - -import org.apache.sysml.runtime.DMLRuntimeException; -import org.apache.sysml.runtime.functionobjects.Builtin; -import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode; -import org.apache.sysml.runtime.matrix.data.MatrixBlock; -import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator; -import org.apache.sysml.runtime.matrix.operators.ScalarOperator; - - -/** - * Base class for column groups encoded with various types of bitmap encoding. 
- * - * - * NOTES: - * * OLE: separate storage segment length and bitmaps led to a 30% improvement - * but not applied because more difficult to support both data layouts at the - * same time (distributed/local as well as w/ and w/o low-level opt) - */ -public abstract class ColGroupBitmap extends ColGroup -{ - private static final long serialVersionUID = -1635828933479403125L; - - public static final boolean LOW_LEVEL_OPT = true; - //sorting of values by physical length helps by 10-20%, especially for serial, while - //slight performance decrease for parallel incl multi-threaded, hence not applied for - //distributed operations (also because compression time + garbage collection increases) - private static final boolean SORT_VALUES_BY_LENGTH = true; - protected static final boolean CREATE_SKIPLIST = true; - - protected static final int READ_CACHE_BLKSZ = 2 * BitmapEncoder.BITMAP_BLOCK_SZ; - protected static final int WRITE_CACHE_BLKSZ = 2 * BitmapEncoder.BITMAP_BLOCK_SZ; - - /** Distinct values associated with individual bitmaps. */ - protected double[] _values; //linearized <numcol vals> <numcol vals> - - /** Bitmaps, one per uncompressed value in {@link #_values}. */ - protected int[] _ptr; //bitmap offsets per value - protected char[] _data; //linearized bitmaps (variable length) - protected boolean _zeros; //contains zero values - - protected int[] _skiplist; - - public ColGroupBitmap(CompressionType type) { - super(type, (int[]) null, -1); - } - - /** - * Main constructor. Stores the headers for the individual bitmaps. 
- * - * @param type column type - * @param colIndices - * indices (within the block) of the columns included in this - * column - * @param numRows - * total number of rows in the parent block - * @param ubm - * Uncompressed bitmap representation of the block - */ - public ColGroupBitmap(CompressionType type, int[] colIndices, int numRows, UncompressedBitmap ubm) - { - super(type, colIndices, numRows); - - // Extract and store just the distinct values. The bitmaps themselves go - // into the subclasses. - final int numCols = ubm.getNumColumns(); - final int numVals = ubm.getNumValues(); - - _values = new double[numVals*numCols]; - _zeros = (ubm.getNumOffsets() < numRows); - - for (int i=0; i<numVals; i++) { - //note: deep copied internally on getValues - double[] tmp = ubm.getValues(i); - System.arraycopy(tmp, 0, _values, i*numCols, numCols); - } - } - - /** - * Constructor for subclass methods that need to create shallow copies - * - * @param type compression type - * @param colIndices - * raw column index information - * @param numRows - * number of rows in the block - * @param zeros ? 
- * @param values - * set of distinct values for the block (associated bitmaps are - * kept in the subclass) - */ - protected ColGroupBitmap(CompressionType type, int[] colIndices, int numRows, boolean zeros, double[] values) { - super(type, colIndices, numRows); - _zeros = zeros; - _values = values; - } - - protected final int len(int k) { - return _ptr[k+1] - _ptr[k]; - } - - protected void createCompressedBitmaps(int numVals, int totalLen, char[][] lbitmaps) - { - // compact bitmaps to linearized representation - if( LOW_LEVEL_OPT && SORT_VALUES_BY_LENGTH - && _numRows > BitmapEncoder.BITMAP_BLOCK_SZ ) - { - // sort value by num segments in descending order - TreeMap<Integer,ArrayList<Integer>> tree = new TreeMap<Integer, ArrayList<Integer>>(); - for( int i=0; i<numVals; i++ ) { - int revlen = totalLen-lbitmaps[i].length; - if( !tree.containsKey(revlen) ) - tree.put(revlen, new ArrayList<Integer>()); - tree.get(revlen).add(i); - } - - // compact bitmaps to linearized representation - _ptr = new int[numVals+1]; - _data = new char[totalLen]; - int pos = 0, off = 0; - for( Entry<Integer,ArrayList<Integer>> e : tree.entrySet() ) { - for( Integer tmpix : e.getValue() ) { - int len = lbitmaps[tmpix].length; - _ptr[pos] = off; - System.arraycopy(lbitmaps[tmpix], 0, _data, off, len); - off += len; - pos++; - } - } - _ptr[numVals] = totalLen; - - // reorder values - double[] lvalues = new double[_values.length]; - int off2 = 0; int numCols = _colIndexes.length; - for( Entry<Integer,ArrayList<Integer>> e : tree.entrySet() ) { - for( Integer tmpix : e.getValue() ) { - System.arraycopy(_values, tmpix*numCols, lvalues, off2, numCols); - off2 += numCols; - } - } - _values = lvalues; - } - else - { - // compact bitmaps to linearized representation - _ptr = new int[numVals+1]; - _data = new char[totalLen]; - for( int i=0, off=0; i<numVals; i++ ) { - int len = lbitmaps[i].length; - _ptr[i] = off; - System.arraycopy(lbitmaps[i], 0, _data, off, len); - off += len; - } - 
_ptr[numVals] = totalLen; - } - } - - @Override - public long estimateInMemorySize() { - long size = super.estimateInMemorySize(); - - // adding the size of values - size += 8; //array reference - if (_values != null) { - size += 32 + _values.length * 8; //values - } - - // adding bitmaps size - size += 16; //array references - if (_data != null) { - size += 32 + _ptr.length * 4; // offsets - size += 32 + _data.length * 2; // bitmaps - } - - return size; - } - - //generic decompression for OLE/RLE, to be overwritten for performance - @Override - public void decompressToBlock(MatrixBlock target, int rl, int ru) - { - final int numCols = getNumCols(); - final int numVals = getNumValues(); - int[] colIndices = getColIndices(); - - // Run through the bitmaps for this column group - for (int i = 0; i < numVals; i++) { - Iterator<Integer> decoder = getDecodeIterator(i); - int valOff = i*numCols; - - while (decoder.hasNext()) { - int row = decoder.next(); - if( row<rl ) continue; - if( row>ru ) break; - - for (int colIx = 0; colIx < numCols; colIx++) - target.appendValue(row, colIndices[colIx], _values[valOff+colIx]); - } - } - } - - //generic decompression for OLE/RLE, to be overwritten for performance - @Override - public void decompressToBlock(MatrixBlock target, int[] colIndexTargets) - { - final int numCols = getNumCols(); - final int numVals = getNumValues(); - - // Run through the bitmaps for this column group - for (int i = 0; i < numVals; i++) { - Iterator<Integer> decoder = getDecodeIterator(i); - int valOff = i*numCols; - - while (decoder.hasNext()) { - int row = decoder.next(); - for (int colIx = 0; colIx < numCols; colIx++) { - int origMatrixColIx = getColIndex(colIx); - int targetColIx = colIndexTargets[origMatrixColIx]; - target.quickSetValue(row, targetColIx, _values[valOff+colIx]); - } - } - } - } - - //generic decompression for OLE/RLE, to be overwritten for performance - @Override - public void decompressToBlock(MatrixBlock target, int colpos) - { - 
final int numCols = getNumCols(); - final int numVals = getNumValues(); - - // Run through the bitmaps for this column group - for (int i = 0; i < numVals; i++) { - Iterator<Integer> decoder = getDecodeIterator(i); - int valOff = i*numCols; - - while (decoder.hasNext()) { - int row = decoder.next(); - target.quickSetValue(row, 0, _values[valOff+colpos]); - } - } - } - - //generic get for OLE/RLE, to be overwritten for performance - //potential: skip scan (segment length agg and run length) instead of decode - @Override - public double get(int r, int c) { - //find local column index - int ix = Arrays.binarySearch(_colIndexes, c); - if( ix < 0 ) - throw new RuntimeException("Column index "+c+" not in bitmap group."); - - //find row index in value offset lists via scan - final int numCols = getNumCols(); - final int numVals = getNumValues(); - for (int i = 0; i < numVals; i++) { - Iterator<Integer> decoder = getDecodeIterator(i); - int valOff = i*numCols; - while (decoder.hasNext()) { - int row = decoder.next(); - if( row == r ) - return _values[valOff+ix]; - else if( row > r ) - break; //current value - } - } - return 0; - } - - public abstract void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock result, int rl, int ru) - throws DMLRuntimeException; - - protected final double sumValues(int bitmapIx) - { - final int numCols = getNumCols(); - final int valOff = bitmapIx * numCols; - - double val = 0.0; - for( int i = 0; i < numCols; i++ ) { - val += _values[valOff+i]; - } - - return val; - } - - protected final double sumValues(int bitmapIx, double[] b) - { - final int numCols = getNumCols(); - final int valOff = bitmapIx * numCols; - - double val = 0; - for( int i = 0; i < numCols; i++ ) { - val += _values[valOff+i] * b[i]; - } - - return val; - } - - protected final double mxxValues(int bitmapIx, Builtin builtin) - { - final int numCols = getNumCols(); - final int valOff = bitmapIx * numCols; - - double val = Double.MAX_VALUE * 
((builtin.getBuiltinCode()==BuiltinCode.MAX)?-1:1); - for( int i = 0; i < numCols; i++ ) - val = builtin.execute2(val, _values[valOff+i]); - - return val; - } - - protected final double[] preaggValues(int numVals, double[] b) { - double[] ret = new double[numVals]; - for( int k = 0; k < numVals; k++ ) - ret[k] = sumValues(k, b); - - return ret; - } - - /** - * Method for use by subclasses. Applies a scalar operation to the value - * metadata stored in the superclass. - * - * @param op - * scalar operation to perform - * @return transformed copy of value metadata for this column group - * @throws DMLRuntimeException if DMLRuntimeException occurs - */ - protected double[] applyScalarOp(ScalarOperator op) - throws DMLRuntimeException - { - //scan over linearized values - double[] ret = new double[_values.length]; - for (int i = 0; i < _values.length; i++) { - ret[i] = op.executeScalar(_values[i]); - } - - return ret; - } - - protected double[] applyScalarOp(ScalarOperator op, double newVal, int numCols) - throws DMLRuntimeException - { - //scan over linearized values - double[] ret = new double[_values.length + numCols]; - for( int i = 0; i < _values.length; i++ ) { - ret[i] = op.executeScalar(_values[i]); - } - - //add new value to the end - Arrays.fill(ret, _values.length, _values.length+numCols, newVal); - - return ret; - } - - /** - * NOTE: Shared across OLE/RLE because value-only computation. - * - * @param result matrix block - * @param builtin ? 
- */ - protected void computeMxx(MatrixBlock result, Builtin builtin) - { - //init and 0-value handling - double val = Double.MAX_VALUE * ((builtin.getBuiltinCode()==BuiltinCode.MAX)?-1:1); - if( _zeros ) - val = builtin.execute2(val, 0); - - //iterate over all values only - final int numVals = getNumValues(); - final int numCols = getNumCols(); - for (int k = 0; k < numVals; k++) - for( int j=0, valOff = k*numCols; j<numCols; j++ ) - val = builtin.execute2(val, _values[ valOff+j ]); - - //compute new partial aggregate - val = builtin.execute2(val, result.quickGetValue(0, 0)); - result.quickSetValue(0, 0, val); - } - - /** - * NOTE: Shared across OLE/RLE because value-only computation. - * - * @param result matrix block - * @param builtin ? - */ - protected void computeColMxx(MatrixBlock result, Builtin builtin) - { - final int numVals = getNumValues(); - final int numCols = getNumCols(); - - //init and 0-value handling - double[] vals = new double[numCols]; - Arrays.fill(vals, Double.MAX_VALUE * ((builtin.getBuiltinCode()==BuiltinCode.MAX)?-1:1)); - if( _zeros ) { - for( int j = 0; j < numCols; j++ ) - vals[j] = builtin.execute2(vals[j], 0); - } - - //iterate over all values only - for (int k = 0; k < numVals; k++) - for( int j=0, valOff=k*numCols; j<numCols; j++ ) - vals[j] = builtin.execute2(vals[j], _values[ valOff+j ]); - - //copy results to output - for( int j=0; j<numCols; j++ ) - result.quickSetValue(0, _colIndexes[j], vals[j]); - } - - - /** - * Obtain number of distrinct sets of values associated with the bitmaps in this column group. 
- * - * @return the number of distinct sets of values associated with the bitmaps - * in this column group - */ - public int getNumValues() { - return _values.length / _colIndexes.length; - } - - public double[] getValues() { - return _values; - } - - public char[] getBitmaps() { - return _data; - } - - public int[] getBitmapOffsets() { - return _ptr; - } - - public boolean hasZeros() { - return _zeros; - } - - /** - * @param k - * index of a specific compressed bitmap (stored in subclass, - * index same as {@link #getValues}) - * @return an object for iterating over the row offsets in this bitmap. Only - * valid until the next call to this method. May be reused across - * calls. - */ - public abstract Iterator<Integer> getDecodeIterator(int k); - - //TODO getDecodeIterator(int k, int rl, int ru) - - /** - * Utility function of sparse-unsafe operations. - * - * @param ind ? - * @return offsets - * @throws DMLRuntimeException if DMLRuntimeException occurs - */ - protected int[] computeOffsets(boolean[] ind) - throws DMLRuntimeException - { - //determine number of offsets - int numOffsets = 0; - for( int i=0; i<ind.length; i++ ) - numOffsets += ind[i] ? 
1 : 0; - - //create offset lists - int[] ret = new int[numOffsets]; - for( int i=0, pos=0; i<ind.length; i++ ) - if( ind[i] ) - ret[pos++] = i; - - return ret; - } - - @Override - public void readFields(DataInput in) - throws IOException - { - _numRows = in.readInt(); - int numCols = in.readInt(); - int numVals = in.readInt(); - _zeros = in.readBoolean(); - - //read col indices - _colIndexes = new int[ numCols ]; - for( int i=0; i<numCols; i++ ) - _colIndexes[i] = in.readInt(); - - //read distinct values - _values = new double[numVals*numCols]; - for( int i=0; i<numVals*numCols; i++ ) - _values[i] = in.readDouble(); - - //read bitmaps - int totalLen = in.readInt(); - _ptr = new int[numVals+1]; - _data = new char[totalLen]; - for( int i=0, off=0; i<numVals; i++ ) { - int len = in.readInt(); - _ptr[i] = off; - for( int j=0; j<len; j++ ) - _data[off+j] = in.readChar(); - off += len; - } - _ptr[numVals] = totalLen; - } - - @Override - public void write(DataOutput out) - throws IOException - { - int numCols = getNumCols(); - int numVals = getNumValues(); - out.writeInt(_numRows); - out.writeInt(numCols); - out.writeInt(numVals); - out.writeBoolean(_zeros); - - //write col indices - for( int i=0; i<_colIndexes.length; i++ ) - out.writeInt( _colIndexes[i] ); - - //write distinct values - for( int i=0; i<_values.length; i++ ) - out.writeDouble(_values[i]); - - //write bitmaps (lens and data, offset later recreated) - int totalLen = 0; - for( int i=0; i<numVals; i++ ) - totalLen += len(i); - out.writeInt(totalLen); - for( int i=0; i<numVals; i++ ) { - int len = len(i); - int off = _ptr[i]; - out.writeInt(len); - for( int j=0; j<len; j++ ) - out.writeChar(_data[off+j]); - } - } - - @Override - public long getExactSizeOnDisk() { - long ret = 13; //header - //col indices - ret += 4 * _colIndexes.length; - //distinct values (groups of values) - ret += 8 * _values.length; - //actual bitmaps - ret += 4; //total length - for( int i=0; i<getNumValues(); i++ ) - ret += 4 + 2 * 
len(i); - - return ret; - } -} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC.java new file mode 100644 index 0000000..1782e2e --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.runtime.compress; + +import java.util.Arrays; + +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.functionobjects.Builtin; +import org.apache.sysml.runtime.functionobjects.KahanFunction; +import org.apache.sysml.runtime.functionobjects.KahanPlus; +import org.apache.sysml.runtime.functionobjects.KahanPlusSq; +import org.apache.sysml.runtime.functionobjects.ReduceAll; +import org.apache.sysml.runtime.functionobjects.ReduceCol; +import org.apache.sysml.runtime.functionobjects.ReduceRow; +import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode; +import org.apache.sysml.runtime.instructions.cp.KahanObject; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator; + +/** + * Class to encapsulate information about a column group that is encoded with + * dense dictionary encoding (DDC). + * + * NOTE: zero values are included at position 0 in the value dictionary, which + * simplifies various operations such as counting the number of non-zeros. 
+ */ +public abstract class ColGroupDDC extends ColGroupValue +{ + private static final long serialVersionUID = -3204391646123465004L; + + public ColGroupDDC() { + super(); + } + + public ColGroupDDC(int[] colIndices, int numRows, UncompressedBitmap ubm) { + super(colIndices, numRows, ubm); + } + + protected ColGroupDDC(int[] colIndices, int numRows, double[] values) { + super(colIndices, numRows, values); + } + + @Override + public void decompressToBlock(MatrixBlock target, int rl, int ru) { + for( int i = rl; i < ru; i++ ) { + for( int colIx = 0; colIx < _colIndexes.length; colIx++ ) { + int col = _colIndexes[colIx]; + double cellVal = getData(i, colIx); + target.quickSetValue(i, col, cellVal); + } + } + } + + @Override + public void decompressToBlock(MatrixBlock target, int[] colIndexTargets) { + int nrow = getNumRows(); + int ncol = getNumCols(); + for( int i = 0; i < nrow; i++ ) { + for( int colIx = 0; colIx < ncol; colIx++ ) { + int origMatrixColIx = getColIndex(colIx); + int col = colIndexTargets[origMatrixColIx]; + double cellVal = getData(i, colIx); + target.quickSetValue(i, col, cellVal); + } + } + } + + @Override + public void decompressToBlock(MatrixBlock target, int colpos) { + int nrow = getNumRows(); + for( int i = 0; i < nrow; i++ ) { + double cellVal = getData(i, colpos); + target.quickSetValue(i, 0, cellVal); + } + } + + @Override + public double get(int r, int c) { + //find local column index + int ix = Arrays.binarySearch(_colIndexes, c); + if( ix < 0 ) + throw new RuntimeException("Column index "+c+" not in DDC group."); + + //get value + return getData(r, ix); + } + + + @Override + protected void countNonZerosPerRow(int[] rnnz, int rl, int ru) { + int ncol = getNumCols(); + for( int i = rl; i < ru; i++ ) { + int lnnz = 0; + for( int colIx=0; colIx < ncol; colIx++ ) + lnnz += (getData(i, colIx) != 0) ? 
1 : 0; + rnnz[i-rl] += lnnz; + } + } + + @Override + public void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock result, int rl, int ru) + throws DMLRuntimeException + { + //sum and sumsq (reduceall/reducerow over tuples and counts) + if( op.aggOp.increOp.fn instanceof KahanPlus || op.aggOp.increOp.fn instanceof KahanPlusSq ) + { + KahanFunction kplus = (op.aggOp.increOp.fn instanceof KahanPlus) ? + KahanPlus.getKahanPlusFnObject() : KahanPlusSq.getKahanPlusSqFnObject(); + + if( op.indexFn instanceof ReduceAll ) + computeSum(result, kplus); + else if( op.indexFn instanceof ReduceCol ) + computeRowSums(result, kplus, rl, ru); + else if( op.indexFn instanceof ReduceRow ) + computeColSums(result, kplus); + } + //min and max (reduceall/reducerow over tuples only) + else if(op.aggOp.increOp.fn instanceof Builtin + && (((Builtin)op.aggOp.increOp.fn).getBuiltinCode()==BuiltinCode.MAX + || ((Builtin)op.aggOp.increOp.fn).getBuiltinCode()==BuiltinCode.MIN)) + { + Builtin builtin = (Builtin) op.aggOp.increOp.fn; + + if( op.indexFn instanceof ReduceAll ) + computeMxx(result, builtin, false); + else if( op.indexFn instanceof ReduceCol ) + computeRowMxx(result, builtin, rl, ru); + else if( op.indexFn instanceof ReduceRow ) + computeColMxx(result, builtin, false); + } + } + + protected void computeSum(MatrixBlock result, KahanFunction kplus) { + int nrow = getNumRows(); + int ncol = getNumCols(); + KahanObject kbuff = new KahanObject(result.quickGetValue(0, 0), result.quickGetValue(0, 1)); + + for( int i=0; i<nrow; i++ ) + for( int j=0; j<ncol; j++ ) + kplus.execute2(kbuff, getData(i, j)); + + result.quickSetValue(0, 0, kbuff._sum); + result.quickSetValue(0, 1, kbuff._correction); + } + + protected void computeColSums(MatrixBlock result, KahanFunction kplus) { + int nrow = getNumRows(); + int ncol = getNumCols(); + KahanObject[] kbuff = new KahanObject[getNumCols()]; + for( int j=0; j<ncol; j++ ) + kbuff[j] = new KahanObject(result.quickGetValue(0, 
_colIndexes[j]), + result.quickGetValue(1, _colIndexes[j])); + + for( int i=0; i<nrow; i++ ) + for( int j=0; j<ncol; j++ ) + kplus.execute2(kbuff[j], getData(i, j)); + + for( int j=0; j<ncol; j++ ) { + result.quickSetValue(0, _colIndexes[j], kbuff[j]._sum); + result.quickSetValue(1, _colIndexes[j], kbuff[j]._correction); + } + } + + protected void computeRowSums(MatrixBlock result, KahanFunction kplus, int rl, int ru) { + int ncol = getNumCols(); + KahanObject kbuff = new KahanObject(0, 0); + + for( int i=rl; i<ru; i++ ) { + kbuff.set(result.quickGetValue(i, 0), result.quickGetValue(i, 1)); + for( int j=0; j<ncol; j++ ) + kplus.execute2(kbuff, getData(i, j)); + result.quickSetValue(i, 0, kbuff._sum); + result.quickSetValue(i, 1, kbuff._correction); + } + } + + protected void computeRowMxx(MatrixBlock result, Builtin builtin, int rl, int ru) { + double[] c = result.getDenseBlock(); + int ncol = getNumCols(); + + for( int i=rl; i<ru; i++ ) + for( int j=0; j<ncol; j++ ) + c[i] = builtin.execute2(c[i], getData(i, j)); + } + + + + /** + * Generic get value for byte-length-agnostic access. + * + * @param r global row index + * @param colIx local column index + * @return value + */ + protected abstract double getData(int r, int colIx); + + /** + * Generic set value for byte-length-agnostic write + * of encoded value. 
+ * + * @param r global row index + * @param code encoded value + */ + protected abstract void setData(int r, int code); + + @Override + public long estimateInMemorySize() { + return super.estimateInMemorySize(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java new file mode 100644 index 0000000..4db871f --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java @@ -0,0 +1,358 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.runtime.compress; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Arrays; + +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.compress.utils.ConverterUtils; +import org.apache.sysml.runtime.functionobjects.KahanFunction; +import org.apache.sysml.runtime.functionobjects.KahanPlus; +import org.apache.sysml.runtime.instructions.cp.KahanObject; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.matrix.operators.ScalarOperator; + +/** + * Class to encapsulate information about a column group that is encoded with + * dense dictionary encoding (DDC) using 1 byte codes. + */ +public class ColGroupDDC1 extends ColGroupDDC +{ + private static final long serialVersionUID = 5204955589230760157L; + + private byte[] _data; + + public ColGroupDDC1() { + super(); + } + + public ColGroupDDC1(int[] colIndices, int numRows, UncompressedBitmap ubm) { + super(colIndices, numRows, ubm); + _data = new byte[numRows]; + + int numVals = ubm.getNumValues(); + int numCols = ubm.getNumColumns(); + + //materialize zero values, if necessary + if( ubm.getNumOffsets() < (long)numRows * numCols ) { + int zeroIx = containsAllZeroValue(); + if( zeroIx < 0 ) { + zeroIx = numVals; + _values = Arrays.copyOf(_values, _values.length+numCols); + } + Arrays.fill(_data, (byte)zeroIx); + } + + //iterate over values and write dictionary codes + for( int i=0; i<numVals; i++ ) { + int[] tmpList = ubm.getOffsetsList(i).extractValues(); + int tmpListSize = ubm.getNumOffsets(i); + for( int k=0; k<tmpListSize; k++ ) + _data[tmpList[k]] = (byte)i; + } + } + + public ColGroupDDC1(int[] colIndices, int numRows, double[] values, byte[] data) { + super(colIndices, numRows, values); + _data = data; + } + + @Override + public CompressionType getCompType() { + return CompressionType.DDC1; + } + + @Override + protected double getData(int r, int colIx) { + return 
_values[(_data[r]&0xFF)*getNumCols()+colIx]; + } + + @Override + protected void setData(int r, int code) { + _data[r] = (byte)code; + } + + @Override + public void write(DataOutput out) throws IOException { + int numCols = getNumCols(); + int numVals = getNumValues(); + out.writeInt(_numRows); + out.writeInt(numCols); + out.writeInt(numVals); + + //write col indices + for( int i=0; i<_colIndexes.length; i++ ) + out.writeInt( _colIndexes[i] ); + + //write distinct values + for( int i=0; i<_values.length; i++ ) + out.writeDouble(_values[i]); + + //write data + for( int i=0; i<_numRows; i++ ) + out.writeByte(_data[i]); + } + + @Override + public void readFields(DataInput in) throws IOException { + _numRows = in.readInt(); + int numCols = in.readInt(); + int numVals = in.readInt(); + + //read col indices + _colIndexes = new int[ numCols ]; + for( int i=0; i<numCols; i++ ) + _colIndexes[i] = in.readInt(); + + //read distinct values + _values = new double[numVals*numCols]; + for( int i=0; i<numVals*numCols; i++ ) + _values[i] = in.readDouble(); + + //read data + _data = new byte[_numRows]; + for( int i=0; i<_numRows; i++ ) + _data[i] = in.readByte(); + } + + @Override + public long getExactSizeOnDisk() { + long ret = 12; //header + //col indices + ret += 4 * _colIndexes.length; + //distinct values (groups of values) + ret += 8 * _values.length; + //data + ret += 1 * _data.length; + + return ret; + } + + @Override + public long estimateInMemorySize() { + long size = super.estimateInMemorySize(); + + //adding data size + if (_data != null) + size += _data.length; + + return size; + } + + @Override + public void decompressToBlock(MatrixBlock target, int rl, int ru) { + int ncol = getNumCols(); + for( int i = rl; i < ru; i++ ) + for( int j=0; j<ncol; j++ ) + target.appendValue(i, _colIndexes[j], _values[(_data[i]&0xFF)*ncol+j]); + //note: append ok because final sort per row + } + + @Override + protected void countNonZerosPerRow(int[] rnnz, int rl, int ru) { + final int ncol 
= getNumCols(); + final int numVals = getNumValues(); + + //pre-aggregate nnz per value tuple + int[] counts = new int[numVals]; + for( int k=0, valOff=0; k<numVals; k++, valOff+=ncol ) + for( int j=0; j<ncol; j++ ) + counts[k] += (_values[valOff+j]!=0) ? 1 : 0; + + //scan data and add counts to output rows + for( int i = rl; i < ru; i++ ) + rnnz[i-rl] += counts[_data[i]&0xFF]; + } + + @Override + public void rightMultByVector(MatrixBlock vector, MatrixBlock result, int rl, int ru) + throws DMLRuntimeException + { + double[] b = ConverterUtils.getDenseVector(vector); + double[] c = result.getDenseBlock(); + final int numCols = getNumCols(); + final int numVals = getNumValues(); + + //prepare reduced rhs w/ relevant values + double[] sb = new double[numCols]; + for (int j = 0; j < numCols; j++) { + sb[j] = b[_colIndexes[j]]; + } + + //pre-aggregate all distinct values (guaranteed <=255) + double[] vals = preaggValues(numVals, sb); + + //iterative over codes and add to output + for( int i=rl; i<ru; i++ ) { + c[i] += vals[_data[i]&0xFF]; + } + } + + public static void rightMultByVector(ColGroupDDC1[] grps, MatrixBlock vector, MatrixBlock result, int rl, int ru) + throws DMLRuntimeException + { + double[] b = ConverterUtils.getDenseVector(vector); + double[] c = result.getDenseBlock(); + + //prepare distinct values once + double[][] vals = new double[grps.length][]; + for( int i=0; i<grps.length; i++ ) { + //prepare reduced rhs w/ relevant values + double[] sb = new double[grps[i].getNumCols()]; + for (int j = 0; j < sb.length; j++) { + sb[j] = b[grps[i]._colIndexes[j]]; + } + //pre-aggregate all distinct values (guaranteed <=255) + vals[i] = grps[i].preaggValues(grps[i].getNumValues(), sb); + } + + //cache-conscious matrix-vector multiplication + //iterative over codes of all groups and add to output + int blksz = 2048; //16KB + for( int bi=rl; bi<ru; bi+=blksz ) + for( int j=0; j<grps.length; j++ ) + for( int i=bi; i<Math.min(bi+blksz, ru); i++ ) + c[i] += 
vals[j][grps[j]._data[i]&0xFF]; + } + + @Override + public void leftMultByRowVector(MatrixBlock vector, MatrixBlock result) throws DMLRuntimeException { + double[] a = ConverterUtils.getDenseVector(vector); + double[] c = result.getDenseBlock(); + final int nrow = getNumRows(); + final int ncol = getNumCols(); + final int numVals = getNumValues(); + + if( 8*numVals < getNumRows() ) + { + //iterative over codes and pre-aggregate inputs per code (guaranteed <=255) + //temporary array also avoids false sharing in multi-threaded environments + double[] vals = new double[numVals]; + for( int i=0; i<nrow; i++ ) { + vals[_data[i]&0xFF] += a[i]; + } + + //post-scaling of pre-aggregate with distinct values + for( int k=0, valOff=0; k<numVals; k++, valOff+=ncol ) { + double aval = vals[k]; + for( int j=0; j<ncol; j++ ) { + int colIx = _colIndexes[j]; + c[colIx] += aval * _values[valOff+j]; + } + } + } + else //general case + { + //iterate over codes, compute all, and add to the result + for( int i=0; i<nrow; i++ ) { + double aval = a[i]; + if( aval != 0 ) { + int valOff = (_data[i]&0xFF) * ncol; + for( int j=0; j<ncol; j++ ) { + int colIx = _colIndexes[j]; + c[colIx] += aval * _values[valOff+j]; + } + } + } + } + } + + @Override + protected void computeSum(MatrixBlock result, KahanFunction kplus) { + final int nrow = getNumRows(); + final int ncol = getNumCols(); + final int numVals = getNumValues(); + + //iterative over codes and count per code (guaranteed <=255) + int[] counts = new int[numVals]; + for( int i=0; i<nrow; i++ ) { + counts[_data[i]&0xFF] ++; + } + + //post-scaling of pre-aggregate with distinct values + KahanObject kbuff = new KahanObject(result.quickGetValue(0, 0), result.quickGetValue(0, 1)); + for( int k=0, valOff=0; k<numVals; k++, valOff+=ncol ) { + int cntk = counts[k]; + for( int j=0; j<ncol; j++ ) + kplus.execute3(kbuff, _values[ valOff+j], cntk); + } + + result.quickSetValue(0, 0, kbuff._sum); + result.quickSetValue(0, 1, kbuff._correction); + } + + 
+ @Override + protected void computeRowSums(MatrixBlock result, KahanFunction kplus, int rl, int ru) { + KahanObject kbuff = new KahanObject(0, 0); + KahanPlus kplus2 = KahanPlus.getKahanPlusFnObject(); + double[] c = result.getDenseBlock(); + + //pre-aggregate nnz per value tuple + double[] vals = sumAllValues(kplus, kbuff); + + //scan data and add to result (use kahan plus not general KahanFunction + //for correctness in case of sqk+) + for( int i=rl; i<ru; i++ ) { + kbuff.set(c[2*i], c[2*i+1]); + kplus2.execute2(kbuff, vals[_data[i]&0xFF]); + c[2*i] = kbuff._sum; + c[2*i+1] = kbuff._correction; + } + } + + public static void computeRowSums(ColGroupDDC1[] grps, MatrixBlock result, KahanFunction kplus, int rl, int ru) + throws DMLRuntimeException + { + KahanObject kbuff = new KahanObject(0, 0); + KahanPlus kplus2 = KahanPlus.getKahanPlusFnObject(); + double[] c = result.getDenseBlock(); + + //prepare distinct values once + double[][] vals = new double[grps.length][]; + for( int i=0; i<grps.length; i++ ) { + //pre-aggregate all distinct values (guaranteed <=255) + vals[i] = grps[i].sumAllValues(kplus, kbuff); + } + + //cache-conscious row sums operations + //iterative over codes of all groups and add to output + //(use kahan plus not general KahanFunction for correctness in case of sqk+) + int blksz = 1024; //16KB + for( int bi=rl; bi<ru; bi+=blksz ) + for( int j=0; j<grps.length; j++ ) + for( int i=bi; i<Math.min(bi+blksz, ru); i++ ) { + kbuff.set(c[2*i], c[2*i+1]); + kplus2.execute2(kbuff, vals[j][grps[j]._data[i]&0xFF]); + c[2*i] = kbuff._sum; + c[2*i+1] = kbuff._correction; + } + } + + @Override + public ColGroup scalarOperation(ScalarOperator op) throws DMLRuntimeException { + //fast path: sparse-safe and -unsafe operations + //as zero are represented, it is sufficient to simply apply the scalar op + return new ColGroupDDC1(_colIndexes, _numRows, applyScalarOp(op), _data); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC2.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC2.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC2.java new file mode 100644 index 0000000..5f29979 --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC2.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.runtime.compress; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Arrays; + +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.compress.utils.ConverterUtils; +import org.apache.sysml.runtime.functionobjects.KahanFunction; +import org.apache.sysml.runtime.functionobjects.KahanPlus; +import org.apache.sysml.runtime.instructions.cp.KahanObject; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.matrix.operators.ScalarOperator; + +/** + * Class to encapsulate information about a column group that is encoded with + * dense dictionary encoding (DDC) using 2 byte codes. + */ +public class ColGroupDDC2 extends ColGroupDDC +{ + private static final long serialVersionUID = -3995768285207071013L; + + private static final int MAX_TMP_VALS = 32*1024; + + private char[] _data; + + public ColGroupDDC2() { + super(); + } + + public ColGroupDDC2(int[] colIndices, int numRows, UncompressedBitmap ubm) { + super(colIndices, numRows, ubm); + _data = new char[numRows]; + + int numVals = ubm.getNumValues(); + int numCols = ubm.getNumColumns(); + + //materialize zero values, if necessary + if( ubm.getNumOffsets() < (long)numRows * numCols ) { + int zeroIx = containsAllZeroValue(); + if( zeroIx < 0 ) { + zeroIx = numVals; + _values = Arrays.copyOf(_values, _values.length+numCols); + } + Arrays.fill(_data, (char)zeroIx); + } + + //iterate over values and write dictionary codes + for( int i=0; i<numVals; i++ ) { + int[] tmpList = ubm.getOffsetsList(i).extractValues(); + int tmpListSize = ubm.getNumOffsets(i); + for( int k=0; k<tmpListSize; k++ ) + _data[tmpList[k]] = (char)i; + } + } + + public ColGroupDDC2(int[] colIndices, int numRows, double[] values, char[] data) { + super(colIndices, numRows, values); + _data = data; + } + + @Override + public CompressionType getCompType() { + return CompressionType.DDC2; + } + + @Override + 
protected double getData(int r, int colIx) { + return _values[_data[r]*getNumCols()+colIx]; + } + + @Override + protected void setData(int r, int code) { + _data[r] = (char)code; + } + + @Override + public void write(DataOutput out) throws IOException { + int numCols = getNumCols(); + int numVals = getNumValues(); + out.writeInt(_numRows); + out.writeInt(numCols); + out.writeInt(numVals); + + //write col indices + for( int i=0; i<_colIndexes.length; i++ ) + out.writeInt( _colIndexes[i] ); + + //write distinct values + for( int i=0; i<_values.length; i++ ) + out.writeDouble(_values[i]); + + //write data + for( int i=0; i<_numRows; i++ ) + out.writeChar(_data[i]); + } + + @Override + public void readFields(DataInput in) throws IOException { + _numRows = in.readInt(); + int numCols = in.readInt(); + int numVals = in.readInt(); + + //read col indices + _colIndexes = new int[ numCols ]; + for( int i=0; i<numCols; i++ ) + _colIndexes[i] = in.readInt(); + + //read distinct values + _values = new double[numVals*numCols]; + for( int i=0; i<numVals*numCols; i++ ) + _values[i] = in.readDouble(); + + //read data + _data = new char[_numRows]; + for( int i=0; i<_numRows; i++ ) + _data[i] = in.readChar(); + } + + @Override + public long getExactSizeOnDisk() { + long ret = 12; //header + //col indices + ret += 4 * _colIndexes.length; + //distinct values (groups of values) + ret += 8 * _values.length; + //data + ret += 2 * _data.length; + + return ret; + } + + @Override + public long estimateInMemorySize() { + long size = super.estimateInMemorySize(); + + //adding data size + if (_data != null) + size += 2 * _data.length; + + return size; + } + + @Override + public void decompressToBlock(MatrixBlock target, int rl, int ru) { + int ncol = getNumCols(); + for( int i = rl; i < ru; i++ ) + for( int j=0; j<ncol; j++ ) + target.appendValue(i, _colIndexes[j], _values[_data[i]*ncol+j]); + //note: append ok because final sort per row + } + + @Override + protected void 
countNonZerosPerRow(int[] rnnz, int rl, int ru) { + final int ncol = getNumCols(); + final int numVals = getNumValues(); + + //pre-aggregate nnz per value tuple + int[] counts = new int[numVals]; + for( int k=0, valOff=0; k<numVals; k++, valOff+=ncol ) + for( int j=0; j<ncol; j++ ) + counts[k] += (_values[valOff+j]!=0) ? 1 : 0; + + //scan data and add counts to output rows + for( int i = rl; i < ru; i++ ) + rnnz[i-rl] += counts[_data[i]]; + } + + @Override + public void rightMultByVector(MatrixBlock vector, MatrixBlock result, int rl, int ru) throws DMLRuntimeException { + double[] b = ConverterUtils.getDenseVector(vector); + double[] c = result.getDenseBlock(); + final int numCols = getNumCols(); + final int numVals = getNumValues(); + + //prepare reduced rhs w/ relevant values + double[] sb = new double[numCols]; + for (int j = 0; j < numCols; j++) { + sb[j] = b[_colIndexes[j]]; + } + + //pre-aggregate all distinct values + double[] vals = preaggValues(numVals, sb); + + //iterative over codes and add to output + for( int i=rl; i<ru; i++ ) + c[i] += vals[_data[i]]; + } + + @Override + public void leftMultByRowVector(MatrixBlock vector, MatrixBlock result) + throws DMLRuntimeException + { + double[] a = ConverterUtils.getDenseVector(vector); + double[] c = result.getDenseBlock(); + final int nrow = getNumRows(); + final int ncol = getNumCols(); + final int numVals = getNumValues(); + + if( 8*numVals < getNumRows() ) + { + //iterative over codes and pre-aggregate inputs per code + //temporary array also avoids false sharing in multi-threaded environments + double[] vals = new double[numVals]; + for( int i=0; i<nrow; i++ ) { + vals[_data[i]] += a[i]; + } + + //post-scaling of pre-aggregate with distinct values + for( int k=0, valOff=0; k<numVals; k++, valOff+=ncol ) { + double aval = vals[k]; + for( int j=0; j<ncol; j++ ) { + int colIx = _colIndexes[j]; + c[colIx] += aval * _values[valOff+j]; + } + } + } + else //general case + { + + //iterate over codes, compute 
all, and add to the result + for( int i=0; i<nrow; i++ ) { + double aval = a[i]; + if( aval != 0 ) { + int valOff = _data[i] * ncol; + for( int j=0; j<ncol; j++ ) { + int colIx = _colIndexes[j]; + c[colIx] += aval * _values[valOff+j]; + } + } + } + } + } + + @Override + protected void computeSum(MatrixBlock result, KahanFunction kplus) { + final int nrow = getNumRows(); + final int ncol = getNumCols(); + final int numVals = getNumValues(); + + if( numVals < MAX_TMP_VALS ) + { + //iterative over codes and count per code + int[] counts = new int[numVals]; + for( int i=0; i<nrow; i++ ) { + counts[_data[i]] ++; + } + + //post-scaling of pre-aggregate with distinct values + KahanObject kbuff = new KahanObject(result.quickGetValue(0, 0), result.quickGetValue(0, 1)); + for( int k=0, valOff=0; k<numVals; k++, valOff+=ncol ) { + int cntk = counts[k]; + for( int j=0; j<ncol; j++ ) + kplus.execute3(kbuff, _values[ valOff+j], cntk); + } + + result.quickSetValue(0, 0, kbuff._sum); + result.quickSetValue(0, 1, kbuff._correction); + } + else //general case + { + super.computeSum(result, kplus); + } + } + + + @Override + protected void computeRowSums(MatrixBlock result, KahanFunction kplus, int rl, int ru) { + KahanObject kbuff = new KahanObject(0, 0); + KahanPlus kplus2 = KahanPlus.getKahanPlusFnObject(); + double[] c = result.getDenseBlock(); + + //pre-aggregate nnz per value tuple + double[] vals = sumAllValues(kplus, kbuff); + + //scan data and add to result (use kahan plus not general KahanFunction + //for correctness in case of sqk+) + for( int i=rl; i<ru; i++ ) { + kbuff.set(c[2*i], c[2*i+1]); + kplus2.execute2(kbuff, vals[_data[i]]); + c[2*i] = kbuff._sum; + c[2*i+1] = kbuff._correction; + } + } + + @Override + public ColGroup scalarOperation(ScalarOperator op) throws DMLRuntimeException { + //fast path: sparse-safe and -unsafe operations + //as zero are represented, it is sufficient to simply apply the scalar op + return new ColGroupDDC2(_colIndexes, _numRows, 
applyScalarOp(op), _data); + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java index 696adf2..f47a432 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java +++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java @@ -22,20 +22,16 @@ package org.apache.sysml.runtime.compress; import java.util.Arrays; import java.util.Iterator; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.compress.utils.ConverterUtils; import org.apache.sysml.runtime.compress.utils.LinearAlgebraUtils; import org.apache.sysml.runtime.functionobjects.Builtin; -import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode; import org.apache.sysml.runtime.functionobjects.KahanFunction; import org.apache.sysml.runtime.functionobjects.KahanPlus; -import org.apache.sysml.runtime.functionobjects.KahanPlusSq; -import org.apache.sysml.runtime.functionobjects.ReduceAll; -import org.apache.sysml.runtime.functionobjects.ReduceCol; -import org.apache.sysml.runtime.functionobjects.ReduceRow; import org.apache.sysml.runtime.instructions.cp.KahanObject; import org.apache.sysml.runtime.matrix.data.MatrixBlock; -import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator; import org.apache.sysml.runtime.matrix.operators.ScalarOperator; /** @@ -43,12 +39,14 @@ import org.apache.sysml.runtime.matrix.operators.ScalarOperator; * simple lists of offsets for each set of distinct values. 
* */ -public class ColGroupOLE extends ColGroupBitmap +public class ColGroupOLE extends ColGroupOffset { private static final long serialVersionUID = -9157676271360528008L; + private static final Log LOG = LogFactory.getLog(ColGroupOLE.class.getName()); + public ColGroupOLE() { - super(CompressionType.OLE_BITMAP); + super(); } /** @@ -64,14 +62,15 @@ public class ColGroupOLE extends ColGroupBitmap */ public ColGroupOLE(int[] colIndices, int numRows, UncompressedBitmap ubm) { - super(CompressionType.OLE_BITMAP, colIndices, numRows, ubm); + super(colIndices, numRows, ubm); // compress the bitmaps final int numVals = ubm.getNumValues(); char[][] lbitmaps = new char[numVals][]; int totalLen = 0; for( int i=0; i<numVals; i++ ) { - lbitmaps[i] = BitmapEncoder.genOffsetBitmap(ubm.getOffsetsList(i)); + lbitmaps[i] = BitmapEncoder.genOffsetBitmap( + ubm.getOffsetsList(i).extractValues(), ubm.getNumOffsets(i)); totalLen += lbitmaps[i].length; } @@ -95,13 +94,24 @@ public class ColGroupOLE extends ColGroupBitmap _skiplist[k] = bix; } } + + //debug output + double ucSize = MatrixBlock.estimateSizeDenseInMemory(numRows, colIndices.length); + if( estimateInMemorySize() > ucSize ) + LOG.warn("OLE group larger than UC dense: "+estimateInMemorySize()+" "+ucSize); } public ColGroupOLE(int[] colIndices, int numRows, boolean zeros, double[] values, char[] bitmaps, int[] bitmapOffs) { - super(CompressionType.OLE_BITMAP, colIndices, numRows, zeros, values); + super(colIndices, numRows, zeros, values); _data = bitmaps; _ptr = bitmapOffs; } + + + @Override + public CompressionType getCompType() { + return CompressionType.OLE_BITMAP; + } @Override public Iterator<Integer> getDecodeIterator(int k) { @@ -251,7 +261,7 @@ public class ColGroupOLE extends ColGroupBitmap } double[] rvalues = applyScalarOp(op, val0, getNumCols()); - char[] lbitmap = BitmapEncoder.genOffsetBitmap(loff); + char[] lbitmap = BitmapEncoder.genOffsetBitmap(loff, loff.length); char[] rbitmaps = Arrays.copyOf(_data, 
_data.length+lbitmap.length); System.arraycopy(lbitmap, 0, rbitmaps, _data.length, lbitmap.length); int[] rbitmapOffs = Arrays.copyOf(_ptr, _ptr.length+1); @@ -284,7 +294,7 @@ public class ColGroupOLE extends ColGroupBitmap //best configuration aligns with L3 cache size (x*vcores*64K*8B < L3) //x=4 leads to a good yet slightly conservative compromise for single-/ //multi-threaded and typical number of cores and L3 cache sizes - final int blksz2 = ColGroupBitmap.WRITE_CACHE_BLKSZ; + final int blksz2 = ColGroupOffset.WRITE_CACHE_BLKSZ; //step 1: prepare position and value arrays int[] apos = skipScan(numVals, rl); @@ -365,7 +375,7 @@ public class ColGroupOLE extends ColGroupBitmap if( LOW_LEVEL_OPT && numVals > 1 && _numRows > blksz ) { //cache blocking config (see matrix-vector mult for explanation) - final int blksz2 = ColGroupBitmap.READ_CACHE_BLKSZ; + final int blksz2 = ColGroupOffset.READ_CACHE_BLKSZ; //step 1: prepare position and value arrays @@ -426,46 +436,7 @@ public class ColGroupOLE extends ColGroupBitmap } @Override - public void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock result) - throws DMLRuntimeException - { - unaryAggregateOperations(op, result, 0, getNumRows()); - } - - @Override - public void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock result, int rl, int ru) - throws DMLRuntimeException - { - //sum and sumsq (reduceall/reducerow over tuples and counts) - if( op.aggOp.increOp.fn instanceof KahanPlus || op.aggOp.increOp.fn instanceof KahanPlusSq ) - { - KahanFunction kplus = (op.aggOp.increOp.fn instanceof KahanPlus) ? 
- KahanPlus.getKahanPlusFnObject() : KahanPlusSq.getKahanPlusSqFnObject(); - - if( op.indexFn instanceof ReduceAll ) - computeSum(result, kplus); - else if( op.indexFn instanceof ReduceCol ) - computeRowSums(result, kplus, rl, ru); - else if( op.indexFn instanceof ReduceRow ) - computeColSums(result, kplus); - } - //min and max (reduceall/reducerow over tuples only) - else if(op.aggOp.increOp.fn instanceof Builtin - && (((Builtin)op.aggOp.increOp.fn).getBuiltinCode()==BuiltinCode.MAX - || ((Builtin)op.aggOp.increOp.fn).getBuiltinCode()==BuiltinCode.MIN)) - { - Builtin builtin = (Builtin) op.aggOp.increOp.fn; - - if( op.indexFn instanceof ReduceAll ) - computeMxx(result, builtin); - else if( op.indexFn instanceof ReduceCol ) - computeRowMxx(result, builtin, rl, ru); - else if( op.indexFn instanceof ReduceRow ) - computeColMxx(result, builtin); - } - } - - private void computeSum(MatrixBlock result, KahanFunction kplus) + protected final void computeSum(MatrixBlock result, KahanFunction kplus) { KahanObject kbuff = new KahanObject(result.quickGetValue(0, 0), result.quickGetValue(0, 1)); @@ -493,41 +464,88 @@ public class ColGroupOLE extends ColGroupBitmap result.quickSetValue(0, 1, kbuff._correction); } - private void computeRowSums(MatrixBlock result, KahanFunction kplus, int rl, int ru) + @Override + protected final void computeRowSums(MatrixBlock result, KahanFunction kplus, int rl, int ru) { KahanObject kbuff = new KahanObject(0, 0); - + KahanPlus kplus2 = KahanPlus.getKahanPlusFnObject(); + final int blksz = BitmapEncoder.BITMAP_BLOCK_SZ; final int numVals = getNumValues(); double[] c = result.getDenseBlock(); - //iterate over all values and their bitmaps - for (int k = 0; k < numVals; k++) + if( ALLOW_CACHE_CONSCIOUS_ROWSUMS && + LOW_LEVEL_OPT && numVals > 1 && _numRows > blksz ) { - //prepare value-to-add for entire value bitmap - int boff = _ptr[k]; - int blen = len(k); - double val = sumValues(k); + final int blksz2 = ColGroupOffset.WRITE_CACHE_BLKSZ/2; - 
//iterate over bitmap blocks and add values - if (val != 0) { - int slen; - int bix = skipScanVal(k, rl); - for( int off=bix*blksz; bix<blen && off<ru; bix+=slen+1, off+=blksz ) { - slen = _data[boff+bix]; - for (int i = 1; i <= slen; i++) { - int rix = off + _data[boff+bix + i]; - kbuff.set(c[2*rix], c[2*rix+1]); - kplus.execute2(kbuff, val); - c[2*rix] = kbuff._sum; - c[2*rix+1] = kbuff._correction; + //step 1: prepare position and value arrays + int[] apos = skipScan(numVals, rl); + double[] aval = sumAllValues(kplus, kbuff); + + //step 2: cache conscious row sums via horizontal scans + for( int bi=rl; bi<ru; bi+=blksz2 ) + { + int bimax = Math.min(bi+blksz2, ru); + + //horizontal segment scan, incl pos maintenance + for (int k = 0; k < numVals; k++) { + int boff = _ptr[k]; + int blen = len(k); + double val = aval[k]; + int bix = apos[k]; + + for( int ii=bi; ii<bimax && bix<blen; ii+=blksz ) { + //prepare length, start, and end pos + int len = _data[boff+bix]; + int pos = boff+bix+1; + + //compute partial results + for (int i = 0; i < len; i++) { + int rix = ii + _data[pos + i]; + kbuff.set(c[2*rix], c[2*rix+1]); + kplus2.execute2(kbuff, val); + c[2*rix] = kbuff._sum; + c[2*rix+1] = kbuff._correction; + } + bix += len + 1; + } + + apos[k] = bix; + } + } + } + else + { + //iterate over all values and their bitmaps + for (int k = 0; k < numVals; k++) + { + //prepare value-to-add for entire value bitmap + int boff = _ptr[k]; + int blen = len(k); + double val = sumValues(k, kplus, kbuff); + + //iterate over bitmap blocks and add values + if (val != 0) { + int slen; + int bix = skipScanVal(k, rl); + for( int off=((rl+1)/blksz)*blksz; bix<blen && off<ru; bix+=slen+1, off+=blksz ) { + slen = _data[boff+bix]; + for (int i = 1; i <= slen; i++) { + int rix = off + _data[boff+bix + i]; + kbuff.set(c[2*rix], c[2*rix+1]); + kplus2.execute2(kbuff, val); + c[2*rix] = kbuff._sum; + c[2*rix+1] = kbuff._correction; + } } } } } } - private void computeColSums(MatrixBlock result, 
KahanFunction kplus) + @Override + protected final void computeColSums(MatrixBlock result, KahanFunction kplus) { KahanObject kbuff = new KahanObject(0, 0); @@ -555,7 +573,8 @@ public class ColGroupOLE extends ColGroupBitmap } } - private void computeRowMxx(MatrixBlock result, Builtin builtin, int rl, int ru) + @Override + protected final void computeRowMxx(MatrixBlock result, Builtin builtin, int rl, int ru) { //NOTE: zeros handled once for all column groups outside final int blksz = BitmapEncoder.BITMAP_BLOCK_SZ; @@ -624,7 +643,7 @@ public class ColGroupOLE extends ColGroupBitmap protected void countNonZerosPerRow(int[] rnnz, int rl, int ru) { final int blksz = BitmapEncoder.BITMAP_BLOCK_SZ; - final int blksz2 = ColGroupBitmap.WRITE_CACHE_BLKSZ; + final int blksz2 = ColGroupOffset.WRITE_CACHE_BLKSZ; final int numVals = getNumValues(); final int numCols = getNumCols(); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ColGroupOffset.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupOffset.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupOffset.java new file mode 100644 index 0000000..e49c1a3 --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupOffset.java @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.compress; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; + +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.compress.utils.LinearAlgebraUtils; +import org.apache.sysml.runtime.functionobjects.Builtin; +import org.apache.sysml.runtime.functionobjects.KahanFunction; +import org.apache.sysml.runtime.functionobjects.KahanPlus; +import org.apache.sysml.runtime.functionobjects.KahanPlusSq; +import org.apache.sysml.runtime.functionobjects.ReduceAll; +import org.apache.sysml.runtime.functionobjects.ReduceCol; +import org.apache.sysml.runtime.functionobjects.ReduceRow; +import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator; + + +/** + * Base class for column groups encoded with various types of bitmap encoding. 
+ *
+ * All subclasses share the following physical layout:
+ * * {@code _values}: distinct value tuples stored value-major, i.e., the tuple
+ *   of value k occupies {@code _values[k*numCols, (k+1)*numCols)}.
+ * * {@code _ptr}/{@code _data}: the bitmap of value k occupies
+ *   {@code _data[_ptr[k], _ptr[k+1])}; its encoding is subclass-specific and
+ *   decoded via {@link #getDecodeIterator(int)}.
+ *
+ * NOTES:
+ * * OLE: separate storage segment length and bitmaps led to a 30% improvement
+ *   but not applied because more difficult to support both data layouts at the
+ *   same time (distributed/local as well as w/ and w/o low-level opt)
+ */
+public abstract class ColGroupOffset extends ColGroupValue 
+{
+	private static final long serialVersionUID = -1635828933479403125L;
+
+	protected static final boolean CREATE_SKIPLIST = true;
+	
+	protected static final int READ_CACHE_BLKSZ = 2 * BitmapEncoder.BITMAP_BLOCK_SZ;
+	public static final int WRITE_CACHE_BLKSZ = 2 * BitmapEncoder.BITMAP_BLOCK_SZ;
+	public static boolean ALLOW_CACHE_CONSCIOUS_ROWSUMS = true;
+	
+	/** Start offset of each value's bitmap within {@link #_data}: one entry per
+	 *  distinct value in {@link #_values} plus a trailing end offset. */
+	protected int[] _ptr;
+	/** Linearized bitmaps of all values (variable length per value). */
+	protected char[] _data;
+	/** Whether this column group contains zero values. */
+	protected boolean _zeros;
+	
+	//not read by this class; NOTE(review): presumably maintained by subclasses
+	protected int[] _skiplist;
+	
+	public ColGroupOffset() {
+		super();
+	}
+	
+	/**
+	 * Main constructor. Stores the headers for the individual bitmaps.
+	 * 
+	 * @param colIndices
+	 *            indices (within the block) of the columns included in this
+	 *            column group
+	 * @param numRows
+	 *            total number of rows in the parent block
+	 * @param ubm
+	 *            Uncompressed bitmap representation of the block
+	 */
+	public ColGroupOffset(int[] colIndices, int numRows, UncompressedBitmap ubm) {
+		super(colIndices, numRows, ubm);
+		_zeros = (ubm.getNumOffsets() < numRows);
+	}
+	
+	/**
+	 * Constructor for subclass methods that need to create shallow copies
+	 * 
+	 * @param colIndices
+	 *            raw column index information
+	 * @param numRows
+	 *            number of rows in the block
+	 * @param zeros
+	 *            whether the column group contains zero values
+	 *            (see {@link #hasZeros()})
+	 * @param values
+	 *            set of distinct values for the block (associated bitmaps are
+	 *            kept in the subclass)
+	 */
+	protected ColGroupOffset(int[] colIndices, int numRows, boolean zeros, double[] values) {
+		super(colIndices, numRows, values);
+		_zeros = zeros;
+	}
+	
+	/**
+	 * @param k index of a distinct value
+	 * @return number of chars the bitmap of value k occupies in {@link #_data}
+	 */
+	protected final int len(int k) {
+		return _ptr[k+1] - _ptr[k];
+	}
+	
+	/**
+	 * Linearizes the given per-value bitmaps into {@link #_data} and records
+	 * their start offsets in {@link #_ptr}.
+	 * 
+	 * @param numVals number of distinct values (bitmaps)
+	 * @param totalLen total length of all bitmaps; expected to equal the sum
+	 *        of {@code lbitmaps[i].length}
+	 * @param lbitmaps per-value bitmaps
+	 */
+	protected void createCompressedBitmaps(int numVals, int totalLen, char[][] lbitmaps) {
+		// compact bitmaps to linearized representation
+		_ptr = new int[numVals+1];
+		_data = new char[totalLen];
+		for( int i=0, off=0; i<numVals; i++ ) {
+			int len = lbitmaps[i].length;
+			_ptr[i] = off;
+			System.arraycopy(lbitmaps[i], 0, _data, off, len);
+			off += len;
+		}
+		_ptr[numVals] = totalLen;
+	}
+	
+	@Override
+	public long estimateInMemorySize() {
+		long size = super.estimateInMemorySize();
+
+		// adding bitmaps size
+		size += 16; //array references
+		if (_data != null) {
+			size += 32 + _ptr.length * 4; // offsets
+			size += 32 + _data.length * 2; // bitmaps
+		}
+	
+		return size;
+	}
+
+	/**
+	 * Generic decompression of the row range [rl, ru) for OLE/RLE via the
+	 * decode iterators; to be overridden for performance.
+	 * 
+	 * @param target output block (values are appended)
+	 * @param rl row lower bound (inclusive)
+	 * @param ru row upper bound (exclusive)
+	 */
+	@Override
+	public void decompressToBlock(MatrixBlock target, int rl, int ru) 
+	{
+		final int numCols = getNumCols();
+		final int numVals = getNumValues();
+		int[] colIndices = getColIndices();
+		
+		// Run through the bitmaps for this column group
+		for (int i = 0; i < numVals; i++) {
+			Iterator<Integer> decoder = getDecodeIterator(i);
+			int valOff = i*numCols;
+
+			while (decoder.hasNext()) {
+				int row = decoder.next();
+				if( row<rl ) continue;
+				//ru is exclusive: row ru belongs to the next partition
+				//(early exit relies on rows being decoded in ascending order)
+				if( row>=ru ) break;
+				
+				for (int colIx = 0; colIx < numCols; colIx++)
+					target.appendValue(row, colIndices[colIx], _values[valOff+colIx]);
+			}
+		}
+	}
+
+	/**
+	 * Generic decompression for OLE/RLE into remapped target columns; to be
+	 * overridden for performance.
+	 * 
+	 * @param target output block
+	 * @param colIndexTargets maps original matrix column index to target column
+	 */
+	@Override
+	public void decompressToBlock(MatrixBlock target, int[] colIndexTargets) 
+	{
+		final int numCols = getNumCols();
+		final int numVals = getNumValues();
+		
+		//resolve target columns once instead of per decoded row
+		int[] targetCols = new int[numCols];
+		for (int colIx = 0; colIx < numCols; colIx++)
+			targetCols[colIx] = colIndexTargets[getColIndex(colIx)];
+		
+		// Run through the bitmaps for this column group
+		for (int i = 0; i < numVals; i++) {
+			Iterator<Integer> decoder = getDecodeIterator(i);
+			int valOff = i*numCols;
+
+			while (decoder.hasNext()) {
+				int row = decoder.next();
+				for (int colIx = 0; colIx < numCols; colIx++)
+					target.quickSetValue(row, targetCols[colIx], _values[valOff+colIx]);
+			}
+		}
+	}
+	
+	/**
+	 * Generic decompression of a single group column for OLE/RLE into column
+	 * 0 of the target; to be overridden for performance.
+	 * 
+	 * @param target output block
+	 * @param colpos local column position within this group
+	 */
+	@Override
+	public void decompressToBlock(MatrixBlock target, int colpos) 
+	{
+		final int numCols = getNumCols();
+		final int numVals = getNumValues();
+		
+		// Run through the bitmaps for this column group
+		for (int i = 0; i < numVals; i++) {
+			Iterator<Integer> decoder = getDecodeIterator(i);
+			int valOff = i*numCols;
+
+			while (decoder.hasNext()) {
+				int row = decoder.next();
+				target.quickSetValue(row, 0, _values[valOff+colpos]);
+			}
+		}
+	}
+	
+	//generic get for OLE/RLE, to be overridden for performance
+	//potential: skip scan (segment length agg and run length) instead of decode
+	@Override
+	public double get(int r, int c) {
+		//find local column index
+		int ix = Arrays.binarySearch(_colIndexes, c);
+		if( ix < 0 )
+			throw new RuntimeException("Column index "+c+" not in bitmap group.");
+		
+		//find row index in value offset lists via scan
+		final int numCols = getNumCols();
+		final int numVals = getNumValues();
+		for (int i = 0; i < numVals; i++) {
+			Iterator<Integer> decoder = getDecodeIterator(i);
+			int valOff = i*numCols;
+			while (decoder.hasNext()) {
+				int row = decoder.next();
+				if( row == r )
+					return _values[valOff+ix];
+				else if( row > r )
+					break; //passed r (rows decoded ascending): try next value
+			}
+		}
+		//r is not contained in any bitmap
+		return 0;
+	}
+	
+	/**
+	 * Pre-aggregates the distinct value tuples with a dense vector, i.e.,
+	 * {@code c[k] += sum_j b[j] * _values[k*numCols+j]} for every value k.
+	 * 
+	 * @param b dense vector with one entry per column of this group
+	 * @param c output vector with one entry per distinct value (accumulated)
+	 */
+	protected final void sumAllValues(double[] b, double[] c)
+	{
+		final int numVals = getNumValues();
+		final int numCols = getNumCols();
+		
+		//column-at-a-time over all values because usually more values than
+		//columns; _values is value-major, hence column j is read with stride
+		//numCols (a contiguous scan from j*numVals would mix different tuples)
+		for( int j=0; j<numCols; j++ ) {
+			final double bj = b[j];
+			for( int k=0, off=j; k<numVals; k++, off+=numCols )
+				c[k] += bj * _values[off];
+		}
+	}
+	
+	/**
+	 * Computes the min or max over the tuple of the given distinct value.
+	 * 
+	 * @param bitmapIx index of the distinct value
+	 * @param builtin MAX builtin, otherwise treated as MIN
+	 * @return min/max over the numCols entries of the value tuple
+	 */
+	protected final double mxxValues(int bitmapIx, Builtin builtin)
+	{
+		final int numCols = getNumCols();
+		final int valOff = bitmapIx * numCols;
+		
+		//neutral start element: -inf for max, +inf for min (+/-MAX_VALUE
+		//would wrongly win over tuples consisting only of infinities)
+		double val = (builtin.getBuiltinCode()==BuiltinCode.MAX) ?
+			Double.NEGATIVE_INFINITY : Double.POSITIVE_INFINITY;
+		for( int i = 0; i < numCols; i++ )
+			val = builtin.execute2(val, _values[valOff+i]);
+		
+		return val;
+	}
+
+	public char[] getBitmaps() {
+		return _data;
+	}
+	
+	public int[] getBitmapOffsets() {
+		return _ptr;
+	}
+
+	public boolean hasZeros() {
+		return _zeros;
+	}
+	
+	/**
+	 * @param k
+	 *            index of a specific compressed bitmap (stored in subclass,
+	 *            index same as {@link #getValues})
+	 * @return an object for iterating over the row offsets in this bitmap. Only
+	 *         valid until the next call to this method. May be reused across
+	 *         calls.
+	 */
+	public abstract Iterator<Integer> getDecodeIterator(int k);
+
+	//TODO getDecodeIterator(int k, int rl, int ru)
+
+	/**
+	 * Utility function of sparse-unsafe operations: converts a row indicator
+	 * vector into the ascending list of row indexes where it is set.
+	 * 
+	 * @param ind row indicator vector of non zeros
+	 * @return offsets, i.e., all i with {@code ind[i]==true} in ascending order
+	 * @throws DMLRuntimeException declared by the signature; not thrown by
+	 *         this implementation
+	 */
+	protected int[] computeOffsets(boolean[] ind)
+		throws DMLRuntimeException 
+	{
+		//determine number of offsets
+		int numOffsets = 0;
+		for( int i=0; i<ind.length; i++ )
+			numOffsets += ind[i] ? 1 : 0;
+		
+		//create offset lists
+		int[] ret = new int[numOffsets];
+		for( int i=0, pos=0; i<ind.length; i++ )
+			if( ind[i] )
+				ret[pos++] = i;
+		
+		return ret;
+	}
+
+	/**
+	 * Deserializes this group in the format produced by {@link #write}:
+	 * header (numRows, numCols, numVals, zeros), column indexes, distinct
+	 * values, total bitmap length, then per value (length, bitmap chars).
+	 * 
+	 * @param in input to read from
+	 * @throws IOException if reading fails
+	 */
+	@Override
+	public void readFields(DataInput in) 
+		throws IOException 
+	{
+		_numRows = in.readInt();
+		int numCols = in.readInt();
+		int numVals = in.readInt();
+		_zeros = in.readBoolean();
+		
+		//read col indices
+		_colIndexes = new int[ numCols ];
+		for( int i=0; i<numCols; i++ )
+			_colIndexes[i] = in.readInt();
+		
+		//read distinct values
+		_values = new double[numVals*numCols];
+		for( int i=0; i<numVals*numCols; i++ )
+			_values[i] = in.readDouble();
+		
+		//read bitmaps (offsets recreated from per-value lengths)
+		int totalLen = in.readInt();
+		_ptr = new int[numVals+1];
+		_data = new char[totalLen];		
+		for( int i=0, off=0; i<numVals; i++ ) {
+			int len = in.readInt();
+			_ptr[i] = off;
+			for( int j=0; j<len; j++ )
+				_data[off+j] = in.readChar();
+			off += len;
+		}
+		_ptr[numVals] = totalLen;
+	}
+	
+	/**
+	 * Serializes this group; see {@link #readFields} for the format.
+	 * 
+	 * @param out output to write to
+	 * @throws IOException if writing fails
+	 */
+	@Override
+	public void write(DataOutput out) 
+		throws IOException 
+	{
+		int numCols = getNumCols();
+		int numVals = getNumValues();
+		out.writeInt(_numRows);
+		out.writeInt(numCols);
+		out.writeInt(numVals);
+		out.writeBoolean(_zeros);
+		
+		//write col indices
+		for( int i=0; i<_colIndexes.length; i++ )
+			out.writeInt( _colIndexes[i] );
+		
+		//write distinct values
+		for( int i=0; i<_values.length; i++ )
+			out.writeDouble(_values[i]);
+
+		//write bitmaps (lens and data, offset later recreated)
+		int totalLen = 0;
+		for( int i=0; i<numVals; i++ )
+			totalLen += len(i);
+		out.writeInt(totalLen);	
+		for( int i=0; i<numVals; i++ ) {
+			int len = len(i);
+			int off = _ptr[i];
+			out.writeInt(len);
+			for( int j=0; j<len; j++ )
+				out.writeChar(_data[off+j]);
+		}
+	}
+
+	@Override
+	public long getExactSizeOnDisk() {
+		long ret = 13; //header: 3 ints (rows, cols, vals) + 1 boolean (zeros)
+		//col indices
+		ret += 4 * _colIndexes.length; 
+		//distinct values (groups of values)
+		ret += 8 * _values.length;
+		//actual bitmaps
+		ret += 4; //total length
+		for( int i=0; i<getNumValues(); i++ )
+			ret += 4 + 2 * len(i); //length + chars per value
+		
+		return ret;
+	}
+	
+	/**
+	 * Dispatches sum/sumsq and min/max aggregates over the row range [rl, ru)
+	 * to the subclass-specific compute methods. Other aggregate functions or
+	 * index functions are ignored (no-op).
+	 */
+	@Override
+	public void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock result, int rl, int ru) 
+		throws DMLRuntimeException 
+	{
+		//sum and sumsq (reduceall/reducecol/reducerow over tuples and counts)
+		if( op.aggOp.increOp.fn instanceof KahanPlus || op.aggOp.increOp.fn instanceof KahanPlusSq ) 
+		{
+			KahanFunction kplus = (op.aggOp.increOp.fn instanceof KahanPlus) ?
+					KahanPlus.getKahanPlusFnObject() : KahanPlusSq.getKahanPlusSqFnObject();
+			
+			if( op.indexFn instanceof ReduceAll )
+				computeSum(result, kplus);
+			else if( op.indexFn instanceof ReduceCol )
+				computeRowSums(result, kplus, rl, ru);
+			else if( op.indexFn instanceof ReduceRow )
+				computeColSums(result, kplus);
+		}
+		//min and max (reduceall/reducecol/reducerow over tuples only)
+		else if(op.aggOp.increOp.fn instanceof Builtin 
+				&& (((Builtin)op.aggOp.increOp.fn).getBuiltinCode()==BuiltinCode.MAX 
+				|| ((Builtin)op.aggOp.increOp.fn).getBuiltinCode()==BuiltinCode.MIN)) 
+		{
+			Builtin builtin = (Builtin) op.aggOp.increOp.fn;
+
+			if( op.indexFn instanceof ReduceAll )
+				computeMxx(result, builtin, _zeros);
+			else if( op.indexFn instanceof ReduceCol )
+				computeRowMxx(result, builtin, rl, ru);
+			else if( op.indexFn instanceof ReduceRow )
+				computeColMxx(result, builtin, _zeros);
+		}
+	}
+	
+	/** Full aggregate (sum or sumsq via kplus) over all cells (ReduceAll). */
+	protected abstract void computeSum(MatrixBlock result, KahanFunction kplus);
+	
+	/** Row aggregates (sum or sumsq via kplus) for rows [rl, ru) (ReduceCol). */
+	protected abstract void computeRowSums(MatrixBlock result, KahanFunction kplus, int rl, int ru);
+	
+	/** Column aggregates (sum or sumsq via kplus) (ReduceRow). */
+	protected abstract void computeColSums(MatrixBlock result, KahanFunction kplus);
+	
+	/** Row min/max for rows [rl, ru) (ReduceCol). */
+	protected abstract void computeRowMxx(MatrixBlock result, Builtin builtin, int rl, int ru);
+	
+}
