Repository: incubator-systemml
Updated Branches:
  refs/heads/master 61a6dcb08 -> 4bc6601d6


[SYSTEMML-925] Performance shuffle-based frame converters/operations

This patch includes various performance improvements for wide-matrix-to-
frame casting, some of which generalize to all shuffle-based
converters/operations. In detail, it includes:

(1) Improved mergeByKey primitive: As for matrices, we now use
combineByKey for in-place block merges. This also includes a clean
separation of the frame aggregation utilities.

(2) Schema handling in matrix-frame conversion: The multi-block
matrix-frame conversion previously created a string schema, which was
very memory-inefficient and led to huge shuffle volumes and spilling. We
now create frame blocks with a proper double schema.

(3) Multi-block matrix-frame conversion: The existing matrix-frame
conversion created unnecessarily large temporary objects; we now
reduce the size of intermediates (however, the number of copied cells
is unchanged, leaving further potential for improvement).

(4) Frame block merge: The previous frame block merge always took a slow
path through string conversions. If the column schemas match, we now use
a new copy-non-zeros primitive that operates directly on the native arrays.

Overall, these improvements reduced the runtime for a 500k x 10k
scenario from over 20 min to 200 s.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/4bc6601d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/4bc6601d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/4bc6601d

Branch: refs/heads/master
Commit: 4bc6601d69d08d01a895d06cbb7fd8cd4e6c6cbf
Parents: 61a6dcb
Author: Matthias Boehm <mbo...@us.ibm.com>
Authored: Wed Sep 21 22:08:34 2016 -0700
Committer: Matthias Boehm <mbo...@us.ibm.com>
Committed: Wed Sep 21 22:24:12 2016 -0700

----------------------------------------------------------------------
 .../spark/FrameAppendRSPInstruction.java        |   6 +-
 .../spark/FrameIndexingSPInstruction.java       |   6 +-
 .../spark/utils/FrameRDDAggregateUtils.java     |  91 ++++++++++++++++
 .../spark/utils/FrameRDDConverterUtils.java     | 107 ++++++-------------
 .../spark/utils/RDDAggregateUtils.java          |  40 -------
 .../sysml/runtime/matrix/data/FrameBlock.java   |  43 +++++++-
 6 files changed, 170 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/4bc6601d/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
index ad6ef58..58167b4 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
@@ -30,7 +30,7 @@ import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
-import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
+import 
org.apache.sysml.runtime.instructions.spark.utils.FrameRDDAggregateUtils;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.operators.Operator;
 
@@ -53,9 +53,9 @@ public class FrameAppendRSPInstruction extends 
AppendRSPInstruction
                
                if(_cbind) {
                        JavaPairRDD<Long,FrameBlock> in1Aligned = 
in1.mapToPair(new ReduceSideAppendAlignFunction(leftRows));                   
-                       in1Aligned = (JavaPairRDD<Long, FrameBlock>) 
RDDAggregateUtils.mergeByFrameKey(in1Aligned);                     
+                       in1Aligned = 
FrameRDDAggregateUtils.mergeByKey(in1Aligned);                     
                        JavaPairRDD<Long,FrameBlock> in2Aligned = 
in2.mapToPair(new ReduceSideAppendAlignFunction(leftRows));
-                       in2Aligned = (JavaPairRDD<Long, FrameBlock>) 
RDDAggregateUtils.mergeByFrameKey(in2Aligned);                     
+                       in2Aligned = 
FrameRDDAggregateUtils.mergeByKey(in2Aligned);                     
                        
                        out = in1Aligned.join(in2Aligned).mapValues(new 
ReduceSideColumnsFunction(_cbind));
                } else {        //rbind

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/4bc6601d/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
index b4556cf..98b8f70 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
@@ -36,7 +36,7 @@ import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator;
 import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
 import 
org.apache.sysml.runtime.instructions.spark.functions.IsFrameBlockInRange;
-import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
+import 
org.apache.sysml.runtime.instructions.spark.utils.FrameRDDAggregateUtils;
 import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
@@ -111,7 +111,7 @@ public class FrameIndexingSPInstruction  extends 
IndexingSPInstruction
                                
                                //aggregation if required 
                                if( _aggType != SparkAggType.NONE )
-                                       out = 
RDDAggregateUtils.mergeByFrameKey(out);
+                                       out = 
FrameRDDAggregateUtils.mergeByKey(out);
                        }
                        
                        //put output RDD handle into symbol table
