Repository: systemml
Updated Branches:
  refs/heads/master 622d36c4a -> b00a66536


[SYSTEMML-2164] Improved serialization of compressed matrix blocks

Compressed matrix blocks have the option to share common dictionaries of
distinct values per column group across groups in DDC1 encoding format
(dense dictionary coding, 1-byte aligned). This feature is very useful
for small block sizes (e.g., default 1K) and common value domains, as is
often the case for image data. However, so far each group redundantly
serialized/deserialized the common dictionary, followed by recovery
of the in-memory shared dictionary.

This patch improves this code path to serialize (and thus deserialize)
the common dictionary just once, which reduces - for large datasets that
exceed aggregate cluster memory - the size on disk and related I/O
costs, as well as deserialization and GC overheads.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/b00a6653
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/b00a6653
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/b00a6653

Branch: refs/heads/master
Commit: b00a6653612132c8793cb00bb8e05a9388fcaff5
Parents: 622d36c
Author: Matthias Boehm <[email protected]>
Authored: Thu Mar 1 22:41:47 2018 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Thu Mar 1 22:41:47 2018 -0800

----------------------------------------------------------------------
 .../apache/sysml/runtime/compress/ColGroup.java | 23 ++++++++++++++++-
 .../sysml/runtime/compress/ColGroupDDC1.java    | 26 +++++++++++++++-----
 .../runtime/compress/CompressedMatrixBlock.java | 11 ++++++---
 3 files changed, 50 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/b00a6653/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java
index 5097aec..1938d17 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java
@@ -190,6 +190,17 @@ public abstract class ColGroup implements Serializable
                throws IOException;
        
        /**
+        * Serializes column group to data output.
+        * 
+        * @param out data output
+        * @param skipDict skip shared dictionary
+        * @throws IOException if IOException occurs
+        */
+       public void write(DataOutput out, boolean skipDict) throws IOException {
+               write(out); //skipDict ignored by default
+       }
+       
+       /**
         * Deserializes column group from data input.
         * 
         * @param in data input
@@ -197,7 +208,17 @@ public abstract class ColGroup implements Serializable
         */
        public abstract void readFields(DataInput in) 
                throws IOException;
-               
+       
+       /**
+        * Deserializes column group from data input.
+        * 
+        * @param in data input
+        * @param skipDict skip shared dictionary
+        * @throws IOException if IOException occurs
+        */
+       public void readFields(DataInput in, boolean skipDict) throws IOException {
+               readFields(in); //skipDict ignored by default
+       }
        
        /**
         * Returns the exact serialized size of column group.

http://git-wip-us.apache.org/repos/asf/systemml/blob/b00a6653/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java 
b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java
index 565aa8f..1861bba 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java
@@ -118,6 +118,11 @@ public class ColGroupDDC1 extends ColGroupDDC
        
        @Override
        public void write(DataOutput out) throws IOException {
+               write(out, false);
+       }
+       
+       @Override
+       public void write(DataOutput out, boolean skipDict) throws IOException {
                int numCols = getNumCols();
                int numVals = getNumValues();
                out.writeInt(_numRows);
@@ -129,9 +134,11 @@ public class ColGroupDDC1 extends ColGroupDDC
                        out.writeInt( _colIndexes[i] );
                
                //write distinct values
-               for( int i=0; i<_values.length; i++ )
-                       out.writeDouble(_values[i]);
-
+               if( !skipDict ) {
+                       for( int i=0; i<_values.length; i++ )
+                               out.writeDouble(_values[i]);
+               }
+               
                //write data
                for( int i=0; i<_numRows; i++ )
                        out.writeByte(_data[i]);
@@ -139,6 +146,11 @@ public class ColGroupDDC1 extends ColGroupDDC
 
        @Override
        public void readFields(DataInput in) throws IOException {
+               readFields(in, false);
+       }
+       
+       @Override
+       public void readFields(DataInput in, boolean skipDict) throws IOException {
                _numRows = in.readInt();
                int numCols = in.readInt();
                int numVals = in.readInt();
@@ -149,9 +161,11 @@ public class ColGroupDDC1 extends ColGroupDDC
                        _colIndexes[i] = in.readInt();
                
                //read distinct values
-               _values = new double[numVals*numCols];
-               for( int i=0; i<numVals*numCols; i++ )
-                       _values[i] = in.readDouble();
+               if( !skipDict || numCols!=1 ) {
+                       _values = new double[numVals*numCols];
+                       for( int i=0; i<numVals*numCols; i++ )
+                               _values[i] = in.readDouble();
+               }
                
                //read data
                _data = new byte[_numRows];

http://git-wip-us.apache.org/repos/asf/systemml/blob/b00a6653/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
index 29c95e4..782bbd2 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
@@ -840,8 +840,9 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
                                        grp = new ColGroupDDC2(); break;
                        }
                        
-                       //deserialize and add column group
-                       grp.readFields(in);
+                       //deserialize and add column group (flag for shared dictionary passed
+                       //and numCols evaluated in DDC1 because numCols not available yet
+                       grp.readFields(in, sharedDict!=null);
                        
                        //use shared DDC1 dictionary if applicable
                        if( _sharedDDC1Dict && grp.getNumCols()==1
@@ -875,9 +876,13 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
                out.writeBoolean(_sharedDDC1Dict);
                out.writeInt(_colGroups.size());
                
+               boolean skipDict = false;
                for( ColGroup grp : _colGroups ) {
+                       boolean shared = (grp instanceof ColGroupDDC1
+                               && _sharedDDC1Dict && grp.getNumCols()==1);
                        out.writeByte( grp.getCompType().ordinal() );
-                       grp.write(out); //delegate serialization
+                       grp.write(out, skipDict & shared); //delegate serialization
+                       skipDict |= shared;
                }
        }
        

Reply via email to