This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit cd7deec2b5d9532000780183462a0ef7eaeecf21
Author: baunsgaard <[email protected]>
AuthorDate: Mon Oct 31 20:09:46 2022 +0100

    [SYSTEMDS-3461] FrameBlocks Arrays Separation
    
    This commit removes the tight coupling between FrameBlock and the
    underlying typed column arrays (one array class per column value type).
    One binding remains, namely piping frames to Python; it will be
    refactored in a following commit.
    
    This commit also fixes the indentation (tabs vs. spaces) of one of the
    frame test files and moves the functional frame tests that were placed
    in the component tests to the functions/frame tests.
---
 src/main/java/org/apache/sysds/common/Types.java   |   1 +
 .../sysds/runtime/frame/data/FrameBlock.java       | 749 +++------------------
 .../apache/sysds/runtime/frame/data/FrameUtil.java |  34 +
 .../sysds/runtime/frame/data/columns/Array.java    |  92 +++
 .../runtime/frame/data/columns/ArrayFactory.java   | 102 +++
 .../runtime/frame/data/columns/BooleanArray.java   | 127 ++++
 .../runtime/frame/data/columns/ColumnMetadata.java |  60 ++
 .../runtime/frame/data/columns/DoubleArray.java    | 127 ++++
 .../runtime/frame/data/columns/FloatArray.java     | 126 ++++
 .../runtime/frame/data/columns/IntegerArray.java   | 126 ++++
 .../runtime/frame/data/columns/LongArray.java      | 127 ++++
 .../runtime/frame/data/columns/StringArray.java    | 118 ++++
 ...ltiReturnParameterizedBuiltinSPInstruction.java |  11 +-
 .../test/component/frame/FrameCastingTest.java     |  32 +-
 .../test/component/frame/FrameIndexingTest.java    |  19 +-
 .../component/frame/FrameSerializationTest.java    |   4 +-
 .../test/functions/frame/DetectSchemaTest.java     |   2 +
 .../test/functions/frame/FrameAppendDistTest.java  |   2 +-
 .../test/functions/frame/FrameColumnNamesTest.java |   1 +
 .../test/functions/frame/FrameConstructorTest.java |   2 +-
 .../test/functions/frame/FrameConverterTest.java   |   2 +-
 .../frame/FrameDropInvalidLengthTest.java          |   1 +
 .../functions/frame/FrameDropInvalidTypeTest.java  |   1 +
 .../test/functions/frame/FrameFunctionTest.java    |   2 +-
 .../functions/frame/FrameIndexingDistTest.java     |   2 +-
 .../functions/frame/FrameMatrixCastingTest.java    |   2 +-
 .../functions/frame/FrameMatrixReblockTest.java    |   2 +-
 .../test/functions/frame/FrameMatrixWriteTest.java |  16 +-
 .../functions/frame/FrameMetaReadWriteTest.java    |   2 +-
 .../frame/FrameReadWriteTest.java                  |   4 +-
 .../frame/FrameRemoveEmptyTest.java                |   4 +-
 .../test/functions/frame/FrameReplaceTest.java     | 102 +--
 .../frame/FrameScalarCastingIntegratedTest.java    |   2 +-
 .../functions/frame/FrameScalarCastingTest.java    |   2 +-
 .../test/functions/frame/FrameSchemaReadTest.java  |   2 +-
 .../test/functions/frame/FrameValueSwapTest.java   |   6 +-
 .../frame/ParforFrameIntermediateTest.java         |   2 +-
 .../sysds/test/functions/frame/TypeOfTest.java     |   1 +
 38 files changed, 1261 insertions(+), 756 deletions(-)