@@ -164,7 +164,7 @@ public class FrameIndexingSPInstruction  extends 
IndexingSPInstruction
                                in2 = 
sec.getFrameBinaryBlockRDDHandleForVariable( input2.getName() )
                                            .flatMapToPair(new 
SliceRHSForLeftIndexing(ixrange, mcLeft));
 
-                               out = (JavaPairRDD<Long, FrameBlock>) 
RDDAggregateUtils.mergeByFrameKey(in1.union(in2));
+                               out = 
FrameRDDAggregateUtils.mergeByKey(in1.union(in2));
                        }
                        
                        sec.setRDDHandleForVariable(output.getName(), out);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/4bc6601d/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDAggregateUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDAggregateUtils.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDAggregateUtils.java
new file mode 100644
index 0000000..e57e599
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDAggregateUtils.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.instructions.spark.utils;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+
+
+public class FrameRDDAggregateUtils 
+{
+       /**
+        * @param: in
+        * @return: 
+        */
+       public static JavaPairRDD<Long, FrameBlock> mergeByKey( 
JavaPairRDD<Long, FrameBlock> in )
+       {
+               //use combine by key to avoid unnecessary deep block copies, 
i.e.
+               //create combiner block once and merge remaining blocks 
in-place.
+               return in.combineByKey( 
+                               new CreateBlockCombinerFunction(), 
+                           new MergeBlocksFunction(false), 
+                           new MergeBlocksFunction(false) );
+       }
+       
+       /**
+        * 
+        */
+       private static class CreateBlockCombinerFunction implements 
Function<FrameBlock, FrameBlock> 
+       {
+               private static final long serialVersionUID = 
-4445167244905540494L;
+
+               @Override
+               public FrameBlock call(FrameBlock arg0) 
+                       throws Exception 
+               {
+                       //create deep copy of given block
+                       return new FrameBlock(arg0);
+               }       
+       }
+       
+       /**
+        * 
+        */
+       private static class MergeBlocksFunction implements 
Function2<FrameBlock, FrameBlock, FrameBlock> 
+       {               
+               private static final long serialVersionUID = 
7807210434431147007L;
+               
+               private boolean _deep = false;
+               
+               public MergeBlocksFunction(boolean deep) {
+                       _deep = deep;
+               }
+
+               @Override
+               public FrameBlock call(FrameBlock b1, FrameBlock b2) 
+                       throws Exception 
+               {
+                       // sanity check input dimensions
+                       if (b1.getNumRows() != b2.getNumRows() || 
b1.getNumColumns() != b2.getNumColumns()) {
+                               throw new DMLRuntimeException("Mismatched frame 
block sizes for: "
+                                               + b1.getNumRows() + " " + 
b1.getNumColumns() + " "
+                                               + b2.getNumRows() + " " + 
b2.getNumColumns());
+                       }
+
+                       // execute merge (never pass by reference)
+                       FrameBlock ret = _deep ? new FrameBlock(b1) : b1;
+                       ret.merge(b2);
+                       return ret;
+               }
+       }       
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/4bc6601d/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index 351d559..2bcfbc4 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -171,10 +171,8 @@ public class FrameRDDConverterUtils
                }
                
                //convert binary block to csv (from blocks/rows)
-               JavaRDD<String> out = input
-                               .flatMap(new BinaryBlockToCSVFunction(props));
-       
-               return out;
+               return input.flatMap(
+                               new BinaryBlockToCSVFunction(props));
        }
        
        
@@ -226,10 +224,7 @@ public class FrameRDDConverterUtils
                                .mapPartitionsToPair(new 
TextToBinaryBlockFunction( mc, schema ));
                
                //aggregate partial matrix blocks
