[SYSTEMML-449] Compressed linear algebra v2

This patch bundles various improvements for the experimental feature
'compressed linear algebra'. In detail, this includes the following
extensions:

* [SYSTEMML-820] New column encoding format DDC (dense dictionary
coding) with DDC1 and DDC2 for 1- and 2-byte codes as well as efficient
operations.

* [SYSTEMML-815] Hardened sample-based estimators (e.g.,
uncompressed size, empty segments, reduced population size, and
stabilization parameter as well as numerically stable implementations),
including an increased sample fraction and removed unnecessary parameters.

* [SYSTEMML-814] Debugging tools for compression plans, compression
tracing, and compression statistics.

* New greedy column grouping algorithm with pruning and memoization.

* New static column partitioning and changed bin packing heuristics.

* Additional operations (e.g., cache-conscious rowSums).

* Various fixes and performance improvements throughout all CLA
components.

* Extended test cases to cover OLE, RLE, DDC, and UC groups as well as
combinations thereof.

* Various internal refactorings to simplify the extension and
maintenance of CLA. 

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/37a215bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/37a215bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/37a215bc

Branch: refs/heads/master
Commit: 37a215bc3be26495c351eae6be4b85eaf22daedc
Parents: 390b81c
Author: Matthias Boehm <[email protected]>
Authored: Sun Feb 5 16:22:01 2017 +0100
Committer: Matthias Boehm <[email protected]>
Committed: Wed Feb 8 03:12:18 2017 +0100

----------------------------------------------------------------------
 .../sysml/runtime/compress/BitmapEncoder.java   |  22 +-
 .../apache/sysml/runtime/compress/ColGroup.java |  39 +-
 .../sysml/runtime/compress/ColGroupBitmap.java  | 580 ------------------
 .../sysml/runtime/compress/ColGroupDDC.java     | 227 +++++++
 .../sysml/runtime/compress/ColGroupDDC1.java    | 358 +++++++++++
 .../sysml/runtime/compress/ColGroupDDC2.java    | 312 ++++++++++
 .../sysml/runtime/compress/ColGroupOLE.java     | 173 +++---
 .../sysml/runtime/compress/ColGroupOffset.java  | 424 +++++++++++++
 .../sysml/runtime/compress/ColGroupRLE.java     | 178 +++---
 .../runtime/compress/ColGroupUncompressed.java  |  41 +-
 .../sysml/runtime/compress/ColGroupValue.java   | 303 ++++++++++
 .../runtime/compress/CompressedMatrixBlock.java | 419 ++++++++-----
 .../runtime/compress/PlanningBinPacker.java     | 112 ----
 .../sysml/runtime/compress/PlanningCoCoder.java | 257 --------
 .../runtime/compress/PlanningCoCodingGroup.java | 110 ----
 .../compress/PlanningGroupMergeAction.java      |  73 ---
 .../compress/ReaderColumnSelectionSparse.java   |   1 -
 .../runtime/compress/UncompressedBitmap.java    |  73 ++-
 .../compress/cocode/ColumnGroupPartitioner.java |  19 +
 .../ColumnGroupPartitionerBinPacking.java       | 100 +++
 .../cocode/ColumnGroupPartitionerStatic.java    |  52 ++
 .../compress/cocode/PlanningCoCoder.java        | 236 ++++++++
 .../compress/cocode/PlanningCoCodingGroup.java  | 175 ++++++
 .../compress/cocode/PlanningMemoTable.java      |  75 +++
 .../compress/estim/CompressedSizeEstimator.java |  47 +-
 .../estim/CompressedSizeEstimatorExact.java     |   5 +-
 .../estim/CompressedSizeEstimatorSample.java    | 605 ++++++++++---------
 .../compress/estim/CompressedSizeInfo.java      |  46 +-
 .../compress/estim/SizeEstimatorFactory.java    |   6 +-
 .../runtime/compress/utils/ConverterUtils.java  |  16 +
 .../runtime/compress/utils/IntArrayList.java    |  13 +-
 .../compress/utils/LinearAlgebraUtils.java      | 164 +++++
 .../compress/BasicCompressionTest.java          |  40 +-
 .../functions/compress/BasicGetValueTest.java   |  40 +-
 .../compress/BasicMatrixAppendTest.java         |  40 +-
 .../compress/BasicMatrixMultChainTest.java      |  76 ++-
 .../BasicMatrixTransposeSelfMultTest.java       |  40 +-
 .../compress/BasicMatrixVectorMultTest.java     |  40 +-
 .../BasicScalarOperationsSparseUnsafeTest.java  |  40 +-
 .../compress/BasicScalarOperationsTest.java     |  40 +-
 .../BasicTransposeSelfLeftMatrixMultTest.java   |  40 +-
 .../compress/BasicUnaryAggregateTest.java       | 326 +++++++---
 .../compress/BasicVectorMatrixMultTest.java     |  40 +-
 .../functions/compress/CompressedLinregCG.java  |   5 +-
 .../compress/CompressedSerializationTest.java   |  40 +-
 .../compress/LargeCompressionTest.java          |  40 +-
 .../compress/LargeMatrixVectorMultTest.java     |  40 +-
 .../compress/LargeParMatrixVectorMultTest.java  |  40 +-
 .../compress/LargeParUnaryAggregateTest.java    | 337 +++++++----
 .../compress/LargeVectorMatrixMultTest.java     |  40 +-
 .../functions/compress/ParCompressionTest.java  |  40 +-
 .../compress/ParMatrixMultChainTest.java        |  66 +-
 .../compress/ParMatrixVectorMultTest.java       |  40 +-
 .../ParTransposeSelfLeftMatrixMultTest.java     |  40 +-
 .../compress/ParUnaryAggregateTest.java         | 327 ++++++----
 .../compress/ParVectorMatrixMultTest.java       |  40 +-
 56 files changed, 4733 insertions(+), 2385 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/BitmapEncoder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/BitmapEncoder.java 
b/src/main/java/org/apache/sysml/runtime/compress/BitmapEncoder.java
index 7fd2c69..b27112f 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/BitmapEncoder.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/BitmapEncoder.java
@@ -20,7 +20,6 @@
 package org.apache.sysml.runtime.compress;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 
 import org.apache.sysml.runtime.compress.utils.DblArray;
 import org.apache.sysml.runtime.compress.utils.DblArrayIntListHashMap;
@@ -100,8 +99,8 @@ public class BitmapEncoder
         *            the offsets of different bits
         * @return compressed version of said bitmap
         */