diff --git a/src/main/java/org/apache/sysds/common/Types.java 
b/src/main/java/org/apache/sysds/common/Types.java
index 4f613a40d7..991284c13e 100644
--- a/src/main/java/org/apache/sysds/common/Types.java
+++ b/src/main/java/org/apache/sysds/common/Types.java
@@ -77,6 +77,7 @@ public class Types
        public enum ValueType {
                UINT8, // Used for parsing in UINT values from numpy.
                FP32, FP64, INT32, INT64, BOOLEAN, STRING, UNKNOWN;
+               
                public boolean isNumeric() {
                        return this == UINT8 || this == INT32 || this == INT64 
|| this == FP32 || this == FP64;
                }
diff --git a/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java
index 536c1f8bee..57e38f4939 100644
--- a/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java
@@ -28,8 +28,6 @@ import java.io.ObjectOutput;
 import java.io.Serializable;
 import java.lang.ref.SoftReference;
 import java.lang.reflect.InvocationTargetException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -53,13 +51,16 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.Writable;
 import org.apache.sysds.api.DMLException;
 import org.apache.sysds.common.Types.ValueType;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.codegen.CodegenUtils;
 import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
 import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
+import org.apache.sysds.runtime.frame.data.columns.Array;
+import org.apache.sysds.runtime.frame.data.columns.ArrayFactory;
+import org.apache.sysds.runtime.frame.data.columns.ColumnMetadata;
+import org.apache.sysds.runtime.frame.data.columns.StringArray;
 import org.apache.sysds.runtime.functionobjects.ValueComparisonFunction;
 import org.apache.sysds.runtime.instructions.cp.BooleanObject;
 import org.apache.sysds.runtime.instructions.cp.DoubleObject;
@@ -85,9 +86,10 @@ public class FrameBlock implements CacheBlock, 
Externalizable {
        private static final Log LOG = 
LogFactory.getLog(FrameBlock.class.getName());
        private static final IDSequence CLASS_ID = new IDSequence();
 
-       public static final int BUFFER_SIZE = 1 * 1000 * 1000; //1M elements, 
size of default matrix block
+       /** Buffer size variable: 1M elements, size of default matrix block */
+       public static final int BUFFER_SIZE = 1 * 1000 * 1000;
 
-       //internal configuration
+       /** internal configuration */
        private static final boolean REUSE_RECODE_MAPS = true;
 
        /** The number of rows of the FrameBlock */
@@ -328,47 +330,46 @@ public class FrameBlock implements CacheBlock, 
Externalizable {
         */
        public void ensureAllocatedColumns(int numRows) {
                _msize = -1;
-               //early abort if already allocated
+               
+               // allocate column meta data if necessary
+               if( _colmeta == null || _schema.length != _colmeta.length ) {
+                       _colmeta = new ColumnMetadata[_schema.length];
+                       for( int j=0; j<_schema.length; j++ )
+                               _colmeta[j] = new ColumnMetadata(0);
+               }
+
+               // early abort if already allocated
                if( _coldata != null && _schema.length == _coldata.length ) {
                        //handle special case that to few rows allocated
                        if( _numRows < numRows ) {
                                String[] tmp = new String[getNumColumns()];
                                int len = numRows - _numRows;
+                               // TODO: Add append N function.
                                for(int i=0; i<len; i++)
                                        appendRow(tmp);
                        }
                        return;
                }
-               //allocate column meta data if necessary
-               if( _colmeta == null || _schema.length != _colmeta.length ) {
-                       _colmeta = new ColumnMetadata[_schema.length];
-                       for( int j=0; j<_schema.length; j++ )
-                               _colmeta[j] = new ColumnMetadata(0);
-               }
+
                //allocate columns if necessary
                _coldata = new Array[_schema.length];
-               for( int j=0; j<_schema.length; j++ ) {
-                       switch( _schema[j] ) {
-                               case STRING:  _coldata[j] = new StringArray(new 
String[numRows]); break;
-                               case BOOLEAN: _coldata[j] = new 
BooleanArray(new boolean[numRows]); break;
-                               case INT32:   _coldata[j] = new 
IntegerArray(new int[numRows]); break;
-                               case INT64:   _coldata[j] = new LongArray(new 
long[numRows]); break;
-                               case FP32:   _coldata[j] = new FloatArray(new 
float[numRows]); break;
-                               case FP64:   _coldata[j] = new DoubleArray(new 
double[numRows]); break;
-                               default: throw new 
RuntimeException("Unsupported value type: "+_schema[j]);
-                       }
-               }
+               for( int j=0; j<_schema.length; j++ )
+                       _coldata[j] = ArrayFactory.allocate(_schema[j], 
numRows);
+
                _numRows = numRows;
        }
 
        /**
         * Checks for matching column sizes in case of existing columns.
+        * 
+        * If the check parses the number of rows is reassigned to the given 
newLen
         *
-        * @param newlen number of rows to compare with existing number of rows
+        * @param newLen number of rows to compare with existing number of rows
         */
-       public void ensureColumnCompatibility(int newlen) {
-               if( _coldata!=null && _coldata.length > 0 && _numRows != newlen 
)
-                       throw new RuntimeException("Mismatch in number of rows: 
"+newlen+" (expected: "+_numRows+")");
+       public void ensureColumnCompatibility(int newLen) {
+               if( _coldata!=null && _coldata.length > 0 && _numRows != newLen 
)
+                       throw new RuntimeException("Mismatch in number of rows: 
"+newLen+" (expected: "+_numRows+")");
+               _numRows = newLen;
        }
 
        public static String[] createColNames(int size) {
@@ -386,6 +387,10 @@ public class FrameBlock implements CacheBlock, 
Externalizable {
                return "C" + i;
        }
 
+       private String createNextColName(){
+               return _schema != null ? createColName(_schema.length) : 
createColName(0);
+       }
+
        public boolean isColNamesDefault() {
                boolean ret = (_colnames != null);
                for( int j=0; j<getNumColumns() && ret; j++ )
@@ -491,13 +496,8 @@ public class FrameBlock implements CacheBlock, 
Externalizable {
         */
        public void appendColumn(String[] col) {
                ensureColumnCompatibility(col.length);
-               String[] colnames = getColumnNames(); //before schema 
modification
-               _schema = (ValueType[]) ArrayUtils.add(_schema, 
ValueType.STRING);
-               _colnames = (String[]) ArrayUtils.add(colnames, 
createColName(_schema.length));
-               _coldata = (_coldata==null) ? new Array[]{new StringArray(col)} 
:
-                       (Array[]) ArrayUtils.add(_coldata, new 
StringArray(col));
-               _numRows = col.length;
-               _msize = -1;
+               appendColumnMetaData(ValueType.STRING);
+               _coldata = FrameUtil.add(_coldata, ArrayFactory.create(col));
        }
 
        /**
@@ -509,13 +509,8 @@ public class FrameBlock implements CacheBlock, 
Externalizable {
         */
        public void appendColumn(boolean[] col) {
                ensureColumnCompatibility(col.length);
-               String[] colnames = getColumnNames(); //before schema 
modification
-               _schema = (ValueType[]) ArrayUtils.add(_schema, 
ValueType.BOOLEAN);
-               _colnames = (String[]) ArrayUtils.add(colnames, 
createColName(_schema.length));
-               _coldata = (_coldata==null) ? new Array[]{new 
BooleanArray(col)} :
-                       (Array[]) ArrayUtils.add(_coldata, new 
BooleanArray(col));
-               _numRows = col.length;
-               _msize = -1;
+               appendColumnMetaData(ValueType.BOOLEAN);
+               _coldata = FrameUtil.add(_coldata, ArrayFactory.create(col));
        }
 
        /**
@@ -527,14 +522,10 @@ public class FrameBlock implements CacheBlock, 
Externalizable {
         */
        public void appendColumn(int[] col) {
                ensureColumnCompatibility(col.length);
-               String[] colnames = getColumnNames(); //before schema 
modification
-               _schema = (ValueType[]) ArrayUtils.add(_schema, 
ValueType.INT32);
-               _colnames = (String[]) ArrayUtils.add(colnames, 
createColName(_schema.length));
-               _coldata = (_coldata==null) ? new Array[]{new 
IntegerArray(col)} :
-                       (Array[]) ArrayUtils.add(_coldata, new 
IntegerArray(col));
-               _numRows = col.length;
-               _msize = -1;
+               appendColumnMetaData(ValueType.INT32);
+               _coldata = FrameUtil.add(_coldata,  ArrayFactory.create(col));
        }
+
        /**
         * Append a column of value type LONG as the last column of
         * the data frame. The given array is wrapped but not copied
@@ -544,13 +535,8 @@ public class FrameBlock implements CacheBlock, 
Externalizable {
         */
        public void appendColumn(long[] col) {
                ensureColumnCompatibility(col.length);
-               String[] colnames = getColumnNames(); //before schema 
modification
-               _schema = (ValueType[]) ArrayUtils.add(_schema, 
ValueType.INT64);
-               _colnames = (String[]) ArrayUtils.add(colnames, 
createColName(_schema.length));
-               _coldata = (_coldata==null) ? new Array[]{new LongArray(col)} :
-                       (Array[]) ArrayUtils.add(_coldata, new LongArray(col));
-               _numRows = col.length;
-               _msize = -1;
+               appendColumnMetaData(ValueType.INT64);
+               _coldata = FrameUtil.add(_coldata, ArrayFactory.create(col));
        }
 
        /**
@@ -562,14 +548,10 @@ public class FrameBlock implements CacheBlock, 
Externalizable {
         */
        public void appendColumn(float[] col) {
                ensureColumnCompatibility(col.length);
-               String[] colnames = getColumnNames(); //before schema 
modification
-               _schema = (ValueType[]) ArrayUtils.add(_schema, ValueType.FP32);
-               _colnames = (String[]) ArrayUtils.add(colnames, 
createColName(_schema.length));
-               _coldata = (_coldata==null) ? new Array[]{new FloatArray(col)} :
-                               (Array[]) ArrayUtils.add(_coldata, new 
FloatArray(col));
-               _numRows = col.length;
-               _msize = -1;
+               appendColumnMetaData(ValueType.FP32);
+               _coldata = FrameUtil.add(_coldata,  ArrayFactory.create(col));
        }
+
        /**
         * Append a column of value type DOUBLE as the last column of
         * the data frame. The given array is wrapped but not copied
@@ -579,12 +561,29 @@ public class FrameBlock implements CacheBlock, 
Externalizable {
         */
        public void appendColumn(double[] col) {
                ensureColumnCompatibility(col.length);
-               String[] colnames = getColumnNames(); //before schema 
modification
-               _schema = (ValueType[]) ArrayUtils.add(_schema, ValueType.FP64);
-               _colnames = (String[]) ArrayUtils.add(colnames, 
createColName(_schema.length));
-               _coldata = (_coldata==null) ? new Array[]{new DoubleArray(col)} 
:
-                       (Array[]) ArrayUtils.add(_coldata, new 
DoubleArray(col));
-               _numRows = col.length;
+               appendColumnMetaData(ValueType.FP64);
+               _coldata = FrameUtil.add(_coldata, ArrayFactory.create(col));
+       }
+
+       /**
+        * Append the metadata associated with adding a column.
+        * 
+        * @param vt The Value type
+        */
+       private void appendColumnMetaData(ValueType vt){
+               appendColumnMetaData(vt, createNextColName());
+       }
+
+       /**
+        * Append the metadata associated with adding a column.
+        * 
+        * @param vt The Value type
+        * @param colName The columnName
+        */
+       private void appendColumnMetaData(ValueType vt, String colName){
+               _schema = (ValueType[]) ArrayUtils.add(_schema, vt);
+               _colnames = (String[]) ArrayUtils.add(getColumnNames(), 
colName);
+               // Since we append a column we reset the _msize
                _msize = -1;
        }
 
@@ -601,7 +600,7 @@ public class FrameBlock implements CacheBlock, 
Externalizable {
                ValueType[] tmpSchema = UtilFunctions.nCopies(ncol, 
ValueType.FP64);
                Array[] tmpData = new Array[ncol];
                for( int j=0; j<ncol; j++ )
-                       tmpData[j] = new DoubleArray(cols[j]);
+                       tmpData[j] = ArrayFactory.create(cols[j]);
                _colnames = empty ? null : (String[]) 
ArrayUtils.addAll(getColumnNames(),
                        createColNames(getNumColumns(), ncol)); //before schema 
modification
                _schema = empty ? tmpSchema : (ValueType[]) 
ArrayUtils.addAll(_schema, tmpSchema);
@@ -610,41 +609,19 @@ public class FrameBlock implements CacheBlock, 
Externalizable {
                _msize = -1;
        }
 
-       public void appendColumn(ValueType vt, Array col) {
-               switch (vt) {
-                       case STRING:
-                               appendColumn(((StringArray) col).get());
-                               break;
-                       case BOOLEAN:
-                               appendColumn(((BooleanArray) col).get());
-                               break;
-                       case INT32:
-                               appendColumn(((IntegerArray) col).get());
-                               break;
-                       case INT64:
-                               appendColumn(((LongArray) col).get());
-                               break;
-                       case FP32:
-                               appendColumn(((FloatArray) col).get());
-                               break;
-                       case FP64:
-                               appendColumn(((DoubleArray) col).get());
-                               break;
-                       default:
-                               throw new RuntimeException("Unsupported value 
type: " + vt);
-               }
+       /**
+        * Add a column of already allocated Array type.
+        * 
+        * @param col column to add.
+        */
+       public void appendColumn(Array col) {
+               ensureColumnCompatibility(col.size());
+               appendColumnMetaData(col.getValueType());
+               _coldata = FrameUtil.add(_coldata, col);
        }
 
        public Object getColumnData(int c) {
-               switch(_schema[c]) {
-                       case STRING:  return ((StringArray)_coldata[c])._data;
-                       case BOOLEAN: return ((BooleanArray)_coldata[c])._data;
-                       case INT64:   return ((LongArray)_coldata[c])._data;
-                       case INT32:   return ((IntegerArray)_coldata[c])._data;
-                       case FP64:    return ((DoubleArray)_coldata[c])._data;
-                       case FP32:    return ((FloatArray)_coldata[c])._data;
-                       default:      return null;
-               }
+               return _coldata[c].get();
        }
 
        public String getColumnType(int c){
@@ -664,8 +641,6 @@ public class FrameBlock implements CacheBlock, 
Externalizable {
         * It should only be used in columns where the datatype is String.
         * Since in other cases it might be faster to return other types.
         *
-        * Note that P
-        *
         * @param c The column index.
         * @param r The row index.
         * @return The returned byte array.
@@ -673,7 +648,7 @@ public class FrameBlock implements CacheBlock, 
Externalizable {
        public byte[] getIndexAsBytes(int c, int r){
                switch(_schema[c]){
                        case STRING:
-                               String[] col = ((StringArray)_coldata[c])._data;
+                               String[] col = ((StringArray)_coldata[c]).get();
                                if(col[r] != null)
                                        return col[r].getBytes();
                                else
@@ -683,48 +658,16 @@ public class FrameBlock implements CacheBlock, 
Externalizable {
                }
        }
 
-       public byte[] getColumnAsBytes(int c){
-               final int nRow = getNumRows();
-               switch(_schema[c]){
-                       case INT64:
-                               long[] colLong = ((LongArray)_coldata[c])._data;
-                               ByteBuffer longBuffer = ByteBuffer.allocate(8 * 
nRow);
-                               longBuffer.order(ByteOrder.LITTLE_ENDIAN);
-                               for(int i = 0; i <  nRow; i++)
-                                       longBuffer.putLong(colLong[i]);
-                               return longBuffer.array();
-                       case INT32:
-                               int[] colInt = 
((IntegerArray)_coldata[c])._data;
-                               ByteBuffer intBuffer = ByteBuffer.allocate(4 *  
nRow);
-                               intBuffer.order(ByteOrder.LITTLE_ENDIAN);
-                               for(int i = 0; i < nRow; i++)
-                                       intBuffer.putInt(colInt[i]);
-                               return intBuffer.array();
-                       case FP64:
-                               double[] colDouble = 
((DoubleArray)_coldata[c])._data;
-                               ByteBuffer doubleBuffer = ByteBuffer.allocate(8 
* nRow);
-                               doubleBuffer.order(ByteOrder.nativeOrder());
-                               for(int i = 0; i < nRow; i++)
-                                       doubleBuffer.putDouble(colDouble[i]);
-                               return doubleBuffer.array();
-                       case FP32:
-                               float[] colFloat = 
((FloatArray)_coldata[c])._data;
-                               ByteBuffer floatBuffer = ByteBuffer.allocate(8 
* nRow);
-                               floatBuffer.order(ByteOrder.nativeOrder());
-                               for(int i = 0; i < nRow; i++)
-                                       floatBuffer.putFloat(colFloat[i]);
-                               return floatBuffer.array();
-                       case BOOLEAN:
-                               boolean[] colBool = 
((BooleanArray)_coldata[c])._data;
-                               // over allocating here.. we could maybe bit 
pack?
-                               ByteBuffer booleanBuffer = 
ByteBuffer.allocate(nRow);
-                               booleanBuffer.order(ByteOrder.nativeOrder());
-                               for(int i = 0; i < nRow; i++)
-                                       booleanBuffer.put((byte)(colBool[i]? 
1:0));
-                               return booleanBuffer.array();
-                       default:
-                               throw new NotImplementedException();
-               }
+       /**
+        * Serialize the columns data as byte.
+        * 
+        * This serialization is used for transferring the frame to python.
+        * 
+        * @param c The column index
+        * @return The columns data as byte array.
+        */
+       public byte[] getColumnAsBytes(int c) {
+               return _coldata[c].getAsByteArray(getNumRows());
        }
 
        public Array getColumn(int c) {
@@ -922,19 +865,7 @@ public class FrameBlock implements CacheBlock, 
Externalizable {
                        String name = isDefaultMeta ? createColName(j) : 
in.readUTF();
                        long ndistinct = isDefaultMeta ? 0 : in.readLong();
                        String mvvalue = isDefaultMeta ? null : in.readUTF();
-                       Array arr = null;
-                       if( type > 0 ) { //non-empty column
-                               switch( vt ) {
-                                       case STRING:  arr = new StringArray(new 
String[_numRows]); break;
-                                       case BOOLEAN: arr = new 
BooleanArray(new boolean[_numRows]); break;
-                                       case INT64:     arr = new LongArray(new 
long[_numRows]); break;
-                                       case FP64:  arr = new DoubleArray(new 
double[_numRows]); break;
-                                       case INT32: arr = new IntegerArray(new 
int[_numRows]); break;
-                                       case FP32:  arr = new FloatArray(new 
float[_numRows]); break;
-                                       default: throw new 
IOException("Unsupported value type: "+vt);
-                               }
-                               arr.readFields(in);
-                       }
+                       Array arr = type > 0? ArrayFactory.read(in, vt, 
_numRows): null;
                        _schema[j] = vt;
                        _colnames[j] = name;
                        _colmeta[j] = new ColumnMetadata(ndistinct,
@@ -1424,7 +1355,7 @@ public class FrameBlock implements CacheBlock, 
Externalizable {
        public HashMap<String,Long> getRecodeMap(int col) {
                //probe cache for existing map
                if( REUSE_RECODE_MAPS ) {
-                       SoftReference<HashMap<String,Long>> tmp = 
_coldata[col]._rcdMapCache;
+                       SoftReference<HashMap<String,Long>> tmp = 
_coldata[col].getCache();
                        HashMap<String,Long> map = (tmp!=null) ? tmp.get() : 
null;
                        if( map != null ) return map;
                }
@@ -1442,7 +1373,7 @@ public class FrameBlock implements CacheBlock, 
Externalizable {
 
                //put created map into cache
                if( REUSE_RECODE_MAPS )
-                       _coldata[col]._rcdMapCache = new SoftReference<>(map);
+                       _coldata[col].setCache(new SoftReference<>(map));
 
                return map;
        }
@@ -1643,481 +1574,6 @@ public class FrameBlock implements CacheBlock, 
Externalizable {
                }
        }
 
-       ///////
-       // generic, resizable native arrays
-
-       /**
-        * Base class for generic, resizable array of various value types. We
-        * use this custom class hierarchy instead of Trove or other libraries
-        * in order to avoid unnecessary dependencies.
-        */
-       private abstract static class Array<T> implements Writable {
-               protected SoftReference<HashMap<String,Long>> _rcdMapCache = 
null;
-
-               protected int _size = 0;
-               protected int newSize() {
-                       return Math.max(_size*2, 4);
-               }
-               public abstract T get(int index);
-               public abstract void set(int index, T value);
-               public abstract void set(int rl, int ru, Array value);
-               public abstract void set(int rl, int ru, Array value, int 
rlSrc);
-               public abstract void setNz(int rl, int ru, Array value);
-               public abstract void append(String value);
-               public abstract void append(T value);
-               @Override
-               public abstract Array clone();
-               public abstract Array slice(int rl, int ru);
-               public abstract void reset(int size);
-
-               @Override
-               public String toString(){
-                       return this.getClass().getSimpleName().toString() + ":" 
+ _size;
-               }
-       }
-
-       private static class StringArray extends Array<String> {
-               private String[] _data = null;
-
-               public StringArray(String[] data) {
-                       _data = data;
-                       _size = _data.length;
-               }
-               public String[] get() { return _data; }
-
-               @Override
-               public String get(int index) {
-                       return _data[index];
-               }
-
-               @Override
-               public void set(int index, String value) {
-                       _data[index] = value;
-               }
-               @Override
-               public void set(int rl, int ru, Array value) {
-                       set(rl, ru, value, 0);
-               }
-               @Override
-               public void set(int rl, int ru, Array value, int rlSrc) {
-                       System.arraycopy(((StringArray)value)._data, rlSrc, 
_data, rl, ru-rl+1);
-               }
-               @Override
-               public void setNz(int rl, int ru, Array value) {
-                       String[] data2 = ((StringArray)value)._data;
-                       for( int i=rl; i<ru+1; i++ )
-                               if( data2[i]!=null )
-                                       _data[i] = data2[i];
-               }
-               @Override
-               public void append(String value) {
-                       if( _data.length <= _size )
-                               _data = Arrays.copyOf(_data, newSize());
-                       _data[_size++] = value;
-               }
-               @Override
-               public void write(DataOutput out) throws IOException {
-                       for( int i=0; i<_size; i++ )
-                               out.writeUTF((_data[i]!=null)?_data[i]:"");
-               }
-               @Override
-               public void readFields(DataInput in) throws IOException {
-                       _size = _data.length;
-                       for( int i=0; i<_size; i++ ) {
-                               String tmp = in.readUTF();
-                               _data[i] = (!tmp.isEmpty()) ? tmp : null;
-                       }
-               }
-               @Override
-               public Array clone() {
-                       return new StringArray(Arrays.copyOf(_data, _size));
-               }
-               @Override
-               public Array slice(int rl, int ru) {
-                       return new 
StringArray(Arrays.copyOfRange(_data,rl,ru+1));
-               }
-               @Override
-               public void reset(int size) {
-                       if( _data.length < size )
-                               _data = new String[size];
-                       _size = size;
-               }
-       }
-
-       private static class BooleanArray extends Array<Boolean> {
-               private boolean[] _data = null;
-
-               public BooleanArray(boolean[] data) {
-                       _data = data;
-                       _size = _data.length;
-               }
-               public boolean[] get() { return _data; }
-
-               @Override
-               public Boolean get(int index) {
-                       return _data[index];
-               }
-
-               @Override
-               public void set(int index, Boolean value) {
-                       _data[index] = (value!=null) ? value : false;
-               }
-               @Override
-               public void set(int rl, int ru, Array value) {
-                       set(rl, ru, value, 0);
-               }
-               @Override
-               public void set(int rl, int ru, Array value, int rlSrc) {
-                       System.arraycopy(((BooleanArray)value)._data, rlSrc, 
_data, rl, ru-rl+1);
-               }
-               @Override
-               public void setNz(int rl, int ru, Array value) {
-                       boolean[] data2 = ((BooleanArray)value)._data;
-                       for( int i=rl; i<ru+1; i++ )
-                               if( data2[i] )
-                                       _data[i] = data2[i];
-               }
-               @Override
-               public void append(String value) {
-                       append(Boolean.parseBoolean(value));
-               }
-               @Override
-               public void append(Boolean value) {
-                       if( _data.length <= _size )
-                               _data = Arrays.copyOf(_data, newSize());
-                       _data[_size++] = (value!=null) ? value : false;
-               }
-               @Override
-               public void write(DataOutput out) throws IOException {
-                       for( int i=0; i<_size; i++ )
-                               out.writeBoolean(_data[i]);
-               }
-               @Override
-               public void readFields(DataInput in) throws IOException {
-                       _size = _data.length;
-                       for( int i=0; i<_size; i++ )
-                               _data[i] = in.readBoolean();
-               }
-               @Override
-               public Array clone() {
-                       return new BooleanArray(Arrays.copyOf(_data, _size));
-               }
-               @Override
-               public Array slice(int rl, int ru) {
-                       return new 
BooleanArray(Arrays.copyOfRange(_data,rl,ru+1));
-               }
-               @Override
-               public void reset(int size) {
-                       if( _data.length < size )
-                               _data = new boolean[size];
-                       _size = size;
-               }
-       }
-
-       private static class LongArray extends Array<Long> {
-               private long[] _data = null;
-
-               public LongArray(long[] data) {
-                       _data = data;
-                       _size = _data.length;
-               }
-               public long[] get() { return _data; }
-               @Override
-               public Long get(int index) {
-                       return _data[index];
-               }
-               @Override
-               public void set(int index, Long value) {
-                       _data[index] = (value!=null) ? value : 0L;
-               }
-               @Override
-               public void set(int rl, int ru, Array value) {
-                       set(rl, ru, value, 0);
-               }
-               @Override
-               public void set(int rl, int ru, Array value, int rlSrc) {
-                       System.arraycopy(((LongArray)value)._data, rlSrc, 
_data, rl, ru-rl+1);
-               }
-               @Override
-               public void setNz(int rl, int ru, Array value) {
-                       long[] data2 = ((LongArray)value)._data;
-                       for( int i=rl; i<ru+1; i++ )
-                               if( data2[i]!=0 )
-                                       _data[i] = data2[i];
-               }
-               @Override
-               public void append(String value) {
-                       append((value!=null)?Long.parseLong(value.trim()):null);
-               }
-               @Override
-               public void append(Long value) {
-                       if( _data.length <= _size )
-                               _data = Arrays.copyOf(_data, newSize());
-                       _data[_size++] = (value!=null) ? value : 0L;
-               }
-               @Override
-               public void write(DataOutput out) throws IOException {
-                       for( int i=0; i<_size; i++ )
-                               out.writeLong(_data[i]);
-               }
-               @Override
-               public void readFields(DataInput in) throws IOException {
-                       _size = _data.length;
-                       for( int i=0; i<_size; i++ )
-                               _data[i] = in.readLong();
-               }
-               @Override
-               public Array clone() {
-                       return new LongArray(Arrays.copyOf(_data, _size));
-               }
-               @Override
-               public Array slice(int rl, int ru) {
-                       return new LongArray(Arrays.copyOfRange(_data,rl,ru+1));
-               }
-               @Override
-               public void reset(int size) {
-                       if( _data.length < size )
-                               _data = new long[size];
-                       _size = size;
-               }
-       }
-
-       private static class IntegerArray extends Array<Integer> {
-               private int[] _data = null;
-
-               public IntegerArray(int[] data) {
-                       _data = data;
-                       _size = _data.length;
-               }
-               public int[] get() { return _data; }
-
-               @Override
-               public Integer get(int index) {
-                       return _data[index];
-               }
-
-               @Override
-               public void set(int index, Integer value) { _data[index] = 
(value!=null) ? value : 0;}
-               @Override
-               public void set(int rl, int ru, Array value) {
-                       set(rl, ru, value, 0);
-               }
-               @Override
-               public void set(int rl, int ru, Array value, int rlSrc) {
-                       System.arraycopy(((IntegerArray)value)._data, rlSrc, 
_data, rl, ru-rl+1);
-               }
-               @Override
-               public void setNz(int rl, int ru, Array value) {
-                       int[] data2 = ((IntegerArray)value)._data;
-                       for( int i=rl; i<ru+1; i++ )
-                               if( data2[i]!=0 )
-                                       _data[i] = data2[i];
-               }
-               @Override
-               public void append(String value) {
-                       
append((value!=null)?Integer.parseInt(value.trim()):null);
-               }
-               @Override
-               public void append(Integer value) {
-                       if( _data.length <= _size )
-                               _data = Arrays.copyOf(_data, newSize());
-                       _data[_size++] = (value!=null) ? value : 0;
-               }
-               @Override
-               public void write(DataOutput out) throws IOException {
-                       for( int i=0; i<_size; i++ )
-                               out.writeLong(_data[i]);
-               }
-               @Override
-               public void readFields(DataInput in) throws IOException {
-                       _size = _data.length;
-                       for( int i=0; i<_size; i++ )
-                               _data[i] = in.readInt();
-               }
-               @Override
-               public Array clone() {
-                       return new IntegerArray(Arrays.copyOf(_data, _size));
-               }
-               @Override
-               public Array slice(int rl, int ru) {
-                       return new 
IntegerArray(Arrays.copyOfRange(_data,rl,ru+1));
-               }
-               @Override
-               public void reset(int size) {
-                       if( _data.length < size )
-                               _data = new int[size];
-                       _size = size;
-               }
-       }
-
-       private static class FloatArray extends Array<Float> {
-               private float[] _data = null;
-
-               public FloatArray(float[] data) {
-                       _data = data;
-                       _size = _data.length;
-               }
-               public float[] get() { return _data; }
-
-               @Override
-               public Float get(int index) {
-                       return _data[index];
-               }
-
-               @Override
-               public void set(int index, Float value) { _data[index] = 
(value!=null) ? value : 0f; }
-
-               @Override
-               public void set(int rl, int ru, Array value) {
-                       set(rl,ru, value, 0);
-               }
-               @Override
-               public void set(int rl, int ru, Array value, int rlSrc) {
-                       System.arraycopy(((FloatArray)value)._data, rlSrc, 
_data, rl, ru-rl+1);
-               }
-               @Override
-               public void setNz(int rl, int ru, Array value) {
-                       float[] data2 = ((FloatArray)value)._data;
-                       for( int i=rl; i<ru+1; i++ )
-                               if( data2[i]!=0 )
-                                       _data[i] = data2[i];
-               }
-               @Override
-               public void append(String value) {      append((value!=null)? 
Float.parseFloat(value):null); }
-
-               @Override
-               public void append(Float value) {
-                       if( _data.length <= _size )
-                               _data = Arrays.copyOf(_data, newSize());
-                       _data[_size++] = (value!=null) ? value : 0f;
-               }
-               @Override
-               public void write(DataOutput out) throws IOException {
-                       for( int i=0; i<_size; i++ )
-                               out.writeFloat(_data[i]);
-               }
-               @Override
-               public void readFields(DataInput in) throws IOException {
-                       _size = _data.length;
-                       for( int i=0; i<_size; i++ )
-                               _data[i] = in.readFloat();
-               }
-               @Override
-               public Array clone() {
-                       return new FloatArray(Arrays.copyOf(_data, _size));
-               }
-               @Override
-               public Array slice(int rl, int ru) {
-                       return new 
FloatArray(Arrays.copyOfRange(_data,rl,ru+1));
-               }
-               @Override
-               public void reset(int size) {
-                       if( _data.length < size )
-                               _data = new float[size];
-                       _size = size;
-               }
-       }
-
-       private static class DoubleArray extends Array<Double> {
-               private double[] _data = null;
-
-               public DoubleArray(double[] data) {
-                       _data = data;
-                       _size = _data.length;
-               }
-               public double[] get() { return _data; }
-               @Override
-               public Double get(int index) {
-                       return _data[index];
-               }
-               @Override
-               public void set(int index, Double value) {
-                       _data[index] = (value!=null) ? value : 0d;
-               }
-               @Override
-               public void set(int rl, int ru, Array value) {
-                       set(rl,ru, value, 0);
-               }
-               @Override
-               public void set(int rl, int ru, Array value, int rlSrc) {
-                       System.arraycopy(((DoubleArray)value)._data, rlSrc, 
_data, rl, ru-rl+1);
-               }
-               @Override
-               public void setNz(int rl, int ru, Array value) {
-                       double[] data2 = ((DoubleArray)value)._data;
-                       for( int i=rl; i<ru+1; i++ )
-                               if( data2[i]!=0 )
-                                       _data[i] = data2[i];
-               }
-               @Override
-               public void append(String value) {
-                       append((value!=null)?Double.parseDouble(value):null);
-               }
-               @Override
-               public void append(Double value) {
-                       if( _data.length <= _size )
-                               _data = Arrays.copyOf(_data, newSize());
-                       _data[_size++] = (value!=null) ? value : 0d;
-               }
-               @Override
-               public void write(DataOutput out) throws IOException {
-                       for( int i=0; i<_size; i++ )
-                               out.writeDouble(_data[i]);
-               }
-               @Override
-               public void readFields(DataInput in) throws IOException {
-                       _size = _data.length;
-                       for( int i=0; i<_size; i++ )
-                               _data[i] = in.readDouble();
-               }
-               @Override
-               public Array clone() {
-                       return new DoubleArray(Arrays.copyOf(_data, _size));
-               }
-               @Override
-               public Array slice(int rl, int ru) {
-                       return new 
DoubleArray(Arrays.copyOfRange(_data,rl,ru+1));
-               }
-               @Override
-               public void reset(int size) {
-                       if( _data.length < size )
-                               _data = new double[size];
-                       _size = size;
-               }
-       }
-
-       public static class ColumnMetadata implements Serializable {
-               private static final long serialVersionUID = 
-90094082422100311L;
-
-               private long _ndistinct = 0;
-               private String _mvValue = null;
-
-               public ColumnMetadata(long ndistinct) {
-                       _ndistinct = ndistinct;
-               }
-               public ColumnMetadata(long ndistinct, String mvval) {
-                       _ndistinct = ndistinct;
-                       _mvValue = mvval;
-               }
-               public ColumnMetadata(ColumnMetadata that) {
-                       _ndistinct = that._ndistinct;
-                       _mvValue = that._mvValue;
-               }
-
-               public long getNumDistinct() {
-                       return _ndistinct;
-               }
-               public void setNumDistinct(long ndistinct) {
-                       _ndistinct = ndistinct;
-               }
-               public String getMvValue() {
-                       return _mvValue;
-               }
-               public void setMvValue(String mvVal) {
-                       _mvValue = mvVal;
-               }
-       }
-
        private static ValueType isType(String val) {
                val = val.trim().toLowerCase().replaceAll("\"",  "");
                if (val.matches("(true|false|t|f|0|1)"))
@@ -2150,7 +1606,7 @@ public class FrameBlock implements CacheBlock, 
Externalizable {
                ExecutorService pool = CommonThreadPool.get(cols);
                ArrayList<DetectValueTypeTask> tasks = new ArrayList<>();
                for (int i = 0; i < cols; i++) {
-                       FrameBlock.Array obj = this.getColumn(i);
+                       Array obj = this.getColumn(i);
                        tasks.add(new DetectValueTypeTask(obj,rows, sample));
                }
 
@@ -2282,7 +1738,7 @@ public class FrameBlock implements CacheBlock, 
Externalizable {
                                continue;
                        int validLength = (int)feaLen.quickGetValue(0, i);
                        Array obj = this.getColumn(i);
-                       for (int j = 0; j < obj._size; j++)
+                       for (int j = 0; j < obj.size(); j++)
                        {
                                if(obj.get(j) == null)
                                        continue;
@@ -2363,6 +1819,7 @@ public class FrameBlock implements CacheBlock, 
Externalizable {
                                out.set(i, j, rowToreplicate.get(0, j));
                return out;
        }
+
        public FrameBlock valueSwap(FrameBlock schema) {
                String[] schemaString = schema.getStringRowIterator().next();
                String dataValue2 = null;
@@ -2376,7 +1833,7 @@ public class FrameBlock implements CacheBlock, 
Externalizable {
                double[] maxColLength = new double[this.getNumColumns()];
 
                for(int k = 0; k < this.getNumColumns(); k++) {
-                       String[] data = ((StringArray) this.getColumn(k))._data;
+                       String[] data = ((StringArray) this.getColumn(k)).get();
 
                        double minLength = 
Arrays.stream(data).filter(Objects::nonNull).mapToDouble(String::length).min().orElse(Double.NaN);
                        double maxLength = 
Arrays.stream(data).filter(Objects::nonNull).mapToDouble(String::length).max().orElse(Double.NaN);
@@ -2471,7 +1928,7 @@ public class FrameBlock implements CacheBlock, 
Externalizable {
                        // Execute map function on all cells
                        for(int j = 0; j < getNumColumns(); j++) {
                                Array input = getColumn(j);
-                               for(int i = 0; i < input._size; i++)
+                               for(int i = 0; i < input.size(); i++)
                                        if(input.get(i) != null)
                                                output[i][j] = 
lambdaExpr.apply(String.valueOf(input.get(i)));
                        }
@@ -2484,8 +1941,8 @@ public class FrameBlock implements CacheBlock, 
Externalizable {
                for(String[] row : output)
                        Arrays.fill(row, "0.0");
                Array input = getColumn(0);
-               for(int j = 0; j < input._size - 1; j++) {
-                       for(int i = j + 1; i < input._size; i++)
+               for(int j = 0; j < input.size() - 1; j++) {
+                       for(int i = j + 1; i < input.size(); i++)
                                if(input.get(i) != null && input.get(j) != 
null) {
                                        output[j][i] = 
lambdaExpr.apply(String.valueOf(input.get(j)), String.valueOf(input.get(i)));
                                }
@@ -2555,7 +2012,7 @@ public class FrameBlock implements CacheBlock, 
Externalizable {
 
                for(int i = 0; i < ret.getNumColumns(); i++){
                        Array colData = ret._coldata[i];
-                       for(int j = 0; j < colData._size && 
(ValueType.isSameTypeString(_schema[i], patternType) || _schema[i] == 
ValueType.STRING); j++) {
+                       for(int j = 0; j < colData.size() && 
(ValueType.isSameTypeString(_schema[i], patternType) || _schema[i] == 
ValueType.STRING); j++) {
                                T patternNew =  (T) 
UtilFunctions.stringToObject(_schema[i], pattern);
                                T replacementNew = (T) 
UtilFunctions.stringToObject(_schema[i], replacement);
 
@@ -2632,12 +2089,12 @@ public class FrameBlock implements CacheBlock, 
Externalizable {
                        for(int i = 0; i < getNumColumns(); i++) {
                                Array colData = _coldata[i];
                                ValueType type = _schema[i];
-                               boolean isEmpty = IntStream.range(0, 
colData._size)
+                               boolean isEmpty = IntStream.range(0, 
colData.size())
                                        .mapToObj((IntFunction<Object>) 
colData::get)
                                        .allMatch(e -> ArrayUtils.contains(new 
double[]{0.0, Double.NaN}, UtilFunctions.objectToDoubleSafe(type, e)));
 
                                if(!isEmpty) {
-                                       ret.appendColumn(_schema[i], 
_coldata[i]);
+                                       ret.appendColumn(_coldata[i]);
                                        columnMetadata.add(new 
ColumnMetadata(_colmeta[i]));
                                }
                        }
@@ -2648,7 +2105,7 @@ public class FrameBlock implements CacheBlock, 
Externalizable {
                        int[] indices = 
DataConverter.convertVectorToIndexList(select);
                        int k = 0;
                        for(int i : indices) {
-                               ret.appendColumn(_schema[i], _coldata[i]);
+                               ret.appendColumn(_coldata[i]);
                                columnMetadata.add(new 
ColumnMetadata(_colmeta[i]));
                                if(_colnames != null)
                                        ret._colnames[k++] = _colnames[i];
diff --git a/src/main/java/org/apache/sysds/runtime/frame/data/FrameUtil.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/FrameUtil.java
new file mode 100644
index 0000000000..f181e86b86
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/frame/data/FrameUtil.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.frame.data;
+
+import org.apache.sysds.runtime.frame.data.columns.Array;
+
+/**
+ * Static utility functions for frame data.
+ */
+@SuppressWarnings({"rawtypes"})
+public interface FrameUtil {
+
+       /**
+        * Append a column to an array of columns.
+        *
+        * The input array is never modified; a new array that is one element longer is always returned.
+        *
+        * @param ar The existing columns, may be null (handled like an empty array).
+        * @param e  The column to append, stored as given.
+        * @return A new array holding the elements of ar followed by e.
+        */
+       public static Array[] add(Array[] ar, Array e) {
+               // treating null like an empty array gives both cases the same allocation path
+               final int len = (ar == null) ? 0 : ar.length;
+               final Array[] out = new Array[len + 1];
+               if(len > 0)
+                       System.arraycopy(ar, 0, out, 0, len);
+               out[len] = e;
+               return out;
+       }
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/Array.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/Array.java
new file mode 100644
index 0000000000..1c2c6973dc
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/frame/data/columns/Array.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.frame.data.columns;
+
+import java.lang.ref.SoftReference;
+import java.util.HashMap;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.sysds.common.Types.ValueType;
+
+/**
+ * Generic, resizable native arrays.
+ *
+ * Base class for generic, resizable array of various value types. We use this custom class hierarchy instead of Trove
+ * or other libraries in order to avoid unnecessary dependencies.
+ *
+ * @param <T> The boxed element type of the values stored in this array.
+ */
+public abstract class Array<T> implements Writable {
+       /** Cached recode map of this column, softly referenced so it can be reclaimed under memory pressure. */
+       protected SoftReference<HashMap<String, Long>> _rcdMapCache = null;
+
+       /** Number of logical elements; the backing array may have a larger capacity. */
+       protected int _size = 0;
+
+       /**
+        * Capacity to grow the backing array to when appending: double the current size, but at least 4.
+        *
+        * @return The new capacity.
+        */
+       protected int newSize() {
+               return Math.max(_size * 2, 4);
+       }
+
+       /**
+        * Get the value at the given row.
+        *
+        * @param index The row index.
+        * @return The value at that row.
+        */
+       public abstract T get(int index);
+
+       /**
+        * Get the underlying array out of the column, it is the responsibility of the caller to know what type it is.
+        *
+        * @return The underlying array.
+        */
+       public abstract Object get();
+
+       /**
+        * Get the cached recode map of this column.
+        *
+        * @return The soft reference to the cached map, or null if none has been set.
+        */
+       public final SoftReference<HashMap<String, Long>> getCache() {
+               return _rcdMapCache;
+       }
+
+       /**
+        * Set the cached recode map of this column.
+        *
+        * @param m The soft reference to cache, or null to clear the cache.
+        */
+       public final void setCache(SoftReference<HashMap<String, Long>> m) {
+               _rcdMapCache = m;
+       }
+
+       /**
+        * Get the number of logical elements in this array (not the capacity of the backing array).
+        *
+        * @return The number of elements.
+        */
+       public final int size() {
+               return _size;
+       }
+
+       /**
+        * Set the value at the given row.
+        *
+        * @param index The row index.
+        * @param value The value to set.
+        */
+       public abstract void set(int index, T value);
+
+       /**
+        * Copy rows 0 to ru - rl of the given array into rows rl to ru (inclusive) of this array.
+        *
+        * @param rl    The first row to set.
+        * @param ru    The last row to set (inclusive).
+        * @param value The source array.
+        */
+       public abstract void set(int rl, int ru, Array<T> value);
+
+       /**
+        * Copy rows rlSrc to rlSrc + ru - rl of the given array into rows rl to ru (inclusive) of this array.
+        *
+        * @param rl    The first row to set.
+        * @param ru    The last row to set (inclusive).
+        * @param value The source array.
+        * @param rlSrc The first row to read from the source array.
+        */
+       public abstract void set(int rl, int ru, Array<T> value, int rlSrc);
+
+       /**
+        * Copy only the non-default values (non-null, non-zero or true) of rows rl to ru (inclusive) of the given array
+        * into the same rows of this array.
+        *
+        * @param rl    The first row to set.
+        * @param ru    The last row to set (inclusive).
+        * @param value The source array.
+        */
+       public abstract void setNz(int rl, int ru, Array<T> value);
+
+       /**
+        * Append a value given in its string representation, growing the backing array if needed.
+        *
+        * @param value The value to append, converted into this array's value type.
+        */
+       public abstract void append(String value);
+
+       /**
+        * Append a value, growing the backing array if needed.
+        *
+        * @param value The value to append.
+        */
+       public abstract void append(T value);
+
+       /**
+        * Create a copy of this array containing its first size() elements.
+        *
+        * @return The copy.
+        */
+       @Override
+       public abstract Array<T> clone();
+
+       /**
+        * Create a new array containing a copy of rows rl to ru (inclusive).
+        *
+        * @param rl The first row.
+        * @param ru The last row (inclusive).
+        * @return The sliced copy.
+        */
+       public abstract Array<T> slice(int rl, int ru);
+
+       /**
+        * Reset this array to the given size, allocating a new backing array only if the current one is too small.
+        * Existing values are not guaranteed to be cleared.
+        *
+        * @param size The new number of elements.
+        */
+       public abstract void reset(int size);
+
+       /**
+        * Get the content of this array as a byte array.
+        *
+        * @param nRow The number of rows, presumably the number of leading values to encode (NOTE(review): confirm against
+        *             the implementations).
+        * @return The encoded bytes.
+        */
+       public abstract byte[] getAsByteArray(int nRow);
+
+       /**
+        * Get the value type of the elements stored in this array.
+        *
+        * @return The value type.
+        */
+       public abstract ValueType getValueType();
+
+       @Override
+       public String toString() {
+               return this.getClass().getSimpleName() + ":" + _size;
+       }
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/ArrayFactory.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/ArrayFactory.java
new file mode 100644
index 0000000000..db2979eca9
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/ArrayFactory.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.frame.data.columns;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.runtime.DMLRuntimeException;
+
+public interface ArrayFactory {
+
+       public static StringArray create(String[] col) {
+               return new StringArray(col);
+       }
+
+       public static BooleanArray create(boolean[] col) {
+               return new BooleanArray(col);
+       }
+
+       public static IntegerArray create(int[] col) {
+               return new IntegerArray(col);
+       }
+
+       public static LongArray create(long[] col) {
+               return new LongArray(col);
+       }
+
+       public static FloatArray create(float[] col) {
+               return new FloatArray(col);
+       }
+
+       public static DoubleArray create(double[] col) {
+               return new DoubleArray(col);
+       }
+
+       @SuppressWarnings({"rawtypes"})
+       public static Array allocate(ValueType v, int nRow) {
+               switch(v) {
+                       case STRING:
+                               return new StringArray(new String[nRow]);
+                       case BOOLEAN:
+                               return new BooleanArray(new boolean[nRow]);
+                       case INT32:
+                               return new IntegerArray(new int[nRow]);
+                       case INT64:
+                               return new LongArray(new long[nRow]);
+                       case FP32:
+                               return new FloatArray(new float[nRow]);
+                       case FP64:
+                               return new DoubleArray(new double[nRow]);
+                       default:
+                               throw new DMLRuntimeException("Unsupported 
value type: " + v);
+               }
+       }
+
+       @SuppressWarnings({"rawtypes"})
+       public static Array read(DataInput in, ValueType v, int nRow) throws 
IOException {
+               Array arr;
+               switch(v) {
+                       case STRING:
+                               arr = new StringArray(new String[nRow]);
+                               break;
+                       case BOOLEAN:
+                               arr = new BooleanArray(new boolean[nRow]);
+                               break;
+                       case INT64:
+                               arr = new LongArray(new long[nRow]);
+                               break;
+                       case FP64:
+                               arr = new DoubleArray(new double[nRow]);
+                               break;
+                       case INT32:
+                               arr = new IntegerArray(new int[nRow]);
+                               break;
+                       case FP32:
+                               arr = new FloatArray(new float[nRow]);
+                               break;
+                       default:
+                               throw new IOException("Unsupported value type: 
" + v);
+               }
+               arr.readFields(in);
+               return arr;
+       }
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/BooleanArray.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/BooleanArray.java
new file mode 100644
index 0000000000..0af22b3d01
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/BooleanArray.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.frame.data.columns;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+
+import org.apache.sysds.common.Types.ValueType;
+
+public class BooleanArray extends Array<Boolean> {
+       private boolean[] _data = null;
+
+       public BooleanArray(boolean[] data) {
+               _data = data;
+               _size = _data.length;
+       }
+
+       public boolean[] get() {
+               return _data;
+       }
+
+       @Override
+       public Boolean get(int index) {
+               return _data[index];
+       }
+
+       @Override
+       public void set(int index, Boolean value) {
+               _data[index] = (value != null) ? value : false;
+       }
+
+       @Override
+       public void set(int rl, int ru, Array<Boolean> value) {
+               set(rl, ru, value, 0);
+       }
+
+       @Override
+       public void set(int rl, int ru, Array<Boolean> value, int rlSrc) {
+               System.arraycopy(((BooleanArray) value)._data, rlSrc, _data, 
rl, ru - rl + 1);
+       }
+
+       @Override
+       public void setNz(int rl, int ru, Array<Boolean> value) {
+               boolean[] data2 = ((BooleanArray) value)._data;
+               for(int i = rl; i < ru + 1; i++)
+                       if(data2[i])
+                               _data[i] = data2[i];
+       }
+
+       @Override
+       public void append(String value) {
+               append(Boolean.parseBoolean(value));
+       }
+
+       @Override
+       public void append(Boolean value) {
+               if(_data.length <= _size)
+                       _data = Arrays.copyOf(_data, newSize());
+               _data[_size++] = (value != null) ? value : false;
+       }
+
+       @Override
+       public void write(DataOutput out) throws IOException {
+               for(int i = 0; i < _size; i++)
+                       out.writeBoolean(_data[i]);
+       }
+
+       @Override
+       public void readFields(DataInput in) throws IOException {
+               _size = _data.length;
+               for(int i = 0; i < _size; i++)
+                       _data[i] = in.readBoolean();
+       }
+
+       @Override
+       public Array<Boolean> clone() {
+               return new BooleanArray(Arrays.copyOf(_data, _size));
+       }
+
+       @Override
+       public Array<Boolean> slice(int rl, int ru) {
+               return new BooleanArray(Arrays.copyOfRange(_data, rl, ru + 1));
+       }
+
+       @Override
+       public void reset(int size) {
+               if(_data.length < size)
+                       _data = new boolean[size];
+               _size = size;
+       }
+
+       @Override
+       public byte[] getAsByteArray(int nRow) {
+               // over allocating here.. we could maybe bit pack?
+               ByteBuffer booleanBuffer = ByteBuffer.allocate(nRow);
+               booleanBuffer.order(ByteOrder.nativeOrder());
+               for(int i = 0; i < nRow; i++)
+                       booleanBuffer.put((byte) (_data[i] ? 1 : 0));
+               return booleanBuffer.array();
+       }
+
+       @Override 
+       public ValueType getValueType(){
+               return ValueType.BOOLEAN;
+       }
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/ColumnMetadata.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/ColumnMetadata.java
new file mode 100644
index 0000000000..c86fcd9a7b
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/ColumnMetadata.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.frame.data.columns;
+
+import java.io.Serializable;
+
+public class ColumnMetadata implements Serializable {
+       private static final long serialVersionUID = -90094082422100311L;
+
+       private long _ndistinct = 0;
+       private String _mvValue = null;
+
+       public ColumnMetadata(long ndistinct) {
+                       _ndistinct = ndistinct;
+               }
+
+       public ColumnMetadata(long ndistinct, String mvval) {
+                       _ndistinct = ndistinct;
+                       _mvValue = mvval;
+               }
+
+       public ColumnMetadata(ColumnMetadata that) {
+                       _ndistinct = that._ndistinct;
+                       _mvValue = that._mvValue;
+               }
+
+       public long getNumDistinct() {
+               return _ndistinct;
+       }
+
+       public void setNumDistinct(long ndistinct) {
+               _ndistinct = ndistinct;
+       }
+
+       public String getMvValue() {
+               return _mvValue;
+       }
+
+       public void setMvValue(String mvVal) {
+               _mvValue = mvVal;
+       }
+
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/DoubleArray.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/DoubleArray.java
new file mode 100644
index 0000000000..61244c218b
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/frame/data/columns/DoubleArray.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.frame.data.columns;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+
+import org.apache.sysds.common.Types.ValueType;
+
+
+public class DoubleArray extends Array<Double> {
+       private double[] _data = null;
+
+       public DoubleArray(double[] data) {
+               _data = data;
+               _size = _data.length;
+       }
+
+       public double[] get() {
+               return _data;
+       }
+
+       @Override
+       public Double get(int index) {
+               return _data[index];
+       }
+
+       @Override
+       public void set(int index, Double value) {
+               _data[index] = (value != null) ? value : 0d;
+       }
+
+       @Override
+       public void set(int rl, int ru, Array<Double> value) {
+               set(rl, ru, value, 0);
+       }
+
+       @Override
+       public void set(int rl, int ru, Array<Double> value, int rlSrc) {
+               System.arraycopy(((DoubleArray) value)._data, rlSrc, _data, rl, 
ru - rl + 1);
+       }
+
+       @Override
+       public void setNz(int rl, int ru, Array<Double> value) {
+               double[] data2 = ((DoubleArray) value)._data;
+               for(int i = rl; i < ru + 1; i++)
+                       if(data2[i] != 0)
+                               _data[i] = data2[i];
+       }
+
+       @Override
+       public void append(String value) {
+               append((value != null) ? Double.parseDouble(value) : null);
+       }
+
+       @Override
+       public void append(Double value) {
+               if(_data.length <= _size)
+                       _data = Arrays.copyOf(_data, newSize());
+               _data[_size++] = (value != null) ? value : 0d;
+       }
+
+       @Override
+       public void write(DataOutput out) throws IOException {
+               for(int i = 0; i < _size; i++)
+                       out.writeDouble(_data[i]);
+       }
+
+       @Override
+       public void readFields(DataInput in) throws IOException {
+               _size = _data.length;
+               for(int i = 0; i < _size; i++)
+                       _data[i] = in.readDouble();
+       }
+
+       @Override
+       public Array<Double> clone() {
+               return new DoubleArray(Arrays.copyOf(_data, _size));
+       }
+
+       @Override
+       public Array<Double> slice(int rl, int ru) {
+               return new DoubleArray(Arrays.copyOfRange(_data, rl, ru + 1));
+       }
+
+       @Override
+       public void reset(int size) {
+               if(_data.length < size)
+                       _data = new double[size];
+               _size = size;
+       }
+
+       @Override
+       public byte[] getAsByteArray(int nRow) {
+               ByteBuffer doubleBuffer = ByteBuffer.allocate(8 * nRow);
+               doubleBuffer.order(ByteOrder.nativeOrder());
+               for(int i = 0; i < nRow; i++)
+                       doubleBuffer.putDouble(_data[i]);
+               return doubleBuffer.array();
+       }
+
+       @Override
+       public ValueType getValueType() {
+               return ValueType.FP64;
+       }
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/FloatArray.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/FloatArray.java
new file mode 100644
index 0000000000..b0f75009ff
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/frame/data/columns/FloatArray.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.frame.data.columns;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+
+import org.apache.sysds.common.Types.ValueType;
+
+public class FloatArray extends Array<Float> {
+       private float[] _data = null;
+
+       public FloatArray(float[] data) {
+               _data = data;
+               _size = _data.length;
+       }
+
+       public float[] get() {
+               return _data;
+       }
+
+       @Override
+       public Float get(int index) {
+               return _data[index];
+       }
+
+       @Override
+       public void set(int index, Float value) {
+               _data[index] = (value != null) ? value : 0f;
+       }
+
+       @Override
+       public void set(int rl, int ru, Array<Float> value) {
+               set(rl, ru, value, 0);
+       }
+
+       @Override
+       public void set(int rl, int ru, Array<Float> value, int rlSrc) {
+               System.arraycopy(((FloatArray) value)._data, rlSrc, _data, rl, 
ru - rl + 1);
+       }
+
+       @Override
+       public void setNz(int rl, int ru, Array<Float> value) {
+               float[] data2 = ((FloatArray) value)._data;
+               for(int i = rl; i < ru + 1; i++)
+                       if(data2[i] != 0)
+                               _data[i] = data2[i];
+       }
+
+       @Override
+       public void append(String value) {
+               append((value != null) ? Float.parseFloat(value) : null);
+       }
+
+       @Override
+       public void append(Float value) {
+               if(_data.length <= _size)
+                       _data = Arrays.copyOf(_data, newSize());
+               _data[_size++] = (value != null) ? value : 0f;
+       }
+
+       @Override
+       public void write(DataOutput out) throws IOException {
+               for(int i = 0; i < _size; i++)
+                       out.writeFloat(_data[i]);
+       }
+
+       @Override
+       public void readFields(DataInput in) throws IOException {
+               _size = _data.length;
+               for(int i = 0; i < _size; i++)
+                       _data[i] = in.readFloat();
+       }
+
+       @Override
+       public Array<Float> clone() {
+               return new FloatArray(Arrays.copyOf(_data, _size));
+       }
+
+       @Override
+       public Array<Float> slice(int rl, int ru) {
+               return new FloatArray(Arrays.copyOfRange(_data, rl, ru + 1));
+       }
+
+       @Override
+       public void reset(int size) {
+               if(_data.length < size)
+                       _data = new float[size];
+               _size = size;
+       }
+
+       @Override
+       public byte[] getAsByteArray(int nRow) {
+               ByteBuffer floatBuffer = ByteBuffer.allocate(8 * nRow);
+               floatBuffer.order(ByteOrder.nativeOrder());
+               for(int i = 0; i < nRow; i++)
+                       floatBuffer.putFloat(_data[i]);
+               return floatBuffer.array();
+       }
+
+       @Override
+       public ValueType getValueType() {
+               return ValueType.FP32;
+       }
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/IntegerArray.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/IntegerArray.java
new file mode 100644
index 0000000000..046ca5c57a
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/IntegerArray.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.frame.data.columns;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+
+import org.apache.sysds.common.Types.ValueType;
+
+public class IntegerArray extends Array<Integer> {
+       private int[] _data = null;
+
+       public IntegerArray(int[] data) {
+               _data = data;
+               _size = _data.length;
+       }
+
+       public int[] get() {
+               return _data;
+       }
+
+       @Override
+       public Integer get(int index) {
+               return _data[index];
+       }
+
+       @Override
+       public void set(int index, Integer value) {
+               _data[index] = (value != null) ? value : 0;
+       }
+
+       @Override
+       public void set(int rl, int ru, Array<Integer> value) {
+               set(rl, ru, value, 0);
+       }
+
+       @Override
+       public void set(int rl, int ru, Array<Integer> value, int rlSrc) {
+               System.arraycopy(((IntegerArray) value)._data, rlSrc, _data, 
rl, ru - rl + 1);
+       }
+
+       @Override
+       public void setNz(int rl, int ru, Array<Integer> value) {
+               int[] data2 = ((IntegerArray) value)._data;
+               for(int i = rl; i < ru + 1; i++)
+                       if(data2[i] != 0)
+                               _data[i] = data2[i];
+       }
+
+       @Override
+       public void append(String value) {
+               append((value != null) ? Integer.parseInt(value.trim()) : null);
+       }
+
+       @Override
+       public void append(Integer value) {
+               if(_data.length <= _size)
+                       _data = Arrays.copyOf(_data, newSize());
+               _data[_size++] = (value != null) ? value : 0;
+       }
+
+       @Override
+       public void write(DataOutput out) throws IOException {
+               for(int i = 0; i < _size; i++)
+                       out.writeLong(_data[i]);
+       }
+
+       @Override
+       public void readFields(DataInput in) throws IOException {
+               _size = _data.length;
+               for(int i = 0; i < _size; i++)
+                       _data[i] = in.readInt();
+       }
+
+       @Override
+       public Array<Integer> clone() {
+               return new IntegerArray(Arrays.copyOf(_data, _size));
+       }
+
+       @Override
+       public Array<Integer> slice(int rl, int ru) {
+               return new IntegerArray(Arrays.copyOfRange(_data, rl, ru + 1));
+       }
+
+       @Override
+       public void reset(int size) {
+               if(_data.length < size)
+                       _data = new int[size];
+               _size = size;
+       }
+
+       @Override
+       public byte[] getAsByteArray(int nRow) {
+               ByteBuffer intBuffer = ByteBuffer.allocate(4 * nRow);
+               intBuffer.order(ByteOrder.LITTLE_ENDIAN);
+               for(int i = 0; i < nRow; i++)
+                       intBuffer.putInt(_data[i]);
+               return intBuffer.array();
+       }
+
+       @Override 
+       public ValueType getValueType(){
+               return ValueType.INT32;
+       }
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/LongArray.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/LongArray.java
new file mode 100644
index 0000000000..ea644b931e
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/frame/data/columns/LongArray.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.frame.data.columns;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+
+import org.apache.sysds.common.Types.ValueType;
+
+public class LongArray extends Array<Long> {
+       private long[] _data = null;
+
+       public LongArray(long[] data) {
+               _data = data;
+               _size = _data.length;
+       }
+
+       public long[] get() {
+               return _data;
+       }
+
+       @Override
+       public Long get(int index) {
+               return _data[index];
+       }
+
+       @Override
+       public void set(int index, Long value) {
+               _data[index] = (value != null) ? value : 0L;
+       }
+
+       @Override
+       public void set(int rl, int ru, Array<Long> value) {
+               set(rl, ru, value, 0);
+       }
+
+       @Override
+       public void set(int rl, int ru, Array<Long> value, int rlSrc) {
+               System.arraycopy(((LongArray) value)._data, rlSrc, _data, rl, 
ru - rl + 1);
+       }
+
+       @Override
+       public void setNz(int rl, int ru, Array<Long> value) {
+               long[] data2 = ((LongArray) value)._data;
+               for(int i = rl; i < ru + 1; i++)
+                       if(data2[i] != 0)
+                               _data[i] = data2[i];
+       }
+
+       @Override
+       public void append(String value) {
+               append((value != null) ? Long.parseLong(value.trim()) : null);
+       }
+
+       @Override
+       public void append(Long value) {
+               if(_data.length <= _size)
+                       _data = Arrays.copyOf(_data, newSize());
+               _data[_size++] = (value != null) ? value : 0L;
+       }
+
+       @Override
+       public void write(DataOutput out) throws IOException {
+               for(int i = 0; i < _size; i++)
+                       out.writeLong(_data[i]);
+       }
+
+       @Override
+       public void readFields(DataInput in) throws IOException {
+               _size = _data.length;
+               for(int i = 0; i < _size; i++)
+                       _data[i] = in.readLong();
+       }
+
+       @Override
+       public Array<Long> clone() {
+               return new LongArray(Arrays.copyOf(_data, _size));
+       }
+
+       @Override
+       public Array<Long> slice(int rl, int ru) {
+               return new LongArray(Arrays.copyOfRange(_data, rl, ru + 1));
+       }
+
+       @Override
+       public void reset(int size) {
+               if(_data.length < size)
+                       _data = new long[size];
+               _size = size;
+       }
+
+       @Override
+       public byte[] getAsByteArray(int nRow) {
+               ByteBuffer longBuffer = ByteBuffer.allocate(8 * nRow);
+               longBuffer.order(ByteOrder.LITTLE_ENDIAN);
+               for(int i = 0; i < nRow; i++)
+                       longBuffer.putLong(_data[i]);
+               return longBuffer.array();
+       }
+
+       @Override 
+       public ValueType getValueType(){
+               return ValueType.INT64;
+       }
+
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/StringArray.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/StringArray.java
new file mode 100644
index 0000000000..6418fe94e3
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/frame/data/columns/StringArray.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.frame.data.columns;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.sysds.common.Types.ValueType;
+
+public class StringArray extends Array<String> {
+       private String[] _data = null;
+
+       public StringArray(String[] data) {
+               _data = data;
+               _size = _data.length;
+       }
+
+       public String[] get() {
+               return _data;
+       }
+
+       @Override
+       public String get(int index) {
+               return _data[index];
+       }
+
+       @Override
+       public void set(int index, String value) {
+               _data[index] = value;
+       }
+
+       @Override
+       public void set(int rl, int ru, Array<String> value) {
+               set(rl, ru, value, 0);
+       }
+
+       @Override
+       public void set(int rl, int ru, Array<String> value, int rlSrc) {
+               System.arraycopy(((StringArray) value)._data, rlSrc, _data, rl, 
ru - rl + 1);
+       }
+
+       @Override
+       public void setNz(int rl, int ru, Array<String> value) {
+               String[] data2 = ((StringArray) value)._data;
+               for(int i = rl; i < ru + 1; i++)
+                       if(data2[i] != null)
+                               _data[i] = data2[i];
+       }
+
+       @Override
+       public void append(String value) {
+               if(_data.length <= _size)
+                       _data = Arrays.copyOf(_data, newSize());
+               _data[_size++] = value;
+       }
+
+       @Override
+       public void write(DataOutput out) throws IOException {
+               for(int i = 0; i < _size; i++)
+                       out.writeUTF((_data[i] != null) ? _data[i] : "");
+       }
+
+       @Override
+       public void readFields(DataInput in) throws IOException {
+               _size = _data.length;
+               for(int i = 0; i < _size; i++) {
+                       String tmp = in.readUTF();
+                       _data[i] = (!tmp.isEmpty()) ? tmp : null;
+               }
+       }
+
+       @Override
+       public Array<String> clone() {
+               return new StringArray(Arrays.copyOf(_data, _size));
+       }
+
+       @Override
+       public Array<String> slice(int rl, int ru) {
+               return new StringArray(Arrays.copyOfRange(_data, rl, ru + 1));
+       }
+
+       @Override
+       public void reset(int size) {
+               if(_data.length < size)
+                       _data = new String[size];
+               _size = size;
+       }
+
+       @Override
+       public byte[] getAsByteArray(int nRow) {
+               throw new NotImplementedException("Not Implemented getAsByte 
for string");
+       }
+
+       @Override
+       public ValueType getValueType() {
+               return ValueType.STRING;
+       }
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
index 42a28f83f1..7a1a7c5354 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
@@ -45,7 +45,7 @@ import 
org.apache.sysds.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysds.runtime.frame.data.FrameBlock;
-import org.apache.sysds.runtime.frame.data.FrameBlock.ColumnMetadata;
+import org.apache.sysds.runtime.frame.data.columns.ColumnMetadata;
 import org.apache.sysds.runtime.functionobjects.KahanPlus;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
 import org.apache.sysds.runtime.instructions.cp.CPOperand;
@@ -61,8 +61,15 @@ import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysds.runtime.matrix.operators.Operator;
 import org.apache.sysds.runtime.meta.DataCharacteristics;
 import org.apache.sysds.runtime.transform.TfUtils;
-import org.apache.sysds.runtime.transform.encode.*;
+import org.apache.sysds.runtime.transform.encode.ColumnEncoder;
+import org.apache.sysds.runtime.transform.encode.ColumnEncoderBin;
+import org.apache.sysds.runtime.transform.encode.ColumnEncoderComposite;
+import org.apache.sysds.runtime.transform.encode.ColumnEncoderRecode;
+import org.apache.sysds.runtime.transform.encode.Encoder;
+import org.apache.sysds.runtime.transform.encode.EncoderFactory;
+import org.apache.sysds.runtime.transform.encode.EncoderMVImpute;
 import org.apache.sysds.runtime.transform.encode.EncoderMVImpute.MVMethod;
+import org.apache.sysds.runtime.transform.encode.MultiColumnEncoder;
 import org.apache.sysds.runtime.transform.meta.TfMetaUtils;
 import org.apache.sysds.runtime.transform.meta.TfOffsetMap;
 
diff --git 
a/src/test/java/org/apache/sysds/test/component/frame/FrameCastingTest.java 
b/src/test/java/org/apache/sysds/test/component/frame/FrameCastingTest.java
index d87e652278..8914cb8c14 100644
--- a/src/test/java/org/apache/sysds/test/component/frame/FrameCastingTest.java
+++ b/src/test/java/org/apache/sysds/test/component/frame/FrameCastingTest.java
@@ -122,21 +122,25 @@ public class FrameCastingTest extends AutomatedTestBase
                                frame = DataConverter.convertToFrameBlock(mb, 
schema);  
                        }
                        
-                       //check basic meta data
-                       if( frame.getNumRows() != rows )
-                               Assert.fail("Wrong number of rows: 
"+frame.getNumRows()+", expected: "+rows);
-               
-                       //check correct values
-                       ValueType[] lschema = frame.getSchema();
-                       for( int i=0; i<rows; i++ ) 
-                               for( int j=0; j<lschema.length; j++ )   {
-                                       double tmp = 
UtilFunctions.objectToDouble(lschema[j], frame.get(i, j));
-                                       double tmpm = Double.isNaN(A[i][j]) ? 
0.0: A[i][j];
-                                       tmp = Double.isNaN(tmp) ? 0.0 : tmp;
+                       if(frame != null){
 
-                                       if( tmp != tmpm)
-                                               Assert.fail("Wrong get value 
for cell ("+i+","+j+"): "+tmp+", expected: "+A[i][j]);
-                               }               
+                               //check basic meta data
+                               if( frame.getNumRows() != rows )
+                                       Assert.fail("Wrong number of rows: 
"+frame.getNumRows()+", expected: "+rows);
+                       
+                               //check correct values
+                               ValueType[] lschema = frame.getSchema();
+                               for( int i=0; i<rows; i++ ) {
+                                       for( int j=0; j<lschema.length; j++ )   
{
+                                               double tmp = 
UtilFunctions.objectToDouble(lschema[j], frame.get(i, j));
+                                               double tmpm = 
Double.isNaN(A[i][j]) ? 0.0: A[i][j];
+                                               tmp = Double.isNaN(tmp) ? 0.0 : 
tmp;
+       
+                                               if( tmp != tmpm)
+                                                       Assert.fail("Wrong get 
value for cell ("+i+","+j+"): "+tmp+", expected: "+A[i][j]);
+                                       }               
+                               }
+                       }
                }
                catch(Exception ex) {
                        ex.printStackTrace();
diff --git 
a/src/test/java/org/apache/sysds/test/component/frame/FrameIndexingTest.java 
b/src/test/java/org/apache/sysds/test/component/frame/FrameIndexingTest.java
index 7b4a898725..ff09c5055b 100644
--- a/src/test/java/org/apache/sysds/test/component/frame/FrameIndexingTest.java
+++ b/src/test/java/org/apache/sysds/test/component/frame/FrameIndexingTest.java
@@ -136,17 +136,20 @@ public class FrameIndexingTest extends AutomatedTestBase
                        }
                        
                        //check basic meta data
-                       if( frame3.getNumRows() != mbC.getNumRows() )
+                       if(frame3 != null && mbC != null &&  
frame3.getNumRows() != mbC.getNumRows() )
                                Assert.fail("Wrong number of rows: 
"+frame3.getNumRows()+", expected: "+mbC.getNumRows());
                
                        //check correct values
-                       ValueType[] lschema = frame3.getSchema();
-                       for( int i=0; i<ru-rl+1; i++ ) 
-                               for( int j=0; j<lschema.length; j++ )   {
-                                       double tmp = 
UtilFunctions.objectToDouble(lschema[j], frame3.get(i, j));
-                                       if( tmp != mbC.quickGetValue(i, j) )
-                                               Assert.fail("Wrong get value 
for cell ("+i+","+j+"): "+tmp+", expected: "+mbC.quickGetValue(i, j));
-                               }               
+                       if(frame3 != null && mbC != null){
+                               ValueType[] lschema = frame3.getSchema();
+                               for( int i=0; i<ru-rl+1; i++ ) {
+                                       for( int j=0; j<lschema.length; j++ )   
{
+                                               double tmp = 
UtilFunctions.objectToDouble(lschema[j], frame3.get(i, j));
+                                               if( tmp != mbC.quickGetValue(i, 
j) )
+                                                       Assert.fail("Wrong get 
value for cell ("+i+","+j+"): "+tmp+", expected: "+mbC.quickGetValue(i, j));
+                                       }               
+                               }
+                       }
                }
                catch(Exception ex) {
                        ex.printStackTrace();
diff --git 
a/src/test/java/org/apache/sysds/test/component/frame/FrameSerializationTest.java
 
b/src/test/java/org/apache/sysds/test/component/frame/FrameSerializationTest.java
index 44e8669720..5c2843b792 100644
--- 
a/src/test/java/org/apache/sysds/test/component/frame/FrameSerializationTest.java
+++ 
b/src/test/java/org/apache/sysds/test/component/frame/FrameSerializationTest.java
@@ -102,7 +102,7 @@ public class FrameSerializationTest extends 
AutomatedTestBase
                                getRandomMatrix(rows, schema.length, -10, 10, 
0.9, 8234);
                        
                        //init data frame 
-                       if( !empty ) {
+                       if( !empty && A != null ) {
                                Object[] row = new Object[schema.length];
                                for( int i=0; i<rows; i++ ) {
                                        for( int j=0; j<schema.length; j++ )
@@ -143,7 +143,7 @@ public class FrameSerializationTest extends 
AutomatedTestBase
                                Assert.fail("Wrong number of rows: 
"+frame.getNumRows()+", expected: "+numExpected);
                
                        //check correct values
-                       if( !empty ) {
+                       if( !empty && A != null ) {
                                for( int i=0; i<rows; i++ ) 
                                        for( int j=0; j<schema.length; j++ ) {
                                                double tmp = 
UtilFunctions.objectToDouble(schema[j], frame.get(i, j));
diff --git 
a/src/test/java/org/apache/sysds/test/functions/frame/DetectSchemaTest.java 
b/src/test/java/org/apache/sysds/test/functions/frame/DetectSchemaTest.java
index 98d308db93..c7d4c1acbd 100644
--- a/src/test/java/org/apache/sysds/test/functions/frame/DetectSchemaTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/frame/DetectSchemaTest.java
@@ -102,7 +102,9 @@ public class DetectSchemaTest extends AutomatedTestBase {
                Types.ExecMode platformOld = setExecMode(et);
                boolean oldFlag = OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
                boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+               setOutputBuffering(true);
                try {
+                       
                        getAndLoadTestConfiguration(TEST_NAME);
                        String HOME = SCRIPT_DIR + TEST_DIR;
                        fullDMLScriptName = HOME + TEST_NAME + ".dml";
diff --git 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameAppendDistTest.java 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameAppendDistTest.java
index 7a4b09580f..f1b3ac34f7 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameAppendDistTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameAppendDistTest.java
@@ -119,7 +119,7 @@ public class FrameAppendDistTest extends AutomatedTestBase
                
                double sparsity = (sparse) ? sparsity2 : sparsity1; 
                boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
-               
+               setOutputBuffering(true);
                try
                {
                        if(forcedAppendMethod != null) {
diff --git 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameColumnNamesTest.java 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameColumnNamesTest.java
index 5762ed7e6c..d962d809aa 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameColumnNamesTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameColumnNamesTest.java
@@ -75,6 +75,7 @@ public class FrameColumnNamesTest extends AutomatedTestBase {
        private void runGetColNamesTest(String[] columnNames, ExecType et) {
                Types.ExecMode platformOld = setExecMode(et);
                boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+               setOutputBuffering(true);
                try {
                        getAndLoadTestConfiguration(TEST_NAME);
                        String HOME = SCRIPT_DIR + TEST_DIR;
diff --git 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameConstructorTest.java 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameConstructorTest.java
index 57d39f0ff9..039692c9e5 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameConstructorTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameConstructorTest.java
@@ -143,7 +143,7 @@ public class FrameConstructorTest extends AutomatedTestBase 
{
                Types.ExecMode platformOld = setExecMode(et);
                boolean oldFlag = OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
                boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
-
+               setOutputBuffering(true);
                try {
                        //setup testcase
                        getAndLoadTestConfiguration(TEST_NAME);
diff --git 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameConverterTest.java 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameConverterTest.java
index b933a7bd87..5dcf6810f1 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameConverterTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameConverterTest.java
@@ -200,7 +200,7 @@ public class FrameConverterTest extends AutomatedTestBase
                DMLScript.setGlobalExecMode(ExecMode.SPARK);
                boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
                DMLScript.USE_LOCAL_SPARK_CONFIG = true;
-
+               setOutputBuffering(true);
                try
                {
                        TestConfiguration config = 
getTestConfiguration(TEST_NAME);
diff --git 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameDropInvalidLengthTest.java
 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameDropInvalidLengthTest.java
index a161449c19..ca74482fad 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameDropInvalidLengthTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameDropInvalidLengthTest.java
@@ -119,6 +119,7 @@ public class FrameDropInvalidLengthTest extends 
AutomatedTestBase {
                Types.ExecMode platformOld = setExecMode(et);
                boolean oldFlag = OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
                boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+               setOutputBuffering(true);
                try {
                        getAndLoadTestConfiguration(TEST_NAME);
                        String HOME = SCRIPT_DIR + TEST_DIR;
diff --git 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameDropInvalidTypeTest.java
 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameDropInvalidTypeTest.java
index 3f867b6a61..46122c4cfa 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameDropInvalidTypeTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameDropInvalidTypeTest.java
@@ -131,6 +131,7 @@ public class FrameDropInvalidTypeTest extends 
AutomatedTestBase
                Types.ExecMode platformOld = setExecMode(et);
                boolean oldFlag = OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
                boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+               setOutputBuffering(true);
                try {
                        getAndLoadTestConfiguration(TEST_NAME);
                        String HOME = SCRIPT_DIR + TEST_DIR;
diff --git 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameFunctionTest.java 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameFunctionTest.java
index 7234a335a9..861247767d 100644
--- a/src/test/java/org/apache/sysds/test/functions/frame/FrameFunctionTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/frame/FrameFunctionTest.java
@@ -84,7 +84,7 @@ public class FrameFunctionTest extends AutomatedTestBase
        
                boolean oldIPA = OptimizerUtils.ALLOW_INTER_PROCEDURAL_ANALYSIS;
                OptimizerUtils.ALLOW_INTER_PROCEDURAL_ANALYSIS = IPA;
-               
+               setOutputBuffering(true);
                try
                {
                        //setup testcase
diff --git 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameIndexingDistTest.java
 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameIndexingDistTest.java
index 63af4d69ad..786803df8d 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameIndexingDistTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameIndexingDistTest.java
@@ -130,7 +130,7 @@ public class FrameIndexingDistTest extends AutomatedTestBase
                        config = getTestConfiguration("FrameLeftIndexing");
                else
                        config = getTestConfiguration("FrameRightIndexing");
-                       
+               setOutputBuffering(true);
                try
                {
                        if(indexingMethod != null) {
diff --git 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameMatrixCastingTest.java
 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameMatrixCastingTest.java
index c98afd24e6..f5269e6232 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameMatrixCastingTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameMatrixCastingTest.java
@@ -139,7 +139,7 @@ public class FrameMatrixCastingTest extends 
AutomatedTestBase
                boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
                if( rtplatform == ExecMode.SPARK )
                        DMLScript.USE_LOCAL_SPARK_CONFIG = true;
-               
+               setOutputBuffering(true);
                try
                {
                        int cols = multColBlks ? cols2 : cols1;
diff --git 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameMatrixReblockTest.java
 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameMatrixReblockTest.java
index 8c6bdc9bee..a6497c021f 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameMatrixReblockTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameMatrixReblockTest.java
@@ -187,7 +187,7 @@ public class FrameMatrixReblockTest extends 
AutomatedTestBase
                boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
                if( rtplatform == ExecMode.SPARK )
                        DMLScript.USE_LOCAL_SPARK_CONFIG = true;
-               
+               setOutputBuffering(true);
                try
                {
                        int cols = multColBlks ? cols2 : cols1;
diff --git 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameMatrixWriteTest.java 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameMatrixWriteTest.java
index 7500b2e8f5..0e394aa7ab 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameMatrixWriteTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameMatrixWriteTest.java
@@ -35,16 +35,13 @@ import org.apache.sysds.test.AutomatedTestBase;
 import org.apache.sysds.test.TestConfiguration;
 import org.apache.sysds.test.TestUtils;
 
-/**
- * 
- */
 public class FrameMatrixWriteTest extends AutomatedTestBase
 {
        private final static String TEST_DIR = "functions/frame/";
        private final static String TEST_NAME1 = "FrameMatrixWrite";
        private final static String TEST_CLASS_DIR = TEST_DIR + 
FrameMatrixWriteTest.class.getSimpleName() + "/";
 
-       private final static int rows = 2593;
+       private final static int rows = 1320;
        private final static int cols1 = 372;
        private final static int cols2 = 1102;
        
@@ -114,13 +111,6 @@ public class FrameMatrixWriteTest extends AutomatedTestBase
                runFrameWriteTest(TEST_NAME1, true, "csv", ExecType.SPARK);
        }
        
-       /**
-        * 
-        * @param testname
-        * @param multColBlks
-        * @param ofmt
-        * @param et
-        */
        private void runFrameWriteTest( String testname, boolean multColBlks, 
String ofmt, ExecType et)
        {
                //rtplatform for MR
@@ -133,7 +123,7 @@ public class FrameMatrixWriteTest extends AutomatedTestBase
                boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
                if( rtplatform == ExecMode.SPARK )
                        DMLScript.USE_LOCAL_SPARK_CONFIG = true;
-               
+               setOutputBuffering(true);
                try
                {
                        int cols = multColBlks ? cols2 : cols1;
@@ -147,7 +137,7 @@ public class FrameMatrixWriteTest extends AutomatedTestBase
                                        String.valueOf(cols), output("B"), ofmt 
};
                        
                        //run testcase
-                       runTest(true, false, null, -1);
+                       runTest(null);
                        
                        //generate compare data
                        double[][] A = new double[rows][cols];
diff --git 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameMetaReadWriteTest.java
 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameMetaReadWriteTest.java
index 3434e2bb35..62a151966c 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameMetaReadWriteTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameMetaReadWriteTest.java
@@ -94,7 +94,7 @@ public class FrameMetaReadWriteTest extends AutomatedTestBase
                        DMLScript.USE_LOCAL_SPARK_CONFIG = true;
        
                String ofmt = fmt.toString();
-               
+               setOutputBuffering(true);
                try
                {
                        TestConfiguration config = 
getTestConfiguration(TEST_NAME);
diff --git 
a/src/test/java/org/apache/sysds/test/component/frame/FrameReadWriteTest.java 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameReadWriteTest.java
similarity index 99%
rename from 
src/test/java/org/apache/sysds/test/component/frame/FrameReadWriteTest.java
rename to 
src/test/java/org/apache/sysds/test/functions/frame/FrameReadWriteTest.java
index 81a47b9223..99391a90f7 100644
--- 
a/src/test/java/org/apache/sysds/test/component/frame/FrameReadWriteTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameReadWriteTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sysds.test.component.frame;
+package org.apache.sysds.test.functions.frame;
 
 import java.io.IOException;
 
@@ -153,7 +153,7 @@ public class FrameReadWriteTest extends AutomatedTestBase
        {
                boolean oldParText = CompilerConfig.FLAG_PARREADWRITE_TEXT;
                boolean oldParBin = CompilerConfig.FLAG_PARREADWRITE_BINARY;
-               
+               setOutputBuffering(true);
                try
                {
                        CompilerConfig.FLAG_PARREADWRITE_TEXT = parallel;
diff --git 
a/src/test/java/org/apache/sysds/test/component/frame/FrameRemoveEmptyTest.java 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameRemoveEmptyTest.java
similarity index 99%
rename from 
src/test/java/org/apache/sysds/test/component/frame/FrameRemoveEmptyTest.java
rename to 
src/test/java/org/apache/sysds/test/functions/frame/FrameRemoveEmptyTest.java
index 5fb0913c65..762d0c6b9b 100644
--- 
a/src/test/java/org/apache/sysds/test/component/frame/FrameRemoveEmptyTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameRemoveEmptyTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sysds.test.component.frame;
+package org.apache.sysds.test.functions.frame;
 
 import static org.junit.Assert.fail;
 
@@ -153,7 +153,7 @@ public class FrameRemoveEmptyTest extends AutomatedTestBase 
{
                boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
                if(rtplatform == Types.ExecMode.SPARK)
                        DMLScript.USE_LOCAL_SPARK_CONFIG = true;
-
+               setOutputBuffering(true);
                try {
                        // register test configuration
                        TestConfiguration config = 
getTestConfiguration(testname);
diff --git 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameReplaceTest.java 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameReplaceTest.java
index b333bd35d6..b62dea6b26 100644
--- a/src/test/java/org/apache/sysds/test/functions/frame/FrameReplaceTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/frame/FrameReplaceTest.java
@@ -33,63 +33,63 @@ import org.junit.Ignore;
 import org.junit.Test;
 
 public class FrameReplaceTest extends AutomatedTestBase {
-    // private static final Log LOG = 
LogFactory.getLog(FrameReplaceTest.class.getName());
-    private final static String TEST_DIR = "functions/frame/";
-    private final static String TEST_NAME = "ReplaceTest";
-    private final static String TEST_CLASS_DIR = TEST_DIR + 
FrameReplaceTest.class.getSimpleName() + "/";
+       // private static final Log LOG = 
LogFactory.getLog(FrameReplaceTest.class.getName());
+       private final static String TEST_DIR = "functions/frame/";
+       private final static String TEST_NAME = "ReplaceTest";
+       private final static String TEST_CLASS_DIR = TEST_DIR + 
FrameReplaceTest.class.getSimpleName() + "/";
 
-    @Override
-    public void setUp() {
-        TestUtils.clearAssertionInformation();
-        addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, 
TEST_NAME, new String[] {"S.scalar"}));
-    }
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               addTestConfiguration(TEST_NAME, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"S.scalar"}));
+       }
 
-    @Test
-    public void testParforFrameIntermediatesCP() {
-        runReplaceTest(ExecType.CP);
-    }
+       @Test
+       public void testParforFrameIntermediatesCP() {
+               runReplaceTest(ExecType.CP);
+       }
 
-    @Test
-    @Ignore
-    public void testParforFrameIntermediatesSpark() {
-        runReplaceTest(ExecType.SPARK);
-    }
+       @Test
+       @Ignore
+       public void testParforFrameIntermediatesSpark() {
+               runReplaceTest(ExecType.SPARK);
+       }
 
-    private void runReplaceTest(ExecType et) {
-        ExecMode platformOld = rtplatform;
-        switch(et) {
-            case SPARK:
-                rtplatform = ExecMode.SPARK;
-                break;
-            default:
-                rtplatform = ExecMode.HYBRID;
-                break;
-        }
+       private void runReplaceTest(ExecType et) {
+               ExecMode platformOld = rtplatform;
+               switch(et) {
+                       case SPARK:
+                               rtplatform = ExecMode.SPARK;
+                               break;
+                       default:
+                               rtplatform = ExecMode.HYBRID;
+                               break;
+               }
 
-        boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
-        if(rtplatform == ExecMode.SPARK || rtplatform == ExecMode.HYBRID)
-            DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+               boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+               if(rtplatform == ExecMode.SPARK || rtplatform == 
ExecMode.HYBRID)
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+               setOutputBuffering(true);
+               try {
+                       // setup testcase
+                       getAndLoadTestConfiguration(TEST_NAME);
+                       String HOME = SCRIPT_DIR + TEST_DIR;
+                       fullDMLScriptName = HOME + TEST_NAME + ".dml";
+                       programArgs = new String[] {"-nvargs", "out_S=" + 
output("S")};
 
-        try {
-            // setup testcase
-            getAndLoadTestConfiguration(TEST_NAME);
-            String HOME = SCRIPT_DIR + TEST_DIR;
-            fullDMLScriptName = HOME + TEST_NAME + ".dml";
-            programArgs = new String[] {"-nvargs", "out_S=" + output("S")};
+                       // run test
+                       runTest(null);
+                       HashMap<MatrixValue.CellIndex, Double> val = 
readDMLScalarFromOutputDir("S");
+                       assertEquals(1.0, val.get(new MatrixValue.CellIndex(1, 
1)), 0.0);
 
-            // run test
-            runTest(null);
-            HashMap<MatrixValue.CellIndex, Double> val = 
readDMLScalarFromOutputDir("S");
-            assertEquals(1.0, val.get(new MatrixValue.CellIndex(1, 1)), 0.0);
-
-        }
-        catch(Exception ex) {
-            throw new RuntimeException(ex);
-        }
-        finally {
-            rtplatform = platformOld;
-            DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
-        }
-    }
+               }
+               catch(Exception ex) {
+                       throw new RuntimeException(ex);
+               }
+               finally {
+                       rtplatform = platformOld;
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+               }
+       }
 
 }
diff --git 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameScalarCastingIntegratedTest.java
 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameScalarCastingIntegratedTest.java
index 9a23800e94..3d509354a1 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameScalarCastingIntegratedTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameScalarCastingIntegratedTest.java
@@ -130,7 +130,7 @@ public class FrameScalarCastingIntegratedTest extends 
AutomatedTestBase
                boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
                if( rtplatform == ExecMode.SPARK || rtplatform == 
ExecMode.HYBRID )
                        DMLScript.USE_LOCAL_SPARK_CONFIG = true;
-               
+               setOutputBuffering(true);
                try
                {               
                        getAndLoadTestConfiguration(TEST_NAME);
diff --git 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameScalarCastingTest.java
 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameScalarCastingTest.java
index de88ef7607..d183d34787 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameScalarCastingTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameScalarCastingTest.java
@@ -92,7 +92,7 @@ public class FrameScalarCastingTest extends AutomatedTestBase
                {
                        TestConfiguration config = 
getTestConfiguration(testname);
                        loadTestConfiguration(config);
-                       
+                       setOutputBuffering(true);
                        String HOME = SCRIPT_DIR + TEST_DIR;
                        fullDMLScriptName = HOME + testname + ".dml";
                        programArgs = new String[]{"-explain","-args", 
input("A"),
diff --git 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameSchemaReadTest.java 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameSchemaReadTest.java
index 3eccac1a0b..80af3c44cd 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameSchemaReadTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameSchemaReadTest.java
@@ -116,7 +116,7 @@ public class FrameSchemaReadTest extends AutomatedTestBase
                {
                        TestConfiguration config = 
getTestConfiguration(testname);
                        loadTestConfiguration(config);
-                       
+                       setOutputBuffering(true);
                        String HOME = SCRIPT_DIR + TEST_DIR;
                        fullDMLScriptName = HOME + testname + ".dml";
                        programArgs = new String[]{"-explain","-args", 
input("A"), getSchemaString(schema, wildcard), 
diff --git 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameValueSwapTest.java 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameValueSwapTest.java
index 2796d84e3a..e5dbd9e891 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameValueSwapTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameValueSwapTest.java
@@ -61,11 +61,9 @@ public class FrameValueSwapTest extends AutomatedTestBase
                runValueSwapTest(ExecType.SPARK);
        }
 
-       private void runValueSwapTest(ExecType et)
-       {
-               setOutputBuffering(true);
+       private void runValueSwapTest(ExecType et){
                Types.ExecMode platformOld = setExecMode(et);
-
+               setOutputBuffering(true);
                try {
                        getAndLoadTestConfiguration(TEST_NAME);
                        String HOME = SCRIPT_DIR + TEST_DIR;
diff --git 
a/src/test/java/org/apache/sysds/test/functions/frame/ParforFrameIntermediateTest.java
 
b/src/test/java/org/apache/sysds/test/functions/frame/ParforFrameIntermediateTest.java
index 19965183c2..079ae9b197 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/frame/ParforFrameIntermediateTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/frame/ParforFrameIntermediateTest.java
@@ -67,7 +67,7 @@ public class ParforFrameIntermediateTest extends 
AutomatedTestBase
                if( rtplatform == ExecMode.SPARK 
                        || rtplatform == ExecMode.HYBRID )
                        DMLScript.USE_LOCAL_SPARK_CONFIG = true;
-               
+               setOutputBuffering(true);
                try
                {
                        //setup testcase
diff --git 
a/src/test/java/org/apache/sysds/test/functions/frame/TypeOfTest.java 
b/src/test/java/org/apache/sysds/test/functions/frame/TypeOfTest.java
index 5bbdae2311..c94bfdc6fb 100644
--- a/src/test/java/org/apache/sysds/test/functions/frame/TypeOfTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/frame/TypeOfTest.java
@@ -99,6 +99,7 @@ public class TypeOfTest extends AutomatedTestBase {
        private void runtypeOfTest(Types.ValueType[] schema, int rows, int 
cols, ExecType et) {
                if (et == ExecType.SPARK)
                        DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+               setOutputBuffering(true);
                try {
                        getAndLoadTestConfiguration(TEST_NAME);
                        String HOME = SCRIPT_DIR + TEST_DIR;


Reply via email to