-               JavaPairRDD<Long,FrameBlock> out = 
-                               (JavaPairRDD<Long, FrameBlock>) 
RDDAggregateUtils.mergeByFrameKey( output ); 
-
-               return out;
+               return FrameRDDAggregateUtils.mergeByKey( output ); 
        }
 
        /**
@@ -287,7 +282,7 @@ public class FrameRDDConverterUtils
                        out = input.flatMapToPair(new 
MatrixToBinaryBlockFunction(mcIn));
                        
                        //aggregate partial frame blocks
-                       out = (JavaPairRDD<Long, FrameBlock>) 
RDDAggregateUtils.mergeByFrameKey( out );
+                       out = FrameRDDAggregateUtils.mergeByKey( out );
                }
                else {
                        //convert single matrix binary block to frame binary 
block (w/o shuffle)
@@ -315,9 +310,7 @@ public class FrameRDDConverterUtils
                                .flatMapToPair(new 
BinaryBlockToMatrixBlockFunction(mcIn, mcOut));
        
                //aggregate partial matrix blocks
-               out = RDDAggregateUtils.mergeByKey( out );      
-               
-               return out;
+               return RDDAggregateUtils.mergeByKey( out );     
        }
        
        //=====================================
@@ -357,10 +350,8 @@ public class FrameRDDConverterUtils
                convertDFSchemaToFrameSchema(df.schema(), colnames, fschema, 
containsID);
                                
                //convert rdd to binary block rdd
-               JavaPairRDD<Long, FrameBlock> out = 
prepinput.mapPartitionsToPair(
+               return prepinput.mapPartitionsToPair(
                                new DataFrameToBinaryBlockFunction(mc, 
colnames, fschema, containsID));
-               
-               return out;
        }
 
        /**
@@ -1011,85 +1002,57 @@ public class FrameRDDConverterUtils
                        throws Exception 
                {
                        ArrayList<Tuple2<Long,FrameBlock>> ret = new 
ArrayList<Tuple2<Long,FrameBlock>>();
-
-                       MatrixIndexes matrixIndexes = arg0._1();
-                       MatrixBlock matrixBlock = arg0._2();
-                       
-                       //Frame Index (Row id, with base 1)
-                       Long rowix = new 
Long((matrixIndexes.getRowIndex()-1)*_brlen+1);
-
-                       //Global index within frame blocks
-                       long colixLow = 
(int)((matrixIndexes.getColumnIndex()-1)*_bclen+1);
-                       long colixHigh = 
Math.min(colixLow+matrixBlock.getNumColumns()-1, _clen);
+                       MatrixIndexes ix = arg0._1();
+                       MatrixBlock mb = arg0._2();
+                       MatrixBlock mbreuse = new MatrixBlock();
                        
-                       //Index within a local matrix block
-                       int iColLowMat = 
UtilFunctions.computeCellInBlock(colixLow, _bclen);
-                       int iColHighMat = 
UtilFunctions.computeCellInBlock(colixHigh, _bclen);
+                       //frame index (row id, 1-based)
+                       Long rowix = new Long((ix.getRowIndex()-1)*_brlen+1);
 
-                       FrameBlock tmpBlock = 
DataConverter.convertToFrameBlock(matrixBlock);
+                       //global index within frame block (0-based)
+                       long cl = (int)((ix.getColumnIndex()-1)*_bclen);
+                       long cu = Math.min(cl+mb.getNumColumns()-1, _clen);
 
-                       int iRowLow = 0;        //Index within a local frame 
block
-                       while(iRowLow < matrixBlock.getNumRows()) {
-                               int iRowHigh = 
Math.min(iRowLow+_maxRowsPerBlock-1,  matrixBlock.getNumRows()-1);
-                               
-                               FrameBlock tmpBlock2 = null;
-                               //All rows from matrix block can fit into 
single frame block, no need for slicing 
-                               if(iRowLow == 0 && iRowHigh == 
matrixBlock.getNumRows()-1)
-                                       tmpBlock2 = tmpBlock;
-                               else
-                                       tmpBlock2 = 
tmpBlock.sliceOperations(iRowLow, iRowHigh, iColLowMat, iColHighMat, tmpBlock2);
-                               
-                               //If Matrix has only one column block, then 
simply assigns converted block to frame block
-                               if(colixLow == 0 && colixHigh == 
matrixBlock.getNumColumns()-1)
-                                       ret.add(new Tuple2<Long, 
FrameBlock>(rowix+iRowLow, tmpBlock2));
-                               else
-                               {
-                                       FrameBlock frameBlock = new 
FrameBlock((int)_clen, ValueType.STRING);
-                                       
frameBlock.ensureAllocatedColumns(iRowHigh-iRowLow+1);
-                                       
-                                       frameBlock.copy(0, iRowHigh-iRowLow, 
(int)colixLow-1, (int)colixHigh-1, tmpBlock2);
-                                       ret.add(new Tuple2<Long, 
FrameBlock>(rowix+iRowLow, frameBlock));
-                               }
-                               iRowLow = iRowHigh+1;
+                       //prepare output frame blocks 
+                       for( int i=0; i<mb.getNumRows(); i+=_maxRowsPerBlock ) {
+                               int ru = Math.min(i+_maxRowsPerBlock, 
mb.getNumRows())-1;
+                               FrameBlock fb = new FrameBlock((int)_clen, 
ValueType.DOUBLE);
+                               fb.ensureAllocatedColumns(ru-i+1);
+                               fb.copy(0, fb.getNumRows()-1, (int)cl, (int)cu, 
DataConverter.convertToFrameBlock(
+                                       mb.sliceOperations(i, ru, 0, 
mb.getNumColumns()-1, mbreuse)));
+                               ret.add(new Tuple2<Long, 
FrameBlock>(rowix+i,fb));                              
                        }
+
                        return ret;
                }
        }
 
-       /*
-        * This function supports if matrix has only one column block.
+       /**
+        * 
         */
        private static class MatrixToBinaryBlockOneColumnBlockFunction 