-       public static char[] genRLEBitmap(int[] offsets) {
-               if( offsets.length == 0 )
+       public static char[] genRLEBitmap(int[] offsets, int len) {
+               if( len == 0 )
                        return new char[0]; //empty list
 
                // Use an ArrayList for correctness at the expense of temp space
@@ -139,7 +138,7 @@ public class BitmapEncoder
                curRunLen = 1;
 
                // Process the remaining offsets
-               for (int i = 1; i < offsets.length; i++) {
+               for (int i = 1; i < len; i++) {
 
                        int absOffset = offsets[i];
 
@@ -179,9 +178,8 @@ public class BitmapEncoder
 
                // Convert wasteful ArrayList to packed array.
                char[] ret = new char[buf.size()];
-               for (int i = 0; i < buf.size(); i++) {
+               for(int i = 0; i < buf.size(); i++ )
                        ret[i] = buf.get(i);
-               }
                return ret;
        }
 
@@ -194,21 +192,19 @@ public class BitmapEncoder
         *            the offsets of different bits
         * @return compressed version of said bitmap
         */
-       public static char[] genOffsetBitmap(int[] offsets) 
-       {
-               int lastOffset = offsets[offsets.length - 1];
+       public static char[] genOffsetBitmap(int[] offsets, int len) 
+       { 
+               int lastOffset = offsets[len - 1];
 
                // Build up the blocks
                int numBlocks = (lastOffset / BITMAP_BLOCK_SZ) + 1;
                // To simplify the logic, we make two passes.
                // The first pass divides the offsets by block.
                int[] blockLengths = new int[numBlocks];
-               Arrays.fill(blockLengths, 0);
 
-               for (int ix = 0; ix < offsets.length; ix++) {
+               for (int ix = 0; ix < len; ix++) {
                        int val = offsets[ix];
                        int blockForVal = val / BITMAP_BLOCK_SZ;
-
                        blockLengths[blockForVal]++;
                }
 
@@ -238,7 +234,7 @@ public class BitmapEncoder
 
                return encodedBlocks;
        }
-
+       
        private static UncompressedBitmap extractBitmap(int colIndex, 
MatrixBlock rawblock, boolean skipZeros) 
        {
                //probe map for distinct items (for value or value groups)

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java 
b/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java
index 586690c..bf1b822 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java
@@ -40,9 +40,11 @@ public abstract class ColGroup implements Serializable
        private static final long serialVersionUID = 2439785418908671481L;
 
        public enum CompressionType  {
-               UNCOMPRESSED,   //uncompressed sparse/dense 
-               RLE_BITMAP,     //RLE bitmap
-               OLE_BITMAP;  //OLE bitmap
+               UNCOMPRESSED, //uncompressed sparse/dense 
+               RLE_BITMAP,  //RLE bitmap
+               OLE_BITMAP,  //OLE bitmap
+               DDC1, //DDC 1 byte
+               DDC2; //DDC 2 byte
        }
        
        /**
@@ -53,23 +55,17 @@ public abstract class ColGroup implements Serializable
 
        /** Number of rows in the matrix, for use by child classes. */
        protected int _numRows;
-
-       /** How the elements of the column group are compressed. */
-       private CompressionType _compType;
-
        
        /**
         * Main constructor.
         * 
-        * @param type compression type
         * @param colIndices
         *            offsets of the columns in the matrix block that make up 
the
         *            group
         * @param numRows
         *            total number of rows in the parent block
         */
-       protected ColGroup(CompressionType type, int[] colIndices, int numRows) 
{
-               _compType = type;
+       protected ColGroup(int[] colIndices, int numRows) {
                _colIndexes = colIndices;
                _numRows = numRows;
        }
@@ -77,16 +73,15 @@ public abstract class ColGroup implements Serializable
        /**
         * Convenience constructor for converting indices to a more compact 
format.
         * 
-        * @param type compression type
         * @param colIndicesList list of column indices
         * @param numRows total number of rows in the parent block
         */
-       protected ColGroup(CompressionType type, List<Integer> colIndicesList, 
int numRows) {
-               _compType = type;
+       protected ColGroup(List<Integer> colIndicesList, int numRows) {
                _colIndexes = new int[colIndicesList.size()];
                int i = 0;
                for (Integer index : colIndicesList)
                        _colIndexes[i++] = index;
+               _numRows = numRows;
        }
 
        /**
@@ -126,9 +121,7 @@ public abstract class ColGroup implements Serializable
         * 
         * @return How the elements of the column group are compressed.
         */
-       public CompressionType getCompType() {
-               return _compType;
-       }
+       public abstract CompressionType getCompType();
 
        public void shiftColIndices(int offset)  {
                for( int i=0; i<_colIndexes.length; i++ )
@@ -143,14 +136,12 @@ public abstract class ColGroup implements Serializable
         *         in memory.
         */
        public long estimateInMemorySize() {
-               // int numRows (4B) , array reference colIndices (8B) + array 
object
-               // overhead if exists (32B) + 4B per element, CompressionType 
compType
-               // (2 booleans 2B + enum overhead 32B + reference to enum 8B)
-               long size = 54;
-               if (_colIndexes == null)
-                       return size;
-               else
-                       return size + 32 + 4 * _colIndexes.length;
+               // object (12B padded to factors of 8), int numRows (4B), 
+               // array reference colIndices (8B) 
+               //+ array object overhead if exists (32B) + 4B per element
+               long size = 24;
+               return (_colIndexes == null) ? size : 
+                       size + 32 + 4 * _colIndexes.length;
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ColGroupBitmap.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/ColGroupBitmap.java 
b/src/main/java/org/apache/sysml/runtime/compress/ColGroupBitmap.java
deleted file mode 100644
index dac18ef..0000000
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupBitmap.java
+++ /dev/null
@@ -1,580 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.compress;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.functionobjects.Builtin;
-import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
-import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
-
-
-/**
- * Base class for column groups encoded with various types of bitmap encoding.
- * 
- * 
- * NOTES:
- *  * OLE: separate storage segment length and bitmaps led to a 30% improvement
- *    but not applied because more difficult to support both data layouts at 
the
- *    same time (distributed/local as well as w/ and w/o low-level opt)
- */
-public abstract class ColGroupBitmap extends ColGroup 
-{
-       private static final long serialVersionUID = -1635828933479403125L;
-       
-       public static final boolean LOW_LEVEL_OPT = true;       
-       //sorting of values by physical length helps by 10-20%, especially for 
serial, while
-       //slight performance decrease for parallel incl multi-threaded, hence 
not applied for
-       //distributed operations (also because compression time + garbage 
collection increases)
-       private static final boolean SORT_VALUES_BY_LENGTH = true; 
-       protected static final boolean CREATE_SKIPLIST = true;
-       
-       protected static final int READ_CACHE_BLKSZ = 2 * 
BitmapEncoder.BITMAP_BLOCK_SZ;
-       protected static final int WRITE_CACHE_BLKSZ = 2 * 
BitmapEncoder.BITMAP_BLOCK_SZ;
-       
-       /** Distinct values associated with individual bitmaps. */
-       protected double[] _values; //linearized <numcol vals> <numcol vals>
-
-       /** Bitmaps, one per uncompressed value in {@link #_values}. */
-       protected int[] _ptr; //bitmap offsets per value
-       protected char[] _data; //linearized bitmaps (variable length)
-       protected boolean _zeros; //contains zero values
-       
-       protected int[] _skiplist;
-       
-       public ColGroupBitmap(CompressionType type) {
-               super(type, (int[]) null, -1);
-       }
-       
-       /**
-        * Main constructor. Stores the headers for the individual bitmaps.
-        * 
-        * @param type column type
-        * @param colIndices
-        *            indices (within the block) of the columns included in this
-        *            column
-        * @param numRows
-        *            total number of rows in the parent block
-        * @param ubm
-        *            Uncompressed bitmap representation of the block
-        */
-       public ColGroupBitmap(CompressionType type, int[] colIndices, int 
numRows, UncompressedBitmap ubm) 
-       {
-               super(type, colIndices, numRows);
-
-               // Extract and store just the distinct values. The bitmaps 
themselves go
-               // into the subclasses.
-               final int numCols = ubm.getNumColumns();
-               final int numVals = ubm.getNumValues();
-               
-               _values = new double[numVals*numCols];
-               _zeros = (ubm.getNumOffsets() < numRows);
-               
-               for (int i=0; i<numVals; i++) {
-                       //note: deep copied internally on getValues
-                       double[] tmp = ubm.getValues(i);
-                       System.arraycopy(tmp, 0, _values, i*numCols, numCols);
-               }
-       }
-
-       /**
-        * Constructor for subclass methods that need to create shallow copies
-        * 
-        * @param type compression type
-        * @param colIndices
-        *            raw column index information
-        * @param numRows
-        *            number of rows in the block
-        * @param zeros ?
-        * @param values
-        *            set of distinct values for the block (associated bitmaps 
are
-        *            kept in the subclass)
-        */
-       protected ColGroupBitmap(CompressionType type, int[] colIndices, int 
numRows, boolean zeros, double[] values) {
-               super(type, colIndices, numRows);
-               _zeros = zeros;
-               _values = values;
-       }
-       
-       protected final int len(int k) {
-               return _ptr[k+1] - _ptr[k];
-       }
-
-       protected void createCompressedBitmaps(int numVals, int totalLen, 
char[][] lbitmaps)
-       {
-               // compact bitmaps to linearized representation
-               if( LOW_LEVEL_OPT && SORT_VALUES_BY_LENGTH
-                       && _numRows > BitmapEncoder.BITMAP_BLOCK_SZ ) 
-               {
-                       // sort value by num segments in descending order
-                       TreeMap<Integer,ArrayList<Integer>> tree = new 
TreeMap<Integer, ArrayList<Integer>>();
-                       for( int i=0; i<numVals; i++ ) {
-                               int revlen = totalLen-lbitmaps[i].length;
-                               if( !tree.containsKey(revlen) )
-                                       tree.put(revlen, new 
ArrayList<Integer>());
-                               tree.get(revlen).add(i);
-                       }
-                       
-                       // compact bitmaps to linearized representation
-                       _ptr = new int[numVals+1];
-                       _data = new char[totalLen];
-                       int pos = 0, off = 0;
-                       for( Entry<Integer,ArrayList<Integer>> e : 
tree.entrySet() ) {
-                               for( Integer tmpix : e.getValue() ) {
-                                       int len = lbitmaps[tmpix].length;
-                                       _ptr[pos] = off;
-                                       System.arraycopy(lbitmaps[tmpix], 0, 
_data, off, len);
-                                       off += len;
-                                       pos++;
-                               }
-                       }
-                       _ptr[numVals] = totalLen;
-                       
-                       // reorder values
-                       double[] lvalues = new double[_values.length];
-                       int off2 = 0; int numCols = _colIndexes.length;
-                       for( Entry<Integer,ArrayList<Integer>> e : 
tree.entrySet() ) {
-                               for( Integer tmpix : e.getValue() ) {
-                                       System.arraycopy(_values, 
tmpix*numCols, lvalues, off2, numCols);                               
-                                       off2 += numCols;
-                               }
-                       }                       
-                       _values = lvalues;
-               }
-               else
-               {
-                       // compact bitmaps to linearized representation
-                       _ptr = new int[numVals+1];
-                       _data = new char[totalLen];
-                       for( int i=0, off=0; i<numVals; i++ ) {
-                               int len = lbitmaps[i].length;
-                               _ptr[i] = off;
-                               System.arraycopy(lbitmaps[i], 0, _data, off, 
len);
-                               off += len;
-                       }
-                       _ptr[numVals] = totalLen;
-               }
-       }
-       
-       @Override
-       public long estimateInMemorySize() {
-               long size = super.estimateInMemorySize();
-               
-               // adding the size of values
-               size += 8; //array reference
-               if (_values != null) {
-                       size += 32 + _values.length * 8; //values
-               }
-               
-               // adding bitmaps size
-               size += 16; //array references
-               if (_data != null) {
-                       size += 32 + _ptr.length * 4; // offsets
-                       size += 32 + _data.length * 2;    // bitmaps
-               }
-       
-               return size;
-       }
-
-       //generic decompression for OLE/RLE, to be overwritten for performance
-       @Override
-       public void decompressToBlock(MatrixBlock target, int rl, int ru) 
-       {
-               final int numCols = getNumCols();
-               final int numVals = getNumValues();
-               int[] colIndices = getColIndices();
-               
-               // Run through the bitmaps for this column group
-               for (int i = 0; i < numVals; i++) {
-                       Iterator<Integer> decoder = getDecodeIterator(i);
-                       int valOff = i*numCols;
-
-                       while (decoder.hasNext()) {
-                               int row = decoder.next();
-                               if( row<rl ) continue;
-                               if( row>ru ) break;
-                               
-                               for (int colIx = 0; colIx < numCols; colIx++)
-                                       target.appendValue(row, 
colIndices[colIx], _values[valOff+colIx]);
-                       }
-               }
-       }
-
-       //generic decompression for OLE/RLE, to be overwritten for performance
-       @Override
-       public void decompressToBlock(MatrixBlock target, int[] 
colIndexTargets) 
-       {
-               final int numCols = getNumCols();
-               final int numVals = getNumValues();
-               
-               // Run through the bitmaps for this column group
-               for (int i = 0; i < numVals; i++) {
-                       Iterator<Integer> decoder = getDecodeIterator(i);
-                       int valOff = i*numCols;
-
-                       while (decoder.hasNext()) {
-                               int row = decoder.next();
-                               for (int colIx = 0; colIx < numCols; colIx++) {
-                                       int origMatrixColIx = 
getColIndex(colIx);
-                                       int targetColIx = 
colIndexTargets[origMatrixColIx];
-                                       target.quickSetValue(row, targetColIx, 
_values[valOff+colIx]);
-                               }
-                       }
-               }
-       }
-       
-       //generic decompression for OLE/RLE, to be overwritten for performance
-       @Override
-       public void decompressToBlock(MatrixBlock target, int colpos) 
-       {
-               final int numCols = getNumCols();
-               final int numVals = getNumValues();
-               
-               // Run through the bitmaps for this column group
-               for (int i = 0; i < numVals; i++) {
-                       Iterator<Integer> decoder = getDecodeIterator(i);
-                       int valOff = i*numCols;
-
-                       while (decoder.hasNext()) {
-                               int row = decoder.next();
-                               target.quickSetValue(row, 0, 
_values[valOff+colpos]);
-                       }
-               }
-       }
-
-       //generic get for OLE/RLE, to be overwritten for performance
-       //potential: skip scan (segment length agg and run length) instead of 
decode
-       @Override
-       public double get(int r, int c) {
-               //find local column index
-               int ix = Arrays.binarySearch(_colIndexes, c);
-               if( ix < 0 )
-                       throw new RuntimeException("Column index "+c+" not in 
bitmap group.");
-               
-               //find row index in value offset lists via scan
-               final int numCols = getNumCols();
-               final int numVals = getNumValues();
-               for (int i = 0; i < numVals; i++) {
-                       Iterator<Integer> decoder = getDecodeIterator(i);
-                       int valOff = i*numCols;
-                       while (decoder.hasNext()) {
-                               int row = decoder.next();
-                               if( row == r )
-                                       return _values[valOff+ix];
-                               else if( row > r )
-                                       break; //current value
-                       }
-               }               
-               return 0;
-       }
-
-       public abstract void unaryAggregateOperations(AggregateUnaryOperator 
op, MatrixBlock result, int rl, int ru)
-               throws DMLRuntimeException;
-
-       protected final double sumValues(int bitmapIx)
-       {
-               final int numCols = getNumCols();
-               final int valOff = bitmapIx * numCols;
-               
-               double val = 0.0;
-               for( int i = 0; i < numCols; i++ ) {
-                       val += _values[valOff+i];
-               }
-               
-               return val;
-       }
-       
-       protected final double sumValues(int bitmapIx, double[] b)
-       {
-               final int numCols = getNumCols();
-               final int valOff = bitmapIx * numCols;
-               
-               double val = 0;
-               for( int i = 0; i < numCols; i++ ) {
-                       val += _values[valOff+i] * b[i];
-               }
-               
-               return val;
-       }
-
-       protected final double mxxValues(int bitmapIx, Builtin builtin)
-       {
-               final int numCols = getNumCols();
-               final int valOff = bitmapIx * numCols;
-               
-               double val = Double.MAX_VALUE * 
((builtin.getBuiltinCode()==BuiltinCode.MAX)?-1:1);
-               for( int i = 0; i < numCols; i++ )
-                       val = builtin.execute2(val, _values[valOff+i]);
-               
-               return val;
-       }
-
-       protected final double[] preaggValues(int numVals, double[] b) {
-               double[] ret = new double[numVals];
-               for( int k = 0; k < numVals; k++ )
-                       ret[k] = sumValues(k, b);
-               
-               return ret;
-       }
-       
-       /**
-        * Method for use by subclasses. Applies a scalar operation to the value
-        * metadata stored in the superclass.
-        * 
-        * @param op
-        *            scalar operation to perform
-        * @return transformed copy of value metadata for this column group
-        * @throws DMLRuntimeException if DMLRuntimeException occurs
-        */
-       protected double[] applyScalarOp(ScalarOperator op)
-                       throws DMLRuntimeException 
-       {
-               //scan over linearized values
-               double[] ret = new double[_values.length];
-               for (int i = 0; i < _values.length; i++) {
-                       ret[i] = op.executeScalar(_values[i]);
-               }
-
-               return ret;
-       }
-
-       protected double[] applyScalarOp(ScalarOperator op, double newVal, int 
numCols)
-                       throws DMLRuntimeException 
-       {
-               //scan over linearized values
-               double[] ret = new double[_values.length + numCols];
-               for( int i = 0; i < _values.length; i++ ) {
-                       ret[i] = op.executeScalar(_values[i]);
-               }
-               
-               //add new value to the end
-               Arrays.fill(ret, _values.length, _values.length+numCols, 
newVal);
-               
-               return ret;
-       }
-       
-       /**
-        * NOTE: Shared across OLE/RLE because value-only computation. 
-        * 
-        * @param result matrix block
-        * @param builtin ?
-        */
-       protected void computeMxx(MatrixBlock result, Builtin builtin) 
-       {
-               //init and 0-value handling
-               double val = Double.MAX_VALUE * 
((builtin.getBuiltinCode()==BuiltinCode.MAX)?-1:1);
-               if( _zeros )
-                       val = builtin.execute2(val, 0);
-               
-               //iterate over all values only
-               final int numVals = getNumValues();
-               final int numCols = getNumCols();               
-               for (int k = 0; k < numVals; k++)
-                       for( int j=0, valOff = k*numCols; j<numCols; j++ )
-                               val = builtin.execute2(val, _values[ valOff+j 
]);
-               
-               //compute new partial aggregate
-               val = builtin.execute2(val, result.quickGetValue(0, 0));
-               result.quickSetValue(0, 0, val);
-       }
-       
-       /**
-        * NOTE: Shared across OLE/RLE because value-only computation. 
-        * 
-        * @param result matrix block
-        * @param builtin ?
-        */
-       protected void computeColMxx(MatrixBlock result, Builtin builtin)
-       {
-               final int numVals = getNumValues();
-               final int numCols = getNumCols();
-               
-               //init and 0-value handling
-               double[] vals = new double[numCols];
-               Arrays.fill(vals, Double.MAX_VALUE * 
((builtin.getBuiltinCode()==BuiltinCode.MAX)?-1:1));
-               if( _zeros ) {
-                       for( int j = 0; j < numCols; j++ )
-                               vals[j] = builtin.execute2(vals[j], 0);         
-               }
-               
-               //iterate over all values only
-               for (int k = 0; k < numVals; k++) 
-                       for( int j=0, valOff=k*numCols; j<numCols; j++ )
-                               vals[j] = builtin.execute2(vals[j], _values[ 
valOff+j ]);
-               
-               //copy results to output
-               for( int j=0; j<numCols; j++ )
-                       result.quickSetValue(0, _colIndexes[j], vals[j]);
-       }
-       
-
-       /**
-        * Obtain number of distrinct sets of values associated with the 
bitmaps in this column group.
-        * 
-        * @return the number of distinct sets of values associated with the 
bitmaps
-        *         in this column group
-        */
-       public int getNumValues() {
-               return _values.length / _colIndexes.length;
-       }
-
-       public double[] getValues() {
-               return _values;
-       }
-
-       public char[] getBitmaps() {
-               return _data;
-       }
-       
-       public int[] getBitmapOffsets() {
-               return _ptr;
-       }
-
-       public boolean hasZeros() {
-               return _zeros;
-       }
-       
-       /**
-        * @param k
-        *            index of a specific compressed bitmap (stored in subclass,
-        *            index same as {@link #getValues})
-        * @return an object for iterating over the row offsets in this bitmap. 
Only
-        *         valid until the next call to this method. May be reused 
across
-        *         calls.
-        */
-       public abstract Iterator<Integer> getDecodeIterator(int k);
-
-       //TODO getDecodeIterator(int k, int rl, int ru)
-
-       /**
-        * Utility function of sparse-unsafe operations.
-        * 
-        * @param ind ?
-        * @return offsets
-        * @throws DMLRuntimeException if DMLRuntimeException occurs
-        */
-       protected int[] computeOffsets(boolean[] ind)
-               throws DMLRuntimeException 
-       {
-               //determine number of offsets
-               int numOffsets = 0;
-               for( int i=0; i<ind.length; i++ )
-                       numOffsets += ind[i] ? 1 : 0;
-               
-               //create offset lists
-               int[] ret = new int[numOffsets];
-               for( int i=0, pos=0; i<ind.length; i++ )
-                       if( ind[i] )
-                               ret[pos++] = i;
-               
-               return ret;
-       }
-
-       @Override
-       public void readFields(DataInput in) 
-               throws IOException 
-       {
-               _numRows = in.readInt();
-               int numCols = in.readInt();
-               int numVals = in.readInt();
-               _zeros = in.readBoolean();
-               
-               //read col indices
-               _colIndexes = new int[ numCols ];
-               for( int i=0; i<numCols; i++ )
-                       _colIndexes[i] = in.readInt();
-               
-               //read distinct values
-               _values = new double[numVals*numCols];
-               for( int i=0; i<numVals*numCols; i++ )
-                       _values[i] = in.readDouble();
-               
-               //read bitmaps
-               int totalLen = in.readInt();
-               _ptr = new int[numVals+1];
-               _data = new char[totalLen];             
-               for( int i=0, off=0; i<numVals; i++ ) {
-                       int len = in.readInt();
-                       _ptr[i] = off;
-                       for( int j=0; j<len; j++ )
-                               _data[off+j] = in.readChar();
-                       off += len;
-               }
-               _ptr[numVals] = totalLen;
-       }
-       
-       @Override
-       public void write(DataOutput out) 
-               throws IOException 
-       {
-               int numCols = getNumCols();
-               int numVals = getNumValues();
-               out.writeInt(_numRows);
-               out.writeInt(numCols);
-               out.writeInt(numVals);
-               out.writeBoolean(_zeros);
-               
-               //write col indices
-               for( int i=0; i<_colIndexes.length; i++ )
-                       out.writeInt( _colIndexes[i] );
-               
-               //write distinct values
-               for( int i=0; i<_values.length; i++ )
-                       out.writeDouble(_values[i]);
-
-               //write bitmaps (lens and data, offset later recreated)
-               int totalLen = 0;
-               for( int i=0; i<numVals; i++ )
-                       totalLen += len(i);
-               out.writeInt(totalLen); 
-               for( int i=0; i<numVals; i++ ) {
-                       int len = len(i);
-                       int off = _ptr[i];
-                       out.writeInt(len);
-                       for( int j=0; j<len; j++ )
-                               out.writeChar(_data[off+j]);
-               }
-       }
-
-       @Override
-       public long getExactSizeOnDisk() {
-               long ret = 13; //header
-               //col indices
-               ret += 4 * _colIndexes.length; 
-               //distinct values (groups of values)
-               ret += 8 * _values.length;
-               //actual bitmaps
-               ret += 4; //total length
-               for( int i=0; i<getNumValues(); i++ )
-                       ret += 4 + 2 * len(i);
-               
-               return ret;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC.java 
b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC.java
new file mode 100644
index 0000000..1782e2e
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress;
+
+import java.util.Arrays;
+
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.functionobjects.Builtin;
+import org.apache.sysml.runtime.functionobjects.KahanFunction;
+import org.apache.sysml.runtime.functionobjects.KahanPlus;
+import org.apache.sysml.runtime.functionobjects.KahanPlusSq;
+import org.apache.sysml.runtime.functionobjects.ReduceAll;
+import org.apache.sysml.runtime.functionobjects.ReduceCol;
+import org.apache.sysml.runtime.functionobjects.ReduceRow;
+import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode;
+import org.apache.sysml.runtime.instructions.cp.KahanObject;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
+
+/**
+ * Class to encapsulate information about a column group that is encoded with
+ * dense dictionary encoding (DDC).
+ * 
+ * NOTE: zero values are materialized as regular entries of the value dictionary,
+ * which simplifies various operations such as counting the number of non-zeros.
+ */
+public abstract class ColGroupDDC extends ColGroupValue 
+{
+       private static final long serialVersionUID = -3204391646123465004L;
+
+       public ColGroupDDC() {
+               super();
+       }
+       
+       public ColGroupDDC(int[] colIndices, int numRows, UncompressedBitmap 
ubm) {
+               super(colIndices, numRows, ubm);
+       }
+       
+       protected ColGroupDDC(int[] colIndices, int numRows, double[] values) {
+               super(colIndices, numRows, values);
+       }
+       
+       @Override
+       public void decompressToBlock(MatrixBlock target, int rl, int ru) {
+               for( int i = rl; i < ru; i++ ) {
+                       for( int colIx = 0; colIx < _colIndexes.length; colIx++ 
) {
+                               int col = _colIndexes[colIx];
+                               double cellVal = getData(i, colIx);
+                               target.quickSetValue(i, col, cellVal);
+                       }
+               }
+       }
+
+       @Override
+       public void decompressToBlock(MatrixBlock target, int[] 
colIndexTargets) {
+               int nrow = getNumRows();
+               int ncol = getNumCols();
+               for( int i = 0; i < nrow; i++ ) {
+                       for( int colIx = 0; colIx < ncol; colIx++ ) {
+                               int origMatrixColIx = getColIndex(colIx);
+                               int col = colIndexTargets[origMatrixColIx];
+                               double cellVal = getData(i, colIx);
+                               target.quickSetValue(i, col, cellVal);
+                       }
+               }
+       }
+
+       @Override
+       public void decompressToBlock(MatrixBlock target, int colpos) {
+               int nrow = getNumRows();
+               for( int i = 0; i < nrow; i++ ) {
+                       double cellVal = getData(i, colpos);
+                       target.quickSetValue(i, 0, cellVal);
+               }
+       }
+       
+       @Override
+       public double get(int r, int c) {
+               //find local column index
+               int ix = Arrays.binarySearch(_colIndexes, c);
+               if( ix < 0 )
+                       throw new RuntimeException("Column index "+c+" not in 
DDC group.");
+               
+               //get value
+               return getData(r, ix);
+       }
+       
+
+       @Override
+       protected void countNonZerosPerRow(int[] rnnz, int rl, int ru) {
+               int ncol = getNumCols();
+               for( int i = rl; i < ru; i++ ) {
+                       int lnnz = 0;
+                       for( int colIx=0; colIx < ncol; colIx++ )
+                               lnnz += (getData(i, colIx) != 0) ? 1 : 0;
+                       rnnz[i-rl] += lnnz;
+               }
+       }
+       
+       @Override
+       public void unaryAggregateOperations(AggregateUnaryOperator op, 
MatrixBlock result, int rl, int ru)
+               throws DMLRuntimeException 
+       {
+               //sum and sumsq (reduceall/reducerow over tuples and counts)
+               if( op.aggOp.increOp.fn instanceof KahanPlus || 
op.aggOp.increOp.fn instanceof KahanPlusSq ) 
+               {
+                       KahanFunction kplus = (op.aggOp.increOp.fn instanceof 
KahanPlus) ?
+                                       KahanPlus.getKahanPlusFnObject() : 
KahanPlusSq.getKahanPlusSqFnObject();
+                       
+                       if( op.indexFn instanceof ReduceAll )
+                               computeSum(result, kplus);
+                       else if( op.indexFn instanceof ReduceCol )
+                               computeRowSums(result, kplus, rl, ru);
+                       else if( op.indexFn instanceof ReduceRow )
+                               computeColSums(result, kplus);
+               }
+               //min and max (reduceall/reducerow over tuples only)
+               else if(op.aggOp.increOp.fn instanceof Builtin 
+                               && 
(((Builtin)op.aggOp.increOp.fn).getBuiltinCode()==BuiltinCode.MAX 
+                               || 
((Builtin)op.aggOp.increOp.fn).getBuiltinCode()==BuiltinCode.MIN)) 
+               {               
+                       Builtin builtin = (Builtin) op.aggOp.increOp.fn;
+
+                       if( op.indexFn instanceof ReduceAll )
+                               computeMxx(result, builtin, false);
+                       else if( op.indexFn instanceof ReduceCol )
+                               computeRowMxx(result, builtin, rl, ru);
+                       else if( op.indexFn instanceof ReduceRow )
+                               computeColMxx(result, builtin, false);
+               }
+       }
+       
+       protected void computeSum(MatrixBlock result, KahanFunction kplus) {
+               int nrow = getNumRows();
+               int ncol = getNumCols();
+               KahanObject kbuff = new KahanObject(result.quickGetValue(0, 0), 
result.quickGetValue(0, 1));
+               
+               for( int i=0; i<nrow; i++ )
+                       for( int j=0; j<ncol; j++ )
+                               kplus.execute2(kbuff, getData(i, j));
+               
+               result.quickSetValue(0, 0, kbuff._sum);
+               result.quickSetValue(0, 1, kbuff._correction);
+       }
+       
+       protected void computeColSums(MatrixBlock result, KahanFunction kplus) {
+               int nrow = getNumRows();
+               int ncol = getNumCols();
+               KahanObject[] kbuff = new KahanObject[getNumCols()];
+               for( int j=0; j<ncol; j++ )
+                       kbuff[j] = new KahanObject(result.quickGetValue(0, 
_colIndexes[j]), 
+                                       result.quickGetValue(1, 
_colIndexes[j]));
+               
+               for( int i=0; i<nrow; i++ )
+                       for( int j=0; j<ncol; j++ )
+                               kplus.execute2(kbuff[j], getData(i, j));
+               
+               for( int j=0; j<ncol; j++ ) {
+                       result.quickSetValue(0, _colIndexes[j], kbuff[j]._sum);
+                       result.quickSetValue(1, _colIndexes[j], 
kbuff[j]._correction);
+               }
+       }
+
+       protected void computeRowSums(MatrixBlock result, KahanFunction kplus, 
int rl, int ru) {
+               int ncol = getNumCols();
+               KahanObject kbuff = new KahanObject(0, 0);
+               
+               for( int i=rl; i<ru; i++ ) {
+                       kbuff.set(result.quickGetValue(i, 0), 
result.quickGetValue(i, 1));
+                       for( int j=0; j<ncol; j++ )
+                               kplus.execute2(kbuff, getData(i, j));
+                       result.quickSetValue(i, 0, kbuff._sum);
+                       result.quickSetValue(i, 1, kbuff._correction);
+               }
+       }
+       
+       protected void computeRowMxx(MatrixBlock result, Builtin builtin, int 
rl, int ru) {
+               double[] c = result.getDenseBlock();
+               int ncol = getNumCols();
+               
+               for( int i=rl; i<ru; i++ )
+                       for( int j=0; j<ncol; j++ )
+                               c[i] = builtin.execute2(c[i], getData(i, j));
+       }
+       
+       
+
+       /**
+        * Generic get value for byte-length-agnostic access.
+        * 
+        * @param r global row index
+        * @param colIx local column index 
+        * @return value
+        */
+       protected abstract double getData(int r, int colIx);
+       
+       /**
+        * Generic set value for byte-length-agnostic write 
+        * of encoded value.
+        * 
+        * @param r global row index
+        * @param code encoded value 
+        */
+       protected abstract void setData(int r, int code);
+       
+       @Override
+       public long estimateInMemorySize() {
+               return super.estimateInMemorySize();
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java 
b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java
new file mode 100644
index 0000000..4db871f
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.compress.utils.ConverterUtils;
+import org.apache.sysml.runtime.functionobjects.KahanFunction;
+import org.apache.sysml.runtime.functionobjects.KahanPlus;
+import org.apache.sysml.runtime.instructions.cp.KahanObject;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
+
+/**
+ * Class to encapsulate information about a column group that is encoded with
+ * dense dictionary encoding (DDC) using 1 byte codes.
+ */
+public class ColGroupDDC1 extends ColGroupDDC 
+{
+       private static final long serialVersionUID = 5204955589230760157L;
+       
+       private byte[] _data;
+
+       public ColGroupDDC1() {
+               super();
+       }
+       
+       public ColGroupDDC1(int[] colIndices, int numRows, UncompressedBitmap 
ubm) {
+               super(colIndices, numRows, ubm);
+               _data = new byte[numRows];
+               
+               int numVals = ubm.getNumValues();
+               int numCols = ubm.getNumColumns();
+               
+               //materialize zero values, if necessary (fewer offsets than cells)
+               if( ubm.getNumOffsets() < (long)numRows * numCols ) {
+                       int zeroIx = containsAllZeroValue();
+                       if( zeroIx < 0 ) {
+                               zeroIx = numVals; //negative index: append a new tuple at the end
+                               _values = Arrays.copyOf(_values, 
_values.length+numCols); //new tail entries are zero-initialized
+                       }
+                       Arrays.fill(_data, (byte)zeroIx); //default code, overwritten below for rows with offsets
+               }
+               
+               //iterate over values and write dictionary codes
+               for( int i=0; i<numVals; i++ ) {
+                       int[] tmpList = ubm.getOffsetsList(i).extractValues();
+                       int tmpListSize = ubm.getNumOffsets(i); 
+                       for( int k=0; k<tmpListSize; k++ )
+                               _data[tmpList[k]] = (byte)i; //row tmpList[k] holds value tuple i
+               }
+       }
+       
+       public ColGroupDDC1(int[] colIndices, int numRows, double[] values, 
byte[] data) {
+               super(colIndices, numRows, values);
+               _data = data;
+       }
+
+       @Override
+       public CompressionType getCompType() {
+               return CompressionType.DDC1;
+       }
+       
+       @Override
+       protected double getData(int r, int colIx) {
+               return _values[(_data[r]&0xFF)*getNumCols()+colIx]; //&0xFF reads the code as unsigned byte; tuples are stored contiguously with getNumCols() entries each
+       }
+       
+       @Override
+       protected void setData(int r, int code) {
+               _data[r] = (byte)code;
+       }
+       
+       @Override
+       public void write(DataOutput out) throws IOException {
+               int numCols = getNumCols();
+               int numVals = getNumValues();
+               out.writeInt(_numRows);
+               out.writeInt(numCols);
+               out.writeInt(numVals);
+               
+               //write col indices
+               for( int i=0; i<_colIndexes.length; i++ )
+                       out.writeInt( _colIndexes[i] );
+               
+               //write distinct values
+               for( int i=0; i<_values.length; i++ )
+                       out.writeDouble(_values[i]);
+
+               //write data
+               for( int i=0; i<_numRows; i++ )
+                       out.writeByte(_data[i]);
+       }
+
+       @Override
+       public void readFields(DataInput in) throws IOException {
+               _numRows = in.readInt();
+               int numCols = in.readInt();
+               int numVals = in.readInt();
+               
+               //read col indices
+               _colIndexes = new int[ numCols ];
+               for( int i=0; i<numCols; i++ )
+                       _colIndexes[i] = in.readInt();
+               
+               //read distinct values
+               _values = new double[numVals*numCols];
+               for( int i=0; i<numVals*numCols; i++ )
+                       _values[i] = in.readDouble();
+               
+               //read data
+               _data = new byte[_numRows];
+               for( int i=0; i<_numRows; i++ )
+                       _data[i] = in.readByte();
+       }
+
+       @Override
+       public long getExactSizeOnDisk() {
+               long ret = 12; //header: three ints (numRows, numCols, numVals)
+               //col indices (4 bytes per int)
+               ret += 4 * _colIndexes.length; 
+               //distinct values (groups of values, 8 bytes per double)
+               ret += 8 * _values.length;
+               //data (1 byte per code)
+               ret += 1 * _data.length;
+               
+               return ret;
+       }
+
+       @Override
+       public long estimateInMemorySize() {
+               long size = super.estimateInMemorySize();
+               
+               //adding data size (one byte per code, array header not counted here)
+               if (_data != null)
+                       size += _data.length;
+       
+               return size;
+       }
+       
+       @Override
+       public void decompressToBlock(MatrixBlock target, int rl, int ru) {
+               int ncol = getNumCols();
+               for( int i = rl; i < ru; i++ )
+                       for( int j=0; j<ncol; j++ )
+                               target.appendValue(i, _colIndexes[j], 
_values[(_data[i]&0xFF)*ncol+j]);
+               //note: append ok because final sort per row 
+       }
+       
+       @Override
+       protected void countNonZerosPerRow(int[] rnnz, int rl, int ru) {
+               final int ncol = getNumCols();
+               final int numVals = getNumValues();
+               
+               //pre-aggregate nnz per value tuple (counts[k] = non-zero entries of tuple k)
+               int[] counts = new int[numVals];
+               for( int k=0, valOff=0; k<numVals; k++, valOff+=ncol )
+                       for( int j=0; j<ncol; j++ )
+                               counts[k] += (_values[valOff+j]!=0) ? 1 : 0;
+               
+               //scan data and add counts to output rows (rnnz is indexed relative to rl)
+               for( int i = rl; i < ru; i++ )
+                       rnnz[i-rl] += counts[_data[i]&0xFF];
+       }
+       
+       @Override
+       public void rightMultByVector(MatrixBlock vector, MatrixBlock result, 
int rl, int ru) 
+               throws DMLRuntimeException 
+       {
+               double[] b = ConverterUtils.getDenseVector(vector);
+               double[] c = result.getDenseBlock();
+               final int numCols = getNumCols();
+               final int numVals = getNumValues();
+               
+               //prepare reduced rhs w/ relevant values
+               double[] sb = new double[numCols];
+               for (int j = 0; j < numCols; j++) {
+                       sb[j] = b[_colIndexes[j]];
+               }
+               
+               //pre-aggregate all distinct values (codes are unsigned bytes, i.e., <=255)
+               double[] vals = preaggValues(numVals, sb);
+               
+               //iterate over codes and add the pre-aggregated value of each row to the output
+               for( int i=rl; i<ru; i++ ) {
+                       c[i] += vals[_data[i]&0xFF];
+               }
+       }
+       
+       public static void rightMultByVector(ColGroupDDC1[] grps, MatrixBlock 
vector, MatrixBlock result, int rl, int ru) 
+               throws DMLRuntimeException 
+       {
+               double[] b = ConverterUtils.getDenseVector(vector);
+               double[] c = result.getDenseBlock();
+               
+               //prepare distinct values once
+               double[][] vals = new double[grps.length][];
+               for( int i=0; i<grps.length; i++ ) {
+                       //prepare reduced rhs w/ relevant values
+                       double[] sb = new double[grps[i].getNumCols()];
+                       for (int j = 0; j < sb.length; j++) {
+                               sb[j] = b[grps[i]._colIndexes[j]];
+                       }       
+                       //pre-aggregate all distinct values (codes are unsigned bytes, i.e., <=255)
+                       vals[i] = grps[i].preaggValues(grps[i].getNumValues(), 
sb);
+               }
+               
+               //cache-conscious matrix-vector multiplication
+               //iterate over codes of all groups, one row block at a time, and add to output
+               int blksz = 2048; //16KB of output doubles per row block
+               for( int bi=rl; bi<ru; bi+=blksz )
+                       for( int j=0; j<grps.length; j++ )
+                               for( int i=bi; i<Math.min(bi+blksz, ru); i++ )
+                                       c[i] += vals[j][grps[j]._data[i]&0xFF];
+       }
+
+       @Override
+       public void leftMultByRowVector(MatrixBlock vector, MatrixBlock result) 
throws DMLRuntimeException {
+               double[] a = ConverterUtils.getDenseVector(vector);
+               double[] c = result.getDenseBlock();
+               final int nrow = getNumRows();
+               final int ncol = getNumCols();
+               final int numVals = getNumValues();
+               
+               if( 8*numVals < getNumRows() ) //few distinct tuples relative to rows: aggregate inputs per code first
+               {
+                       //iterate over codes and pre-aggregate inputs per 
code (guaranteed <=255)
+                       //temporary array also avoids false sharing in 
multi-threaded environments
+                       double[] vals = new double[numVals];
+                       for( int i=0; i<nrow; i++ ) {
+                               vals[_data[i]&0xFF] += a[i];
+                       }
+                       
+                       //post-scaling of pre-aggregate with distinct values
+                       for( int k=0, valOff=0; k<numVals; k++, valOff+=ncol ) {
+                               double aval = vals[k];
+                               for( int j=0; j<ncol; j++ ) {
+                                       int colIx = _colIndexes[j];
+                                       c[colIx] += aval * _values[valOff+j];
+                               }       
+                       }
+               }
+               else //general case
+               {
+                       //iterate over codes, compute all, and add to the result
+                       for( int i=0; i<nrow; i++ ) {
+                               double aval = a[i];
+                               if( aval != 0 ) { //skip rows that cannot contribute to the result
+                                       int valOff = (_data[i]&0xFF) * ncol;
+                                       for( int j=0; j<ncol; j++ ) {
+                                               int colIx = _colIndexes[j];
+                                               c[colIx] += aval * 
_values[valOff+j];
+                                       }
+                               }
+                       }       
+               }
+       }
+       
+       @Override
+       protected void computeSum(MatrixBlock result, KahanFunction kplus) {
+               final int nrow = getNumRows();
+               final int ncol = getNumCols();
+               final int numVals = getNumValues();
+               
+               //iterate over codes and count rows per code (codes are unsigned bytes, i.e., <=255)
+               int[] counts = new int[numVals];
+               for( int i=0; i<nrow; i++ ) {
+                       counts[_data[i]&0xFF] ++;
+               }
+               
+               //post-scaling of pre-aggregate with distinct values (partial sum/correction read from result cells (0,0)/(0,1))
+               KahanObject kbuff = new KahanObject(result.quickGetValue(0, 0), 
result.quickGetValue(0, 1));
+               for( int k=0, valOff=0; k<numVals; k++, valOff+=ncol ) {
+                       int cntk = counts[k];
+                       for( int j=0; j<ncol; j++ )
+                               kplus.execute3(kbuff, _values[ valOff+j], cntk); //presumably adds the value scaled by its count cntk, TODO confirm in KahanFunction
+               }
+               
+               result.quickSetValue(0, 0, kbuff._sum);
+               result.quickSetValue(0, 1, kbuff._correction);
+       }
+       
+       
+       @Override
+       protected void computeRowSums(MatrixBlock result, KahanFunction kplus, 
int rl, int ru) {
+               KahanObject kbuff = new KahanObject(0, 0);
+               KahanPlus kplus2 = KahanPlus.getKahanPlusFnObject();
+               double[] c = result.getDenseBlock();
+               
+               //pre-aggregate one value per tuple via sumAllValues (presumably the kplus-sum of each tuple, TODO confirm)
+               double[] vals = sumAllValues(kplus, kbuff);
+               
+               //scan data and add to result (use kahan plus not general 
KahanFunction
+               //for correctness in case of sqk+)
+               for( int i=rl; i<ru; i++ ) {
+                       kbuff.set(c[2*i], c[2*i+1]); //row i: sum at 2*i, correction at 2*i+1
+                       kplus2.execute2(kbuff, vals[_data[i]&0xFF]);
+                       c[2*i] = kbuff._sum;
+                       c[2*i+1] = kbuff._correction;
+               }
+       }
+       
+       public static void computeRowSums(ColGroupDDC1[] grps, MatrixBlock 
result, KahanFunction kplus, int rl, int ru) 
+               throws DMLRuntimeException 
+       {
+               KahanObject kbuff = new KahanObject(0, 0);
+               KahanPlus kplus2 = KahanPlus.getKahanPlusFnObject();
+               double[] c = result.getDenseBlock();
+               
+               //prepare distinct values once
+               double[][] vals = new double[grps.length][];
+               for( int i=0; i<grps.length; i++ ) {
+                       //pre-aggregate all distinct values (codes are unsigned bytes, i.e., <=255)
+                       vals[i] = grps[i].sumAllValues(kplus, kbuff);
+               }
+               
+               //cache-conscious row sums operations 
+               //iterate over codes of all groups, one row block at a time, and add to output
+               //(use kahan plus not general KahanFunction for correctness in 
case of sqk+)
+               int blksz = 1024; //16KB of output per row block (sum and correction double per row)
+               for( int bi=rl; bi<ru; bi+=blksz )
+                       for( int j=0; j<grps.length; j++ )
+                               for( int i=bi; i<Math.min(bi+blksz, ru); i++ ) 
{ 
+                                       kbuff.set(c[2*i], c[2*i+1]); //row i: sum at 2*i, correction at 2*i+1
+                                       kplus2.execute2(kbuff, 
vals[j][grps[j]._data[i]&0xFF]);
+                                       c[2*i] = kbuff._sum;
+                                       c[2*i+1] = kbuff._correction;
+                               }
+       }
+       
+       @Override
+       public ColGroup scalarOperation(ScalarOperator op) throws 
DMLRuntimeException {
+               //fast path: sparse-safe and -unsafe operations
+               //as zero are represented, it is sufficient to simply apply the 
scalar op
+               return new ColGroupDDC1(_colIndexes, _numRows, 
applyScalarOp(op), _data);
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC2.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC2.java 
b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC2.java
new file mode 100644
index 0000000..5f29979
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC2.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.compress.utils.ConverterUtils;
+import org.apache.sysml.runtime.functionobjects.KahanFunction;
+import org.apache.sysml.runtime.functionobjects.KahanPlus;
+import org.apache.sysml.runtime.instructions.cp.KahanObject;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
+
+/**
+ * Class to encapsulate information about a column group that is encoded with
+ * dense dictionary encoding (DDC) using 2 byte codes.
+ */
+public class ColGroupDDC2 extends ColGroupDDC 
+{
+       private static final long serialVersionUID = -3995768285207071013L;
+       
+       private static final int MAX_TMP_VALS = 32*1024;
+       
+       private char[] _data;
+
+       public ColGroupDDC2() {
+               super();
+       }
+       
+       public ColGroupDDC2(int[] colIndices, int numRows, UncompressedBitmap 
ubm) {
+               super(colIndices, numRows, ubm);
+               _data = new char[numRows];
+               
+               int numVals = ubm.getNumValues();
+               int numCols = ubm.getNumColumns();
+               
+               //materialize zero values, if necessary
+               if( ubm.getNumOffsets() < (long)numRows * numCols ) {
+                       int zeroIx = containsAllZeroValue();
+                       if( zeroIx < 0 ) {
+                               zeroIx = numVals;
+                               _values = Arrays.copyOf(_values, 
_values.length+numCols);
+                       }
+                       Arrays.fill(_data, (char)zeroIx);
+               }
+               
+               //iterate over values and write dictionary codes
+               for( int i=0; i<numVals; i++ ) {
+                       int[] tmpList = ubm.getOffsetsList(i).extractValues();
+                       int tmpListSize = ubm.getNumOffsets(i); 
+                       for( int k=0; k<tmpListSize; k++ )
+                               _data[tmpList[k]] = (char)i;
+               }
+       }
+       
+       public ColGroupDDC2(int[] colIndices, int numRows, double[] values, 
char[] data) {
+               super(colIndices, numRows, values);
+               _data = data;
+       }
+
+       @Override
+       public CompressionType getCompType() {
+               return CompressionType.DDC2;
+       }
+       
+       @Override
+       protected double getData(int r, int colIx) {
+               return _values[_data[r]*getNumCols()+colIx];
+       }
+
+       @Override
+       protected void setData(int r, int code) {
+               _data[r] = (char)code;
+       }
+       
+       @Override
+       public void write(DataOutput out) throws IOException {
+               int numCols = getNumCols();
+               int numVals = getNumValues();
+               out.writeInt(_numRows);
+               out.writeInt(numCols);
+               out.writeInt(numVals);
+               
+               //write col indices
+               for( int i=0; i<_colIndexes.length; i++ )
+                       out.writeInt( _colIndexes[i] );
+               
+               //write distinct values
+               for( int i=0; i<_values.length; i++ )
+                       out.writeDouble(_values[i]);
+
+               //write data
+               for( int i=0; i<_numRows; i++ )
+                       out.writeChar(_data[i]);
+       }
+
+       @Override
+       public void readFields(DataInput in) throws IOException {
+               _numRows = in.readInt();
+               int numCols = in.readInt();
+               int numVals = in.readInt();
+               
+               //read col indices
+               _colIndexes = new int[ numCols ];
+               for( int i=0; i<numCols; i++ )
+                       _colIndexes[i] = in.readInt();
+               
+               //read distinct values
+               _values = new double[numVals*numCols];
+               for( int i=0; i<numVals*numCols; i++ )
+                       _values[i] = in.readDouble();
+               
+               //read data
+               _data = new char[_numRows];
+               for( int i=0; i<_numRows; i++ )
+                       _data[i] = in.readChar();
+       }
+
+       @Override
+       public long getExactSizeOnDisk() {
+               long ret = 12; //header
+               //col indices
+               ret += 4 * _colIndexes.length; 
+               //distinct values (groups of values)
+               ret += 8 * _values.length;
+               //data
+               ret += 2 * _data.length;
+               
+               return ret;
+       }
+       
+       @Override
+       public long estimateInMemorySize() {
+               long size = super.estimateInMemorySize();
+               
+               //adding data size
+               if (_data != null)
+                       size += 2 * _data.length;
+       
+               return size;
+       }
+       
+       @Override
+       public void decompressToBlock(MatrixBlock target, int rl, int ru) {
+               int ncol = getNumCols();
+               for( int i = rl; i < ru; i++ )
+                       for( int j=0; j<ncol; j++ )
+                               target.appendValue(i, _colIndexes[j], 
_values[_data[i]*ncol+j]);
+               //note: append ok because final sort per row 
+       }
+       
+       @Override
+       protected void countNonZerosPerRow(int[] rnnz, int rl, int ru) {
+               final int ncol = getNumCols();
+               final int numVals = getNumValues();
+               
+               //pre-aggregate nnz per value tuple
+               int[] counts = new int[numVals];
+               for( int k=0, valOff=0; k<numVals; k++, valOff+=ncol )
+                       for( int j=0; j<ncol; j++ )
+                               counts[k] += (_values[valOff+j]!=0) ? 1 : 0;
+               
+               //scan data and add counts to output rows
+               for( int i = rl; i < ru; i++ )
+                       rnnz[i-rl] += counts[_data[i]];
+       }
+       
+       @Override
+       public void rightMultByVector(MatrixBlock vector, MatrixBlock result, 
int rl, int ru) throws DMLRuntimeException {
+               double[] b = ConverterUtils.getDenseVector(vector);
+               double[] c = result.getDenseBlock();
+               final int numCols = getNumCols();
+               final int numVals = getNumValues();
+
+               //prepare reduced rhs w/ relevant values
+               double[] sb = new double[numCols];
+               for (int j = 0; j < numCols; j++) {
+                       sb[j] = b[_colIndexes[j]];
+               }
+               
+               //pre-aggregate all distinct values 
+               double[] vals = preaggValues(numVals, sb);
+
+               //iterate over codes and add to output
+               for( int i=rl; i<ru; i++ )
+                       c[i] += vals[_data[i]];
+       }
+
+       @Override
+       public void leftMultByRowVector(MatrixBlock vector, MatrixBlock result) 
+               throws DMLRuntimeException 
+       {
+               double[] a = ConverterUtils.getDenseVector(vector);
+               double[] c = result.getDenseBlock();
+               final int nrow = getNumRows();
+               final int ncol = getNumCols();
+               final int numVals = getNumValues();
+               
+               if( 8*numVals < getNumRows() )
+               {
+                       //iterate over codes and pre-aggregate inputs per code
+                       //temporary array also avoids false sharing in 
multi-threaded environments
+                       double[] vals = new double[numVals];
+                       for( int i=0; i<nrow; i++ ) {
+                               vals[_data[i]] += a[i];
+                       }
+                       
+                       //post-scaling of pre-aggregate with distinct values
+                       for( int k=0, valOff=0; k<numVals; k++, valOff+=ncol ) {
+                               double aval = vals[k];
+                               for( int j=0; j<ncol; j++ ) {
+                                       int colIx = _colIndexes[j];
+                                       c[colIx] += aval * _values[valOff+j];
+                               }       
+                       }
+               }
+               else //general case
+               {
+               
+                       //iterate over codes, compute all, and add to the result
+                       for( int i=0; i<nrow; i++ ) {
+                               double aval = a[i];
+                               if( aval != 0 ) {
+                                       int valOff = _data[i] * ncol;
+                                       for( int j=0; j<ncol; j++ ) {
+                                               int colIx = _colIndexes[j];
+                                               c[colIx] += aval * 
_values[valOff+j];
+                                       }
+                               }
+                       }
+               }
+       }
+       
+       @Override
+       protected void computeSum(MatrixBlock result, KahanFunction kplus) {
+               final int nrow = getNumRows();
+               final int ncol = getNumCols();
+               final int numVals = getNumValues();
+               
+               if( numVals < MAX_TMP_VALS )
+               {
+                       //iterate over codes and count per code
+                       int[] counts = new int[numVals];
+                       for( int i=0; i<nrow; i++ ) {
+                               counts[_data[i]] ++;
+                       }
+                       
+                       //post-scaling of pre-aggregate with distinct values
+                       KahanObject kbuff = new 
KahanObject(result.quickGetValue(0, 0), result.quickGetValue(0, 1));
+                       for( int k=0, valOff=0; k<numVals; k++, valOff+=ncol ) {
+                               int cntk = counts[k];
+                               for( int j=0; j<ncol; j++ )
+                                       kplus.execute3(kbuff, _values[ 
valOff+j], cntk);
+                       }
+                       
+                       result.quickSetValue(0, 0, kbuff._sum);
+                       result.quickSetValue(0, 1, kbuff._correction);
+               }
+               else //general case 
+               {
+                       super.computeSum(result, kplus);
+               }
+       }
+       
+       
+       @Override
+       protected void computeRowSums(MatrixBlock result, KahanFunction kplus, 
int rl, int ru) {
+               KahanObject kbuff = new KahanObject(0, 0);
+               KahanPlus kplus2 = KahanPlus.getKahanPlusFnObject();
+               double[] c = result.getDenseBlock();
+               
+               //pre-aggregate sums per value tuple
+               double[] vals = sumAllValues(kplus, kbuff);
+               
+               //scan data and add to result (use kahan plus not general 
KahanFunction
+               //for correctness in case of sqk+)
+               for( int i=rl; i<ru; i++ ) {
+                       kbuff.set(c[2*i], c[2*i+1]);
+                       kplus2.execute2(kbuff, vals[_data[i]]);
+                       c[2*i] = kbuff._sum;
+                       c[2*i+1] = kbuff._correction;
+               }
+       }
+       
+       @Override
+       public ColGroup scalarOperation(ScalarOperator op) throws 
DMLRuntimeException {
+               //fast path: sparse-safe and -unsafe operations
+               //as zeros are represented, it is sufficient to simply apply the 
scalar op
+               return new ColGroupDDC2(_colIndexes, _numRows, 
applyScalarOp(op), _data);
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java 
b/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java
index 696adf2..f47a432 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java
@@ -22,20 +22,16 @@ package org.apache.sysml.runtime.compress;
 import java.util.Arrays;
 import java.util.Iterator;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.compress.utils.ConverterUtils;
 import org.apache.sysml.runtime.compress.utils.LinearAlgebraUtils;
 import org.apache.sysml.runtime.functionobjects.Builtin;
-import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode;
 import org.apache.sysml.runtime.functionobjects.KahanFunction;
 import org.apache.sysml.runtime.functionobjects.KahanPlus;
-import org.apache.sysml.runtime.functionobjects.KahanPlusSq;
-import org.apache.sysml.runtime.functionobjects.ReduceAll;
-import org.apache.sysml.runtime.functionobjects.ReduceCol;
-import org.apache.sysml.runtime.functionobjects.ReduceRow;
 import org.apache.sysml.runtime.instructions.cp.KahanObject;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
 import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
 
 /**
@@ -43,12 +39,14 @@ import 
org.apache.sysml.runtime.matrix.operators.ScalarOperator;
  * simple lists of offsets for each set of distinct values.
  * 
  */
-public class ColGroupOLE extends ColGroupBitmap 
+public class ColGroupOLE extends ColGroupOffset 
 {
        private static final long serialVersionUID = -9157676271360528008L;
 
+       private static final Log LOG = 
LogFactory.getLog(ColGroupOLE.class.getName());
+       
        public ColGroupOLE() {
-               super(CompressionType.OLE_BITMAP);
+               super();
        }
        
        /**
@@ -64,14 +62,15 @@ public class ColGroupOLE extends ColGroupBitmap
         */
        public ColGroupOLE(int[] colIndices, int numRows, UncompressedBitmap 
ubm) 
        {
-               super(CompressionType.OLE_BITMAP, colIndices, numRows, ubm);
+               super(colIndices, numRows, ubm);
 
                // compress the bitmaps
                final int numVals = ubm.getNumValues();
                char[][] lbitmaps = new char[numVals][];
                int totalLen = 0;
                for( int i=0; i<numVals; i++ ) {
-                       lbitmaps[i] = 
BitmapEncoder.genOffsetBitmap(ubm.getOffsetsList(i));
+                       lbitmaps[i] = BitmapEncoder.genOffsetBitmap(
+                               ubm.getOffsetsList(i).extractValues(), 
ubm.getNumOffsets(i));
                        totalLen += lbitmaps[i].length;
                }
                
@@ -95,13 +94,24 @@ public class ColGroupOLE extends ColGroupBitmap
                                _skiplist[k] = bix;
                        }               
                }
+               
+               //debug output
+               double ucSize = MatrixBlock.estimateSizeDenseInMemory(numRows, 
colIndices.length);
+               if( estimateInMemorySize() > ucSize )
+                       LOG.warn("OLE group larger than UC dense: 
"+estimateInMemorySize()+" "+ucSize);
        }
 
        public ColGroupOLE(int[] colIndices, int numRows, boolean zeros, 
double[] values, char[] bitmaps, int[] bitmapOffs) {
-               super(CompressionType.OLE_BITMAP, colIndices, numRows, zeros, 
values);
+               super(colIndices, numRows, zeros, values);
                _data = bitmaps;
                _ptr = bitmapOffs;
        }
+       
+
+       @Override
+       public CompressionType getCompType() {
+               return CompressionType.OLE_BITMAP;
+       }
 
        @Override
        public Iterator<Integer> getDecodeIterator(int k) {
@@ -251,7 +261,7 @@ public class ColGroupOLE extends ColGroupBitmap
                }
                
                double[] rvalues = applyScalarOp(op, val0, getNumCols());       
        
-               char[] lbitmap = BitmapEncoder.genOffsetBitmap(loff);
+               char[] lbitmap = BitmapEncoder.genOffsetBitmap(loff, 
loff.length);
                char[] rbitmaps = Arrays.copyOf(_data, 
_data.length+lbitmap.length);
                System.arraycopy(lbitmap, 0, rbitmaps, _data.length, 
lbitmap.length);
                int[] rbitmapOffs = Arrays.copyOf(_ptr, _ptr.length+1);
@@ -284,7 +294,7 @@ public class ColGroupOLE extends ColGroupBitmap
                        //best configuration aligns with L3 cache size 
(x*vcores*64K*8B < L3)
                        //x=4 leads to a good yet slightly conservative 
compromise for single-/
                        //multi-threaded and typical number of cores and L3 
cache sizes
-                       final int blksz2 = ColGroupBitmap.WRITE_CACHE_BLKSZ;
+                       final int blksz2 = ColGroupOffset.WRITE_CACHE_BLKSZ;
                        
                        //step 1: prepare position and value arrays
                        int[] apos = skipScan(numVals, rl);
@@ -365,7 +375,7 @@ public class ColGroupOLE extends ColGroupBitmap
                if( LOW_LEVEL_OPT && numVals > 1 && _numRows > blksz )
                {
                        //cache blocking config (see matrix-vector mult for 
explanation)
-                       final int blksz2 = ColGroupBitmap.READ_CACHE_BLKSZ;
+                       final int blksz2 = ColGroupOffset.READ_CACHE_BLKSZ;
                        
                        //step 1: prepare position and value arrays
                        
@@ -426,46 +436,7 @@ public class ColGroupOLE extends ColGroupBitmap
        }
 
        @Override
-       public void unaryAggregateOperations(AggregateUnaryOperator op, 
MatrixBlock result) 
-               throws DMLRuntimeException 
-       {
-               unaryAggregateOperations(op, result, 0, getNumRows());
-       }
-       
-       @Override
-       public void unaryAggregateOperations(AggregateUnaryOperator op, 
MatrixBlock result, int rl, int ru) 
-               throws DMLRuntimeException 
-       {
-               //sum and sumsq (reduceall/reducerow over tuples and counts)
-               if( op.aggOp.increOp.fn instanceof KahanPlus || 
op.aggOp.increOp.fn instanceof KahanPlusSq ) 
-               {
-                       KahanFunction kplus = (op.aggOp.increOp.fn instanceof 
KahanPlus) ?
-                                       KahanPlus.getKahanPlusFnObject() : 
KahanPlusSq.getKahanPlusSqFnObject();
-                       
-                       if( op.indexFn instanceof ReduceAll )
-                               computeSum(result, kplus);
-                       else if( op.indexFn instanceof ReduceCol )
-                               computeRowSums(result, kplus, rl, ru);
-                       else if( op.indexFn instanceof ReduceRow )
-                               computeColSums(result, kplus);
-               }
-               //min and max (reduceall/reducerow over tuples only)
-               else if(op.aggOp.increOp.fn instanceof Builtin 
-                               && 
(((Builtin)op.aggOp.increOp.fn).getBuiltinCode()==BuiltinCode.MAX 
-                               || 
((Builtin)op.aggOp.increOp.fn).getBuiltinCode()==BuiltinCode.MIN)) 
-               {               
-                       Builtin builtin = (Builtin) op.aggOp.increOp.fn;
-
-                       if( op.indexFn instanceof ReduceAll )
-                               computeMxx(result, builtin);
-                       else if( op.indexFn instanceof ReduceCol )
-                               computeRowMxx(result, builtin, rl, ru);
-                       else if( op.indexFn instanceof ReduceRow )
-                               computeColMxx(result, builtin);
-               }
-       }
-
-       private void computeSum(MatrixBlock result, KahanFunction kplus)
+       protected final void computeSum(MatrixBlock result, KahanFunction kplus)
        {
                KahanObject kbuff = new KahanObject(result.quickGetValue(0, 0), 
result.quickGetValue(0, 1));
                
@@ -493,41 +464,88 @@ public class ColGroupOLE extends ColGroupBitmap
                result.quickSetValue(0, 1, kbuff._correction);
        }
 
-       private void computeRowSums(MatrixBlock result, KahanFunction kplus, 
int rl, int ru)
+       @Override
+       protected final void computeRowSums(MatrixBlock result, KahanFunction 
kplus, int rl, int ru)
        {
                KahanObject kbuff = new KahanObject(0, 0);
-       
+               KahanPlus kplus2 = KahanPlus.getKahanPlusFnObject();
+               
                final int blksz = BitmapEncoder.BITMAP_BLOCK_SZ;
                final int numVals = getNumValues();
                double[] c = result.getDenseBlock();
                
-               //iterate over all values and their bitmaps
-               for (int k = 0; k < numVals; k++) 
+               if( ALLOW_CACHE_CONSCIOUS_ROWSUMS &&
+                       LOW_LEVEL_OPT && numVals > 1 && _numRows > blksz )
                {
-                       //prepare value-to-add for entire value bitmap
-                       int boff = _ptr[k];
-                       int blen = len(k);
-                       double val = sumValues(k);
+                       final int blksz2 = ColGroupOffset.WRITE_CACHE_BLKSZ/2;
                        
-                       //iterate over bitmap blocks and add values
-                       if (val != 0) {
-                               int slen;
-                               int bix = skipScanVal(k, rl);
-                               for( int off=bix*blksz; bix<blen && off<ru; 
bix+=slen+1, off+=blksz ) {
-                                       slen = _data[boff+bix];
-                                       for (int i = 1; i <= slen; i++) {
-                                               int rix = off + _data[boff+bix 
+ i];
-                                               kbuff.set(c[2*rix], c[2*rix+1]);
-                                               kplus.execute2(kbuff, val);
-                                               c[2*rix] = kbuff._sum;
-                                               c[2*rix+1] = kbuff._correction;
+                       //step 1: prepare position and value arrays
+                       int[] apos = skipScan(numVals, rl);
+                       double[] aval = sumAllValues(kplus, kbuff);
+                                       
+                       //step 2: cache conscious row sums via horizontal scans 
+                       for( int bi=rl; bi<ru; bi+=blksz2 ) 
+                       {
+                               int bimax = Math.min(bi+blksz2, ru);
+                               
+                               //horizontal segment scan, incl pos maintenance
+                               for (int k = 0; k < numVals; k++) {
+                                       int boff = _ptr[k];
+                                       int blen = len(k);
+                                       double val = aval[k];
+                                       int bix = apos[k];
+                                       
+                                       for( int ii=bi; ii<bimax && bix<blen; 
ii+=blksz ) {
+                                               //prepare length, start, and 
end pos
+                                               int len = _data[boff+bix];
+                                               int pos = boff+bix+1;
+                                               
+                                               //compute partial results
+                                               for (int i = 0; i < len; i++) {
+                                                       int rix = ii + 
_data[pos + i];
+                                                       kbuff.set(c[2*rix], 
c[2*rix+1]);
+                                                       kplus2.execute2(kbuff, 
val);
+                                                       c[2*rix] = kbuff._sum;
+                                                       c[2*rix+1] = 
kbuff._correction;
+                                               }
+                                               bix += len + 1;
+                                       }
+
+                                       apos[k] = bix;
+                               }
+                       }               
+               }
+               else
+               {
+                       //iterate over all values and their bitmaps
+                       for (int k = 0; k < numVals; k++) 
+                       {
+                               //prepare value-to-add for entire value bitmap
+                               int boff = _ptr[k];
+                               int blen = len(k);
+                               double val = sumValues(k, kplus, kbuff);
+                               
+                               //iterate over bitmap blocks and add values
+                               if (val != 0) {
+                                       int slen;
+                                       int bix = skipScanVal(k, rl);
+                                       for( int off=((rl+1)/blksz)*blksz; 
bix<blen && off<ru; bix+=slen+1, off+=blksz ) {
+                                               slen = _data[boff+bix];
+                                               for (int i = 1; i <= slen; i++) 
{
+                                                       int rix = off + 
_data[boff+bix + i];
+                                                       kbuff.set(c[2*rix], 
c[2*rix+1]);
+                                                       kplus2.execute2(kbuff, 
val);
+                                                       c[2*rix] = kbuff._sum;
+                                                       c[2*rix+1] = 
kbuff._correction;
+                                               }
                                        }
                                }
                        }
                }
        }
 
-       private void computeColSums(MatrixBlock result, KahanFunction kplus)
+       @Override
+       protected final void computeColSums(MatrixBlock result, KahanFunction 
kplus)
        {
                KahanObject kbuff = new KahanObject(0, 0);
                
@@ -555,7 +573,8 @@ public class ColGroupOLE extends ColGroupBitmap
                }
        }
 
-       private void computeRowMxx(MatrixBlock result, Builtin builtin, int rl, 
int ru)
+       @Override
+       protected final void computeRowMxx(MatrixBlock result, Builtin builtin, 
int rl, int ru)
        {
                //NOTE: zeros handled once for all column groups outside
                final int blksz = BitmapEncoder.BITMAP_BLOCK_SZ;
@@ -624,7 +643,7 @@ public class ColGroupOLE extends ColGroupBitmap
        protected void countNonZerosPerRow(int[] rnnz, int rl, int ru)
        {
                final int blksz = BitmapEncoder.BITMAP_BLOCK_SZ;
-               final int blksz2 = ColGroupBitmap.WRITE_CACHE_BLKSZ;
+               final int blksz2 = ColGroupOffset.WRITE_CACHE_BLKSZ;
                final int numVals = getNumValues();
                final int numCols = getNumCols();
                

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ColGroupOffset.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/ColGroupOffset.java 
b/src/main/java/org/apache/sysml/runtime/compress/ColGroupOffset.java
new file mode 100644
index 0000000..e49c1a3
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupOffset.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.compress.utils.LinearAlgebraUtils;
+import org.apache.sysml.runtime.functionobjects.Builtin;
+import org.apache.sysml.runtime.functionobjects.KahanFunction;
+import org.apache.sysml.runtime.functionobjects.KahanPlus;
+import org.apache.sysml.runtime.functionobjects.KahanPlusSq;
+import org.apache.sysml.runtime.functionobjects.ReduceAll;
+import org.apache.sysml.runtime.functionobjects.ReduceCol;
+import org.apache.sysml.runtime.functionobjects.ReduceRow;
+import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
+
+
+/**
+ * Base class for column groups encoded with various types of bitmap encoding.
+ * 
+ * 
+ * NOTES:
+ *  * OLE: storing the segment lengths separately from the bitmaps led to a
+ *    30% improvement, but this was not applied because it is more difficult
+ *    to support both data layouts at the same time (distributed/local as
+ *    well as with and without low-level optimizations)
+ */
+public abstract class ColGroupOffset extends ColGroupValue 
+{
+       private static final long serialVersionUID = -1635828933479403125L;
+
+       //flag for skip-list creation; not read in this class (presumably consulted by subclasses)
+       protected static final boolean CREATE_SKIPLIST = true;
+       
+       //cache block sizes, both defined as twice the bitmap block size
+       protected static final int READ_CACHE_BLKSZ = 2 * 
BitmapEncoder.BITMAP_BLOCK_SZ;
+       public static final int WRITE_CACHE_BLKSZ = 2 * 
BitmapEncoder.BITMAP_BLOCK_SZ;
+       //non-final switch; not read in this class (presumably toggles a row-sum code path in subclasses)
+       public static boolean ALLOW_CACHE_CONSCIOUS_ROWSUMS = true;
+       
+       /** Bitmaps, one per uncompressed value in {@link #_values}. */
+       protected int[] _ptr; //bitmap offsets per value: bitmap k spans _data[_ptr[k]] to _data[_ptr[k+1]-1]
+       protected char[] _data; //linearized bitmaps (variable length)
+       protected boolean _zeros; //contains zero values
+       
+       //not read or written in this class; presumably maintained by subclasses - TODO confirm
+       protected int[] _skiplist;
+       
+       /**
+        * No-argument constructor that only delegates to the superclass; the
+        * bitmap fields of this class stay unset (presumably populated later
+        * via {@link #readFields} - TODO confirm).
+        */
+       public ColGroupOffset() {
+               super();
+       }
+       
+       /**
+        * Main constructor. Stores the headers for the individual bitmaps.
+        * The zero indicator is set if the uncompressed bitmap holds fewer
+        * offsets than the block has rows.
+        * 
+        * @param colIndices
+        *            indices (within the block) of the columns included in this
+        *            column group
+        * @param numRows
+        *            total number of rows in the parent block
+        * @param ubm
+        *            Uncompressed bitmap representation of the block
+        */
+       public ColGroupOffset(int[] colIndices, int numRows, UncompressedBitmap 
ubm) {
+               super(colIndices, numRows, ubm);
+               _zeros = (ubm.getNumOffsets() < numRows);
+       }
+
+       /**
+        * Constructor for subclass methods that need to create shallow copies.
+        * 
+        * @param colIndices
+        *            raw column index information
+        * @param numRows
+        *            number of rows in the block
+        * @param zeros
+        *            indicator stored as-is in the zero flag of this group,
+        *            i.e., whether the column group contains zero values
+        * @param values
+        *            set of distinct values for the block (associated bitmaps
+        *            are kept in the subclass)
+        */
+       protected ColGroupOffset(int[] colIndices, int numRows, boolean zeros, 
double[] values) {
+               super(colIndices, numRows, values);
+               _zeros = zeros;
+       }
+       
+       /**
+        * Returns the length of bitmap k in the linearized data array, computed
+        * as the distance between the start offsets of bitmap k and bitmap k+1.
+        * 
+        * @param k index of the bitmap
+        * @return number of chars occupied by bitmap k
+        */
+       protected final int len(int k) {
+               return _ptr[k+1] - _ptr[k];
+       }
+
+       /**
+        * Packs the given per-value bitmaps back-to-back into the linearized
+        * data array and records the start offset of every bitmap, followed by
+        * a final sentinel offset equal to the total length.
+        * 
+        * @param numVals number of bitmaps (distinct value tuples)
+        * @param totalLen total number of chars over all bitmaps
+        * @param lbitmaps individual bitmaps, one char array per value
+        */
+       protected void createCompressedBitmaps(int numVals, int totalLen, char[][] lbitmaps) {
+               _ptr = new int[numVals+1];
+               _data = new char[totalLen];
+               
+               int pos = 0; //next free position in the linearized array
+               for( int k=0; k<numVals; k++ ) {
+                       final char[] bitmap = lbitmaps[k];
+                       final int n = bitmap.length;
+                       _ptr[k] = pos;
+                       System.arraycopy(bitmap, 0, _data, pos, n);
+                       pos += n;
+               }
+               
+               //sentinel offset so that the length of the last bitmap is well-defined
+               _ptr[numVals] = totalLen;
+       }
+       
+       /**
+        * Estimates the in-memory size of this column group as the estimate of
+        * the superclass plus the two array references and, if allocated, the
+        * offset and bitmap arrays.
+        * 
+        * @return estimated size in bytes
+        */
+       @Override
+       public long estimateInMemorySize() {
+               long size = super.estimateInMemorySize();
+               
+               // adding bitmaps size
+               size += 16; //array references
+               if (_data != null) {
+                       //long arithmetic: length*2 (or *4) in int would overflow for very large arrays
+                       size += 32 + 4L * _ptr.length;  // offsets
+                       size += 32 + 2L * _data.length; // bitmaps
+               }
+       
+               return size;
+       }
+
+       //generic decompression for OLE/RLE, to be overwritten for performance
+       //For every bitmap, appends the associated value tuple to all decoded rows
+       //that fall into the given row range of the target block.
+       //NOTE(review): rows equal to ru are still decompressed because the scan
+       //only stops at row>ru, i.e., ru acts as an inclusive bound here - confirm
+       //against the callers' convention for ru.
+       @Override
+       public void decompressToBlock(MatrixBlock target, int rl, int ru) 
+       {
+               final int numCols = getNumCols();
+               final int numVals = getNumValues();
+               int[] colIndices = getColIndices();
+               
+               // Run through the bitmaps for this column group
+               for (int i = 0; i < numVals; i++) {
+                       Iterator<Integer> decoder = getDecodeIterator(i);
+                       int valOff = i*numCols; //start of value tuple i
+
+                       while (decoder.hasNext()) {
+                               int row = decoder.next();
+                               if( row<rl ) continue; //before the range: keep scanning
+                               if( row>ru ) break; //past the range: assumes ascending row offsets - TODO confirm
+                               
+                               for (int colIx = 0; colIx < numCols; colIx++)
+                                       target.appendValue(row, 
colIndices[colIx], _values[valOff+colIx]);
+                       }
+               }
+       }
+
+       //generic decompression for OLE/RLE, to be overwritten for performance
+       //Writes, for every bitmap, the value tuple into all decoded rows of the
+       //target, mapping each column of this group to its target column position
+       //via the given lookup array.
+       @Override
+       public void decompressToBlock(MatrixBlock target, int[] colIndexTargets) 
+       {
+               final int ncol = getNumCols();
+               final int nval = getNumValues();
+               
+               for( int k=0; k<nval; k++ ) {
+                       final Iterator<Integer> rows = getDecodeIterator(k);
+                       final int tupleStart = k*ncol;
+                       
+                       //scan all row offsets of bitmap k
+                       while( rows.hasNext() ) {
+                               final int row = rows.next();
+                               for( int j=0; j<ncol; j++ ) {
+                                       //original column index -> column position in target
+                                       final int targetCol = colIndexTargets[getColIndex(j)];
+                                       target.quickSetValue(row, targetCol, _values[tupleStart+j]);
+                               }
+                       }
+               }
+       }
+       
+       //generic decompression for OLE/RLE, to be overwritten for performance
+       //Writes a single column of this group (given by its position within the
+       //group) into column 0 of the target for all decoded rows.
+       @Override
+       public void decompressToBlock(MatrixBlock target, int colpos) 
+       {
+               final int ncol = getNumCols();
+               final int nval = getNumValues();
+               
+               for( int k=0; k<nval; k++ ) {
+                       final Iterator<Integer> rows = getDecodeIterator(k);
+                       //position of the requested column within value tuple k
+                       final int valuePos = k*ncol + colpos;
+                       
+                       while( rows.hasNext() ) {
+                               final int row = rows.next();
+                               target.quickSetValue(row, 0, _values[valuePos]);
+                       }
+               }
+       }
+
+       //generic get for OLE/RLE, to be overwritten for performance
+       //potential: skip scan (segment length agg and run length) instead of decode
+       //Looks up the cell value by scanning the row offsets of every bitmap;
+       //returns 0 if no bitmap contains the requested row.
+       @Override
+       public double get(int r, int c) {
+               //map the global column index to the position within this group
+               final int localCol = Arrays.binarySearch(_colIndexes, c);
+               if( localCol < 0 )
+                       throw new RuntimeException("Column index "+c+" not in bitmap group.");
+               
+               final int ncol = getNumCols();
+               final int nval = getNumValues();
+               for( int k=0; k<nval; k++ ) {
+                       final Iterator<Integer> rows = getDecodeIterator(k);
+                       final int tupleStart = k*ncol;
+                       while( rows.hasNext() ) {
+                               final int row = rows.next();
+                               if( row > r )
+                                       break; //row not covered by this value, try next
+                               if( row == r )
+                                       return _values[tupleStart+localCol];
+                       }
+               }
+               
+               //row not found in any bitmap
+               return 0;
+       }
+
+       //For every column i, passes b[i] together with the numVals consecutive
+       //entries of _values starting at i*numVals to vectMultiplyAdd, targeting
+       //c[0..numVals) (presumably c[j] += b[i]*_values[off+j] - TODO confirm).
+       //NOTE(review): this steps through _values in strides of numVals per
+       //column, whereas other methods of this class address value tuples as
+       //_values[valueIx*numCols+colIx]; both agree for single-column groups -
+       //confirm the intended layout for multi-column groups.
+       protected final void sumAllValues(double[] b, double[] c)
+       {
+               final int numVals = getNumValues();
+               final int numCols = getNumCols();
+               
+               //vectMultiplyAdd over cols instead of dotProduct over vals 
because
+               //usually more values than columns
+               for( int i=0, off=0; i<numCols; i++, off+=numVals )
+                       LinearAlgebraUtils.vectMultiplyAdd(b[i], _values, c, 
off, 0, numVals);
+       }
+
+       /**
+        * Computes the min or max over all entries of a single value tuple.
+        * 
+        * @param bitmapIx index of the value tuple (same index as its bitmap)
+        * @param builtin builtin function object; the code MAX selects the
+        *        maximum, any other code starts from the identity of the minimum
+        * @return aggregate over the tuple's numCols entries
+        */
+       protected final double mxxValues(int bitmapIx, Builtin builtin)
+       {
+               final int numCols = getNumCols();
+               final int valOff = bitmapIx * numCols;
+               
+               //start from the identity of the aggregate (-inf for max, +inf otherwise);
+               //+/-Double.MAX_VALUE would yield a wrong result for tuples that only
+               //contain infinite values of the respective sign
+               double val = (builtin.getBuiltinCode()==BuiltinCode.MAX) ?
+                       Double.NEGATIVE_INFINITY : Double.POSITIVE_INFINITY;
+               for( int i = 0; i < numCols; i++ )
+                       val = builtin.execute2(val, _values[valOff+i]);
+               
+               return val;
+       }
+
+       /**
+        * @return the linearized bitmap array itself (no copy is made)
+        */
+       public char[] getBitmaps() {
+               return _data;
+       }
+       
+       /**
+        * @return the array of bitmap start offsets itself (no copy is made)
+        */
+       public int[] getBitmapOffsets() {
+               return _ptr;
+       }
+
+       /**
+        * @return the zero indicator of this column group (contains zero values)
+        */
+       public boolean hasZeros() {
+               return _zeros;
+       }
+       
+       /**
+        * Creates an iterator over the row offsets encoded in a single bitmap.
+        * 
+        * @param k
+        *            index of a specific compressed bitmap (stored in subclass,
+        *            index same as {@link #getValues})
+        * @return an object for iterating over the row offsets in this bitmap.
+        *         Only valid until the next call to this method. May be reused
+        *         across calls.
+        */
+       public abstract Iterator<Integer> getDecodeIterator(int k);
+
+       //TODO getDecodeIterator(int k, int rl, int ru)
+
+       /**
+        * Utility function of sparse-unsafe operations: converts a row
+        * indicator vector into the ascending list of positions that are set.
+        * 
+        * @param ind row indicator vector of non zeros
+        * @return offsets, i.e., the indexes i with ind[i] being true
+        * @throws DMLRuntimeException if DMLRuntimeException occurs
+        */
+       protected int[] computeOffsets(boolean[] ind)
+               throws DMLRuntimeException 
+       {
+               //first pass: count the set indicators to size the output
+               int count = 0;
+               for( boolean flag : ind ) {
+                       if( flag )
+                               count++;
+               }
+               
+               //second pass: collect the positions of all set indicators
+               final int[] offsets = new int[count];
+               int next = 0;
+               for( int row=0; row<ind.length; row++ ) {
+                       if( ind[row] ) {
+                               offsets[next] = row;
+                               next++;
+                       }
+               }
+               
+               return offsets;
+       }
+
+       /**
+        * Deserializes this column group: header (rows, columns, values, zero
+        * indicator), column indexes, distinct value tuples, and finally the
+        * bitmaps as total length followed by (length, data) per value. The
+        * bitmap offsets are recreated from the individual lengths.
+        */
+       @Override
+       public void readFields(DataInput in) 
+               throws IOException 
+       {
+               //header
+               _numRows = in.readInt();
+               final int ncol = in.readInt();
+               final int nval = in.readInt();
+               _zeros = in.readBoolean();
+               
+               //column indexes
+               _colIndexes = new int[ncol];
+               for( int j=0; j<ncol; j++ )
+                       _colIndexes[j] = in.readInt();
+               
+               //distinct value tuples
+               final int numEntries = nval*ncol;
+               _values = new double[numEntries];
+               for( int j=0; j<numEntries; j++ )
+                       _values[j] = in.readDouble();
+               
+               //bitmaps (offsets derived from the running position)
+               final int totalLen = in.readInt();
+               _ptr = new int[nval+1];
+               _data = new char[totalLen];
+               int pos = 0;
+               for( int k=0; k<nval; k++ ) {
+                       final int len = in.readInt();
+                       final int end = pos + len;
+                       _ptr[k] = pos;
+                       for( int p=pos; p<end; p++ )
+                               _data[p] = in.readChar();
+                       pos = end;
+               }
+               _ptr[nval] = totalLen;
+       }
+       
+       /**
+        * Serializes this column group: header (rows, columns, values, zero
+        * indicator), column indexes, distinct value tuples, and finally the
+        * bitmaps as total length followed by (length, data) per value.
+        */
+       @Override
+       public void write(DataOutput out) 
+               throws IOException 
+       {
+               final int ncol = getNumCols();
+               final int nval = getNumValues();
+               
+               //header
+               out.writeInt(_numRows);
+               out.writeInt(ncol);
+               out.writeInt(nval);
+               out.writeBoolean(_zeros);
+               
+               //column indexes
+               for( int colIndex : _colIndexes )
+                       out.writeInt(colIndex);
+               
+               //distinct value tuples
+               for( double value : _values )
+                       out.writeDouble(value);
+               
+               //bitmaps (lens and data, offset later recreated)
+               int totalLen = 0;
+               for( int k=0; k<nval; k++ )
+                       totalLen += _ptr[k+1] - _ptr[k];
+               out.writeInt(totalLen);
+               for( int k=0; k<nval; k++ ) {
+                       final int begin = _ptr[k];
+                       final int end = _ptr[k+1];
+                       out.writeInt(end - begin);
+                       for( int p=begin; p<end; p++ )
+                               out.writeChar(_data[p]);
+               }
+       }
+
+       /**
+        * Computes the exact number of bytes produced by {@link #write}.
+        * 
+        * @return serialized size in bytes
+        */
+       @Override
+       public long getExactSizeOnDisk() {
+               //header: three ints (rows, columns, values) and one boolean
+               long ret = 13;
+               //col indices (long arithmetic to avoid int overflow for large arrays)
+               ret += 4L * _colIndexes.length;
+               //distinct values (groups of values)
+               ret += 8L * _values.length;
+               //actual bitmaps
+               ret += 4; //total length
+               final int numVals = getNumValues();
+               for( int i=0; i<numVals; i++ )
+                       ret += 4 + 2L * len(i); //length field plus 2 bytes per char
+               
+               return ret;
+       }
+       
+
+       
+       /**
+        * Dispatches a unary aggregate to the matching kernel of this column
+        * group: sum and sum of squares as well as min and max, each as full,
+        * row-wise, or column-wise aggregate. Other operators are ignored.
+        */
+       @Override
+       public void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock result, int rl, int ru) 
+               throws DMLRuntimeException 
+       {
+               final boolean sum = op.aggOp.increOp.fn instanceof KahanPlus;
+               final boolean sumSq = op.aggOp.increOp.fn instanceof KahanPlusSq;
+               
+               //sum and sumsq (reduceall/reducerow over tuples and counts)
+               if( sum || sumSq ) {
+                       final KahanFunction kplus = sum ?
+                               KahanPlus.getKahanPlusFnObject() : KahanPlusSq.getKahanPlusSqFnObject();
+                       
+                       if( op.indexFn instanceof ReduceAll )
+                               computeSum(result, kplus);
+                       else if( op.indexFn instanceof ReduceCol )
+                               computeRowSums(result, kplus, rl, ru);
+                       else if( op.indexFn instanceof ReduceRow )
+                               computeColSums(result, kplus);
+                       return;
+               }
+               
+               //min and max (reduceall/reducerow over tuples only)
+               if( !(op.aggOp.increOp.fn instanceof Builtin) )
+                       return;
+               final Builtin builtin = (Builtin) op.aggOp.increOp.fn;
+               final BuiltinCode code = builtin.getBuiltinCode();
+               if( code != BuiltinCode.MAX && code != BuiltinCode.MIN )
+                       return;
+               
+               if( op.indexFn instanceof ReduceAll )
+                       computeMxx(result, builtin, _zeros);
+               else if( op.indexFn instanceof ReduceCol )
+                       computeRowMxx(result, builtin, rl, ru);
+               else if( op.indexFn instanceof ReduceRow )
+                       computeColMxx(result, builtin, _zeros);
+       }
+       
+       //encoding-specific aggregation kernels invoked by unaryAggregateOperations
+       
+       //full aggregate (ReduceAll) for sum and sum of squares
+       protected abstract void computeSum(MatrixBlock result, KahanFunction 
kplus);
+       
+       //row aggregate (ReduceCol) for sum and sum of squares, with the row bounds rl and ru
+       protected abstract void computeRowSums(MatrixBlock result, 
KahanFunction kplus, int rl, int ru);
+       
+       //column aggregate (ReduceRow) for sum and sum of squares
+       protected abstract void computeColSums(MatrixBlock result, 
KahanFunction kplus);
+       
+       //row aggregate (ReduceCol) for min and max, with the row bounds rl and ru
+       protected abstract void computeRowMxx(MatrixBlock result, Builtin 
builtin, int rl, int ru);
+       
+}

Reply via email to