implements PairFunction<Tuple2<MatrixIndexes,MatrixBlock>,Long,FrameBlock>
        {
                private static final long serialVersionUID = 
3716019666116660815L;
 
                private int _brlen = -1;
-               private int _bclen = -1;
-               private long _clen = -1;
-       
-               
-               public 
MatrixToBinaryBlockOneColumnBlockFunction(MatrixCharacteristics mc)
+                       
+               public 
MatrixToBinaryBlockOneColumnBlockFunction(MatrixCharacteristics mc) 
+                       throws DMLRuntimeException
                {
+                       //sanity check function constraints
+                       if(mc.getCols() > mc.getColsPerBlock())
+                               throw new DMLRuntimeException("This function 
supports only matrices with a single column block.");
+
                        _brlen = mc.getRowsPerBlock();
-                       _bclen = mc.getColsPerBlock();
-                       _clen = mc.getCols();
                }
 
                @Override
                public Tuple2<Long, FrameBlock> 
call(Tuple2<MatrixIndexes,MatrixBlock> arg0) 
                        throws Exception 
                {
-                       if(_clen > _bclen)
-                               throw new DMLRuntimeException("The input matrix 
has more than one column block, this function supports only one column block.");
-
-                       MatrixIndexes matrixIndexes = arg0._1();
-                       MatrixBlock matrixBlock = arg0._2();
-                       
-                       //Frame Index (Row id, with base 1)
-                       Long rowix = new 
Long((matrixIndexes.getRowIndex()-1)*_brlen+1);
-
-                       FrameBlock frameBlock = 
DataConverter.convertToFrameBlock(matrixBlock);
-                       return new Tuple2<Long, FrameBlock>(rowix, frameBlock);
+                       return new Tuple2<Long, FrameBlock>(
+                               (arg0._1().getRowIndex()-1)*_brlen+1, 
+                               DataConverter.convertToFrameBlock(arg0._2()) );
                }
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/4bc6601d/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
index 93bb1d0..dcdbd36 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
@@ -28,7 +28,6 @@ import org.apache.sysml.runtime.functionobjects.KahanPlus;
 import org.apache.sysml.runtime.instructions.cp.KahanObject;
 import org.apache.sysml.runtime.instructions.spark.data.CorrMatrixBlock;
 import org.apache.sysml.runtime.instructions.spark.data.RowMatrixBlock;
-import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
@@ -730,43 +729,4 @@ public class RDDAggregateUtils
                        return v1 + v2;
                }       
        }
-       
-       /**
-        * @param: in
-        * @return: 
-        */
-       public static JavaPairRDD<Long, FrameBlock> mergeByFrameKey( 
JavaPairRDD<Long, FrameBlock> in )
-       {
-               return in.reduceByKey(
-                               new MergeFrameBlocksFunction());
-       }
-       
-       /**
-        * 
-        */
-       private static class MergeFrameBlocksFunction implements 
Function2<FrameBlock, FrameBlock, FrameBlock> 
-       {               
-               private static final long serialVersionUID = 
-8881019027250258850L;
-
-               @Override
-               public FrameBlock call(FrameBlock b1, FrameBlock b2) 
-                       throws Exception 
-               {
-                       // sanity check input dimensions
-                       if (b1.getNumRows() != b2.getNumRows() || 
b1.getNumColumns() != b2.getNumColumns()) {
-                               throw new DMLRuntimeException("Mismatched frame 
block sizes for: "
-                                               + b1.getNumRows() + " " + 
b1.getNumColumns() + " "
-                                               + b2.getNumRows() + " " + 
b2.getNumColumns());
-                       }
-
-                       // execute merge (never pass by reference)
-                       FrameBlock ret = new FrameBlock(b1);
-                       ret.merge(b2);
-                       
-                       return ret;
-               }
-
-       }
-       
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/4bc6601d/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
index cbd231e..005b254 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
@@ -960,11 +960,19 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
                        }
                
                //core frame block merge through cell copy
-               for( int i=0; i<that.getNumRows(); i++ ) {
-                       for( int j=0; j<getNumColumns(); j++ ) {
-                               Object obj = 
UtilFunctions.objectToObject(getSchema().get(j), that.get(i,j), true);
-                               if (obj != null)                        // Do 
not update with "null" data
-                                       set(i, j,obj);
+               //with column-wide access pattern
+               for( int j=0; j<getNumColumns(); j++ ) {
+                       //special case: copy non-zeros of column 
+                       if( _schema.get(j).equals(that._schema.get(j)) )
+                               _coldata.get(j).setNz(0, _numRows-1, 
that._coldata.get(j));
+                       //general case w/ schema transformation
+                       else {
+                               for( int i=0; i<_numRows; i++ ) {
+                                       Object obj = 
UtilFunctions.objectToObject(
+                                                       getSchema().get(j), 
that.get(i,j), true);
+                                       if (obj != null) //merge non-zeros
+                                               set(i, j,obj);
+                               }
                        }
                }
        }
@@ -1122,6 +1130,7 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
                public abstract void set(int index, T value);
                public abstract void set(int rl, int ru, Array value);
                public abstract void set(int rl, int ru, Array value, int 
rlSrc);
+               public abstract void setNz(int rl, int ru, Array value);
                public abstract void append(String value);
                public abstract void append(T value);
                public abstract Array clone();
@@ -1150,6 +1159,12 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
                public void set(int rl, int ru, Array value, int rlSrc) {
                        System.arraycopy(((StringArray)value)._data, rlSrc, 
_data, rl, ru-rl+1);
                }
+               public void setNz(int rl, int ru, Array value) {
+                       String[] data2 = ((StringArray)value)._data;
+                       for( int i=rl; i<ru+1; i++ )
+                               if( data2[i]!=null )
+                                       _data[i] = data2[i];
+               }
                public void append(String value) {
                        if( _data.length <= _size )
                                _data = Arrays.copyOf(_data, newSize());
@@ -1196,6 +1211,12 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
                public void set(int rl, int ru, Array value, int rlSrc) {
                        System.arraycopy(((BooleanArray)value)._data, rlSrc, 
_data, rl, ru-rl+1);
                }
+               public void setNz(int rl, int ru, Array value) {
+                       boolean[] data2 = ((BooleanArray)value)._data;
+                       for( int i=rl; i<ru+1; i++ )
+                               if( data2[i] )
+                                       _data[i] = data2[i];
+               }
                public void append(String value) {
                        append(Boolean.parseBoolean(value));
                }
@@ -1243,6 +1264,12 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
                public void set(int rl, int ru, Array value, int rlSrc) {
                        System.arraycopy(((LongArray)value)._data, rlSrc, 
_data, rl, ru-rl+1);
                }
+               public void setNz(int rl, int ru, Array value) {
+                       long[] data2 = ((LongArray)value)._data;
+                       for( int i=rl; i<ru+1; i++ )
+                               if( data2[i]!=0 )
+                                       _data[i] = data2[i];
+               }
                public void append(String value) {
                        append((value!=null)?Long.parseLong(value):null);
                }
@@ -1290,6 +1317,12 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
                public void set(int rl, int ru, Array value, int rlSrc) {
                        System.arraycopy(((DoubleArray)value)._data, rlSrc, 
_data, rl, ru-rl+1);
                }
+               public void setNz(int rl, int ru, Array value) {
+                       double[] data2 = ((DoubleArray)value)._data;
+                       for( int i=rl; i<ru+1; i++ )
+                               if( data2[i]!=0 )
+                                       _data[i] = data2[i];
+               }
                public void append(String value) {
                        append((value!=null)?Double.parseDouble(value):null);
                }

Reply via email to