Repository: incubator-systemml
Updated Branches:
  refs/heads/master 3e4ceafc1 -> 266d4c8ce


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java 
b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
index 53caea9..1a11af9 100644
--- a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
+++ b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
@@ -25,6 +25,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -440,20 +441,20 @@ public class MapReduceTool
        
        public static void writeMetaDataFile(String mtdfile, ValueType vt, 
MatrixCharacteristics mc, OutputInfo outinfo) 
                throws IOException {
+               // Matrix metadata: no per-column schema (null is passed), so the full overload
+               // falls back to writing the single value type vt; data type is MATRIX.
-               writeMetaDataFile(mtdfile, vt, DataType.MATRIX, mc, outinfo);
+               writeMetaDataFile(mtdfile, vt, null, DataType.MATRIX, mc, 
outinfo);
        }
        
-       public static void writeMetaDataFile(String mtdfile, ValueType vt, 
DataType dt, MatrixCharacteristics mc, OutputInfo outinfo) 
+       /**
+        * Writes a metadata file without explicit format properties ({@code null} is
+        * passed for them to the full overload).
+        * 
+        * @param schema
+        *            per-column value types; if {@code null}, the single value type
+        *            {@code vt} is written, otherwise a schema entry is written in place
+        *            of the value-type entry
+        */
+       public static void writeMetaDataFile(String mtdfile, ValueType vt, 
List<ValueType> schema, DataType dt, MatrixCharacteristics mc, OutputInfo 
outinfo) 
                throws IOException {
-               writeMetaDataFile(mtdfile, vt, dt, mc, outinfo, null);
+               writeMetaDataFile(mtdfile, vt, schema, dt, mc, outinfo, null);
        }
 
        public static void writeMetaDataFile(String mtdfile, ValueType vt, 
MatrixCharacteristics mc,  OutputInfo outinfo, FileFormatProperties 
formatProperties) 
                throws IOException {
+               // Matrix metadata with explicit format properties; no per-column schema (null).
-               writeMetaDataFile(mtdfile, vt, DataType.MATRIX, mc, outinfo, 
formatProperties);
+               writeMetaDataFile(mtdfile, vt, null, DataType.MATRIX, mc, 
outinfo, formatProperties);
        }
        
-       public static void writeMetaDataFile(String mtdfile, ValueType vt, 
DataType dt, MatrixCharacteristics mc, 
+       public static void writeMetaDataFile(String mtdfile, ValueType vt, 
List<ValueType> schema, DataType dt, MatrixCharacteristics mc, 
                        OutputInfo outinfo, FileFormatProperties 
formatProperties) 
                throws IOException 
        {
@@ -467,25 +468,52 @@ public class MapReduceTool
                try {
                        // build JSON metadata object
                        mtd.put(DataExpression.DATATYPEPARAM, 
dt.toString().toLowerCase());
-                       switch (vt) {
-                               case DOUBLE:
-                                       mtd.put(DataExpression.VALUETYPEPARAM, 
"double");
-                                       break;
-                               case INT:
-                                       mtd.put(DataExpression.VALUETYPEPARAM, 
"int");
-                                       break;
-                               case BOOLEAN:
-                                       mtd.put(DataExpression.VALUETYPEPARAM, 
"boolean");
-                                       break;
-                               case STRING:
-                                       mtd.put(DataExpression.VALUETYPEPARAM, 
"string");
-                                       break;
-                               case UNKNOWN:
-                                       mtd.put(DataExpression.VALUETYPEPARAM, 
"unknown");
-                                       break;
-                               case OBJECT:
-                                       mtd.put(DataExpression.VALUETYPEPARAM, 
"object");
-                                       break;
+                       if (schema == null)
+                               switch (vt) {
+                                       case DOUBLE:
+                                               
mtd.put(DataExpression.VALUETYPEPARAM, "double");
+                                               break;
+                                       case INT:
+                                               
mtd.put(DataExpression.VALUETYPEPARAM, "int");
+                                               break;
+                                       case BOOLEAN:
+                                               
mtd.put(DataExpression.VALUETYPEPARAM, "boolean");
+                                               break;
+                                       case STRING:
+                                               
mtd.put(DataExpression.VALUETYPEPARAM, "string");
+                                               break;
+                                       case UNKNOWN:
+                                               
mtd.put(DataExpression.VALUETYPEPARAM, "unknown");
+                                               break;
+                                       case OBJECT:
+                                               
mtd.put(DataExpression.VALUETYPEPARAM, "object");
+                                               break;
+                               }
+                       else
+                       {
+                               StringBuffer schemaStrBuffer = new 
StringBuffer();
+                               for(int i=0; i < schema.size(); i++) {
+                                       switch (schema.get(i)) {
+                                               case DOUBLE:
+                                                       
schemaStrBuffer.append("DOUBLE");
+                                                       break;
+                                               case INT:
+                                                       
schemaStrBuffer.append("INT");
+                                                       break;
+                                               case BOOLEAN:
+                                                       
schemaStrBuffer.append("BOOLEAN");
+                                                       break;
+                                               case STRING:
+                                                       
schemaStrBuffer.append("STRING");
+                                                       break;
+                                               case UNKNOWN:
+                                               default:
+                                                       
schemaStrBuffer.append("*");
+                                                       break;
+                                       }
+                                       
schemaStrBuffer.append(DataExpression.DEFAULT_DELIM_DELIMITER);
+                               }
+                               mtd.put(DataExpression.SCHEMAPARAM, 
schemaStrBuffer.toString());
                        }
                        mtd.put(DataExpression.READROWPARAM, mc.getRows());
                        mtd.put(DataExpression.READCOLPARAM, mc.getCols());

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java 
b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
index 3711a1a..a904651 100644
--- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
@@ -20,12 +20,16 @@
 package org.apache.sysml.runtime.util;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.sysml.parser.Expression.ValueType;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.NumItemsByEachReducerMetaData;
+import org.apache.sysml.runtime.matrix.data.Pair;
 import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
+import org.apache.wink.json4j.JSONArray;
 
 public class UtilFunctions 
 {
@@ -144,6 +148,23 @@ public class UtilFunctions
         * @param ix
         * @param brlen
         * @param bclen
+        * @param rl
+        * @param ru
+        * @return
+        */
+       public static boolean isInFrameBlockRange( Long ix, int brlen, int 
bclen, long rl, long ru )
+       {
+               if(rl > ix+brlen-1 || ru < ix)
+                       return false;
+               else
+                       return true;
+       }
+       
+       /**
+        * 
+        * @param ix
+        * @param brlen
+        * @param bclen
         * @param ixrange
         * @return
         */
@@ -154,6 +175,20 @@ public class UtilFunctions
                                ixrange.colStart, ixrange.colEnd);
        }
        
+       /**
+        * Overload of the frame-block range check that takes the row bounds from an
+        * {@link IndexRange}. Only {@code ixrange.rowStart} and {@code ixrange.rowEnd}
+        * are used; the column bounds of the range are ignored.
+        * 
+        * @param ix
+        *            first row index covered by the frame block
+        * @param brlen
+        *            number of rows in the block
+        * @param bclen
+        *            number of columns in the block (passed through, not used by the check)
+        * @param ixrange
+        *            range whose row bounds are tested for overlap
+        * @return true if the block rows overlap the row range of {@code ixrange}
+        */
+       public static boolean isInFrameBlockRange( Long ix, int brlen, int bclen, IndexRange ixrange )
+       {
+               final long firstRow = ixrange.rowStart;
+               final long lastRow = ixrange.rowEnd;
+               return isInFrameBlockRange(ix, brlen, bclen, firstRow, lastRow);
+       }
+       
        // Reused by both MR and Spark for performing zero out
        public static IndexRange getSelectedRangeForZeroOut(IndexedMatrixValue 
in, int blockRowFactor, int blockColFactor, IndexRange indexRange) 
        {
@@ -191,6 +226,23 @@ public class UtilFunctions
                return tempRange;
        }
        
+       /**
+        * Frame overload of the zero-out range computation (reused by both MR and Spark
+        * for performing zero out, per the comment carried over from the matrix version).
+        * Derives an {@link IndexRange} from {@code indexRange}: row bounds come from the
+        * row indices passed in, column bounds from computeCellInBlock.
+        * NOTE(review): presumably the result is in block-local, 0-based coordinates -- confirm.
+        * NOTE(review): the row start is compared against lDestRowIndex but offset by
+        * in.getKey(), while the row end is offset by lSrcRowIndex; confirm that mixing
+        * these three row indices is intended.
+        */
+       public static IndexRange getSelectedRangeForZeroOut(Pair<Long, 
FrameBlock> in, int blockRowFactor, int blockColFactor, IndexRange indexRange, 
long lSrcRowIndex, long lDestRowIndex) 
+       {
+               int iRowStart, iRowEnd, iColStart, iColEnd;
+               
+               // range begins at or before the destination row: start at offset 0
+               if(indexRange.rowStart <= lDestRowIndex)
+                       iRowStart = 0;
+               else
+                       iRowStart = (int) (indexRange.rowStart - in.getKey());
+               // at most blockRowFactor rows; the trailing -1 makes the end inclusive
+               iRowEnd = (int) Math.min(indexRange.rowEnd - lSrcRowIndex, 
blockRowFactor)-1;
+               
+               iColStart = 
UtilFunctions.computeCellInBlock(indexRange.colStart, blockColFactor);
+               iColEnd = UtilFunctions.computeCellInBlock(indexRange.colEnd, 
blockColFactor);
+
+               return  new IndexRange(iRowStart, iRowEnd, iColStart, iColEnd);
+       }
+       
        public static long getTotalLength(NumItemsByEachReducerMetaData 
metadata) {
                long[] counts=metadata.getNumItemsArray();
                long total=0;
@@ -337,9 +389,62 @@ public class UtilFunctions
        
        /**
         * 
+        * @param in
+        * @param ignoreNull    
+        *              If this flag has set, it will ignore null. This flag is 
mainly used in merge functionality to override data with "null" data.
+        * @return
+        */
+       public static String objectToString( Object in, boolean ignoreNull ) {
+               String strReturn = objectToString(in); 
+               if( strReturn == null )
+                       return strReturn;
+               else if (ignoreNull){
+                       if(in instanceof Double && ((Double)in).doubleValue() 
== 0.0)
+                               return null;
+                       else if(in instanceof Long && ((Long)in).longValue() == 
0)
+                               return null;
+                       else if(in instanceof Boolean && 
((Boolean)in).booleanValue() == false)
+                               return null;
+                       else if(in instanceof String && 
((String)in).trim().length() == 0)
+                               return null;
+                       else
+                               return strReturn;
+               } 
+               else
+                       return strReturn;
+       }
+       
+       /**
+        * Converts a value to the given value type by round-tripping it through its
+        * string representation.
+        * 
+        * @param vt
+        *            target value type
+        * @param in
+        *            value to convert
+        * @return result of {@code stringToObject(vt, objectToString(in))}
+        */
+       public static Object objectToObject(ValueType vt, Object in ) {
+               return stringToObject(vt, objectToString(in));
+       }
+       
+       /**
+        * 
+        * @param vt
+        * @param in
+        * @return
+        */
+       public static Object objectToObject(ValueType vt, Object in, boolean 
ignoreNull ) {
+               String str = objectToString(in, ignoreNull);
+               if (str==null || vt == ValueType.STRING)
+                       return str;
+               else
+                       return stringToObject(vt, str); 
+       }       
+       
+       /**
+        * 
         * @param vt
         * @param in1
         * @param in2
+        * 
         * @return
         */
        public static int compareTo(ValueType vt, Object in1, Object in2) {
@@ -355,7 +460,7 @@ public class UtilFunctions
                        default: throw new RuntimeException("Unsupported value 
type: "+vt);
                }
        }
-       
+
        /**
         * Compares two version strings of format x.y.z, where x is major,
         * y is minor, and z is maintenance release.
@@ -458,4 +563,38 @@ public class UtilFunctions
                        ret.add(i);
                return ret;
        }
+
+       /**
+        * Returns the schema (one value type per column) described by a JSON array.
+        * 
+        * NOTE(review): MapReduceTool.writeMetaDataFile currently writes the schema as a
+        * single delimited string (with "*" for UNKNOWN), not as a JSON array of
+        * ValueType names; confirm that reader and writer formats agree.
+        * 
+        * @param schemaObject
+        *            a {@code JSONArray} whose elements are value-type names accepted by
+        *            {@link ValueType#valueOf(String)}, e.g. "STRING" or "DOUBLE"
+        * @return fixed-size list of value types, in the same order as the JSON array
+        */
+       public static List<ValueType> getSchemaType(Object schemaObject)
+       {
+               JSONArray schemaJsonArr = (JSONArray)schemaObject;
+               ValueType[] schemaArray = new ValueType[schemaJsonArr.size()];
+               
+               // read element i (previously always element 0, which gave every column the
+               // type of the first column)
+               for(int i=0; i < schemaArray.length; i++)
+                       schemaArray[i] = ValueType.valueOf((String)schemaJsonArr.get(i));
+               return Arrays.asList(schemaArray);
+       }
+       
+       /**
+        * Returns a copy of the schema entries at list positions {@code lStart} through
+        * {@code lEnd}, both inclusive (0-based positions into {@code srcSchema}).
+        * 
+        * @param srcSchema
+        *            full schema to take the entries from
+        * @param lStart
+        *            position of the first entry to copy
+        * @param lEnd
+        *            position of the last entry to copy
+        * @return fixed-size list of {@code lEnd-lStart+1} value types
+        */
+       public static List<ValueType> getSubSchema(List<ValueType> srcSchema, long lStart, long lEnd)
+       {
+               final int count = (int) (lEnd - lStart + 1);
+               final ValueType[] subSchema = new ValueType[count];
+               int pos = 0;
+               while( pos < count ) {
+                       subSchema[pos] = srcSchema.get((int) (lStart + pos));
+                       pos++;
+               }
+               return Arrays.asList(subSchema);
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java 
b/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java
index 066e4e5..4273113 100644
--- a/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java
+++ b/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java
@@ -31,6 +31,7 @@ import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 
 import org.apache.sysml.lops.Lop;
 import org.apache.commons.io.FileUtils;
@@ -45,13 +46,19 @@ import org.apache.sysml.api.MLContext;
 import org.apache.sysml.conf.DMLConfig;
 import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.parser.DataExpression;
+import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.io.FrameReader;
+import org.apache.sysml.runtime.io.FrameReaderFactory;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex;
+import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
 import org.apache.sysml.runtime.util.MapReduceTool;
 import org.apache.sysml.test.utils.TestUtils;
@@ -722,6 +729,44 @@ public abstract class AutomatedTestBase
        protected static HashMap<CellIndex, Double> 
readDMLScalarFromHDFS(String fileName) {
                return TestUtils.readDMLScalarFromHDFS(baseDirectory + 
OUTPUT_DIR + fileName);
        }
+       
+       
+       /**
+        * Reads a frame from the test's output directory, taking its dimensions from
+        * {@code readDMLMetaDataFile(fileName)}.
+        * 
+        * @param fileName
+        *            file name relative to the output directory
+        * @param iinfo
+        *            input format of the frame file
+        * @return the frame read from the file
+        * @throws DMLRuntimeException
+        * @throws IOException
+        */
+       protected static FrameBlock readDMLFrameFromHDFS(String fileName, InputInfo iinfo) 
+                       throws DMLRuntimeException, IOException 
+       {
+               // delegate to the overload taking explicit characteristics instead of
+               // duplicating the path construction and reader creation here
+               MatrixCharacteristics md = readDMLMetaDataFile(fileName);
+               return readDMLFrameFromHDFS(fileName, iinfo, md);
+       }
+
+
+       protected static FrameBlock readDMLFrameFromHDFS(String fileName, 
InputInfo iinfo, MatrixCharacteristics md) 
+                       throws DMLRuntimeException, IOException 
+       {
+               //read frame data from hdfs
+               String strFrameFileName = baseDirectory + OUTPUT_DIR + fileName;
+               FrameReader reader = 
FrameReaderFactory.createFrameReader(iinfo);
+               
+               return reader.readFrameFromHDFS(strFrameFileName, md.getRows(), 
md.getCols());
+       }
+
+       protected static FrameBlock readRFrameFromHDFS(String fileName, 
InputInfo iinfo, MatrixCharacteristics md) 
+                       throws DMLRuntimeException, IOException 
+       {
+               //read frame data from hdfs
+               String strFrameFileName = baseDirectory + EXPECTED_DIR + 
fileName;
+
+               CSVFileFormatProperties fprop = new CSVFileFormatProperties();
+               fprop.setHeader(true);
+               FrameReader reader = 
FrameReaderFactory.createFrameReader(iinfo, fprop);
+               
+               return reader.readFrameFromHDFS(strFrameFileName, md.getRows(), 
md.getCols());
+       }
+
+
 
        public HashMap<CellIndex, Double> readRScalarFromFS(String fileName) {
                System.out.println("R script out: " + baseDirectory + 
EXPECTED_DIR + cacheDir + fileName);
@@ -1640,4 +1685,90 @@ public abstract class AutomatedTestBase
                
                return sourceDirectory;
        }
+       
+       /**
+        * <p>
+        * Adds a frame to the input path and writes it to a file.
+        * </p>
+        * 
+        * @param name
+        *            directory name
+        * @param matrix
+        *            two dimensional frame data
+        * @param bIncludeR
+        *            generates also the corresponding R frame data
+        * @throws IOException 
+        * @throws DMLRuntimeException 
+        */
+       protected double[][] writeInputFrame(String name, double[][] data, 
boolean bIncludeR, List<ValueType> schema, OutputInfo oi) 
+                       throws DMLRuntimeException, IOException 
+       {
+               String completePath = baseDirectory + INPUT_DIR + name;
+               String completeRPath = baseDirectory + INPUT_DIR + name + 
".csv";
+               
+               try {
+                       cleanupExistingData(baseDirectory + INPUT_DIR + name, 
bIncludeR);
+               } catch (IOException e) {
+                       e.printStackTrace();
+                       throw new RuntimeException(e);
+               }
+               
+               TestUtils.writeTestFrame(completePath, data, schema, oi);
+               if (bIncludeR) {
+                       TestUtils.writeTestFrame(completeRPath, data, schema, 
OutputInfo.CSVOutputInfo, true);
+                       inputRFiles.add(completeRPath);
+               }
+               if (DEBUG)
+                       TestUtils.writeTestFrame(DEBUG_TEMP_DIR + completePath, 
data, schema, oi);
+               inputDirectories.add(baseDirectory + INPUT_DIR + name);
+
+               return data;
+       }
+
+       protected double[][] writeInputFrameWithMTD(String name, double[][] 
data, boolean bIncludeR, List<ValueType> schema, OutputInfo oi) 
+                       throws DMLRuntimeException, IOException 
+       {
+               MatrixCharacteristics mc = new 
MatrixCharacteristics(data.length, data[0].length, 
OptimizerUtils.DEFAULT_BLOCKSIZE, data[0].length, -1);
+               return writeInputFrameWithMTD(name, data, bIncludeR, mc, 
schema, oi);
+       }
+       
+       protected double[][] writeInputFrameWithMTD(String name, double[][] 
data, boolean bIncludeR, MatrixCharacteristics mc, List<ValueType> schema, 
OutputInfo oi) 
+                       throws DMLRuntimeException, IOException 
+       {
+               writeInputFrame(name, data, bIncludeR, schema, oi);
+               
+               // write metadata file
+               try
+               {
+                       String completeMTDPath = baseDirectory + INPUT_DIR + 
name + ".mtd";
+                       MapReduceTool.writeMetaDataFile(completeMTDPath, null, 
schema, DataType.FRAME, mc, oi);
+               }
+               catch(IOException e)
+               {
+                       e.printStackTrace();
+                       throw new RuntimeException(e);
+               }
+       
+               return data;
+       }
+       
+       /**
+        * <p>
+        * Adds a frame to the input path and writes it to a file.
+        * </p>
+        * 
+        * @param name
+        *            directory name
+        * @param matrix
+        *            two dimensional frame data
+        * @param schema
+        * @param oi
+        * @throws IOException 
+        * @throws DMLRuntimeException 
+        */
+       protected double[][] writeInputFrame(String name, double[][] data, 
List<ValueType> schema, OutputInfo oi) 
+                       throws DMLRuntimeException, IOException 
+       {
+               return writeInputFrame(name, data, false, schema, oi);
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
index 7357989..7a8392b 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
@@ -221,6 +221,9 @@ public class FrameConverterTest extends AutomatedTestBase
                                        iinfo = InputInfo.TextCellInputInfo;
                                        break;
                                case MAT2BIN: 
+                                       oinfo = 
OutputInfo.BinaryBlockOutputInfo;
+                                       iinfo = InputInfo.BinaryBlockInputInfo;
+                                       break;
                                case BIN2MAT:
                                        oinfo = 
OutputInfo.BinaryBlockOutputInfo;
                                        iinfo = InputInfo.BinaryBlockInputInfo;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameIndexingDistTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameIndexingDistTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameIndexingDistTest.java
new file mode 100644
index 0000000..1ba1a32
--- /dev/null
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameIndexingDistTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.frame;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.hops.LeftIndexingOp;
+import org.apache.sysml.hops.LeftIndexingOp.LeftIndexingMethod;
+import org.apache.sysml.lops.LopProperties.ExecType;
+import org.apache.sysml.parser.Expression.ValueType;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.InputInfo;
+import org.apache.sysml.runtime.matrix.data.OutputInfo;
+import org.apache.sysml.runtime.util.UtilFunctions;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+public class FrameIndexingDistTest extends AutomatedTestBase
+{
+       
+       private final static String TEST_DIR = "functions/frame/";
+       private final static String TEST_CLASS_DIR = TEST_DIR + FrameIndexingDistTest.class.getSimpleName() + "/";
+       private final static String TEST_NAME = "FrameIndexingDistTest";
+       
+       private final static double epsilon=0.0000000001;
+
+       // Test data with 2 blocks of rows and columns
+       private final static int rows = 1279, cols=1060;
+       
+       // value range of the generated random data
+       private final static int min=0;
+       private final static int max=100;
+       
+       // mixed schema: four column groups of cols/4 columns each (STRING, DOUBLE, INT, BOOLEAN)
+       private final static List<ValueType> schemaMixedLargeListStr = Collections.nCopies(cols/4, ValueType.STRING);
+       private final static List<ValueType> schemaMixedLargeListDble  = Collections.nCopies(cols/4, ValueType.DOUBLE);
+       private final static List<ValueType> schemaMixedLargeListInt  = Collections.nCopies(cols/4, ValueType.INT);
+       private final static List<ValueType> schemaMixedLargeListBool  = Collections.nCopies(cols/4, ValueType.BOOLEAN);
+       private static ValueType[] schemaMixedLarge = null;
+       static {
+               // concatenate the four groups, in order, into one array
+               final List<ValueType> combined = new ArrayList<ValueType>();
+               combined.addAll(schemaMixedLargeListStr);
+               combined.addAll(schemaMixedLargeListDble);
+               combined.addAll(schemaMixedLargeListInt);
+               combined.addAll(schemaMixedLargeListBool);
+               schemaMixedLarge = combined.toArray(new ValueType[combined.size()]);
+       }
+        
+       @Override
+       public void setUp() {
+               // a single configuration, registered under the test name, with output names AB, AC and AD
+               final String[] outputNames = new String[] {"AB", "AC", "AD"};
+               final TestConfiguration frameConfig = new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, outputNames);
+               addTestConfiguration(TEST_NAME, frameConfig);
+       }
+       
+
+       // Left Indexing Spark test cases
+       /**
+        * Runs the left-indexing test on Spark with {@code LeftIndexingMethod.SP_MLEFTINDEX} forced.
+        */
+       @Test
+       public void testMapLeftIndexingSP() throws DMLRuntimeException, IOException {
+               final LeftIndexingMethod forcedMethod = LeftIndexingMethod.SP_MLEFTINDEX;
+               runTestLeftIndexing(ExecType.SPARK, forcedMethod, schemaMixedLarge);
+       }
+       
+       /**
+        * Runs the left-indexing test on Spark with {@code LeftIndexingMethod.SP_GLEFTINDEX} forced.
+        */
+       @Test
+       public void testGeneralLeftIndexingSP() throws DMLRuntimeException, IOException {
+               final LeftIndexingMethod forcedMethod = LeftIndexingMethod.SP_GLEFTINDEX;
+               runTestLeftIndexing(ExecType.SPARK, forcedMethod, schemaMixedLarge);
+       }
+       
+       private void runTestLeftIndexing(ExecType et, 
LeftIndexingOp.LeftIndexingMethod indexingMethod, ValueType[] schema) throws 
DMLRuntimeException, IOException {
+               
+               boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+               RUNTIME_PLATFORM oldRTP = rtplatform;
+               TestConfiguration config = 
getTestConfiguration("FrameIndexingDistTest");
+               try
+               {
+                       if(indexingMethod != null) {
+                               LeftIndexingOp.FORCED_LEFT_INDEXING = 
indexingMethod;
+                       }
+                       
+                       if(et == ExecType.SPARK) {
+                       rtplatform = RUNTIME_PLATFORM.SPARK;
+                   }
+                       else {
+                               // rtplatform = (et==ExecType.MR)? 
RUNTIME_PLATFORM.HADOOP : RUNTIME_PLATFORM.SINGLE_NODE;
+                           rtplatform = RUNTIME_PLATFORM.HYBRID;
+                       }
+                       if( rtplatform == RUNTIME_PLATFORM.SPARK )
+                               DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+                       
+                   
+               config.addVariable("rows", rows);
+               config.addVariable("cols", cols);
+             
+               long rowstart=816, rowend=1229, colstart=109 /*967*/, 
colend=1009;
+
+               config.addVariable("rowstart", rowstart);
+               config.addVariable("rowend", rowend);
+               config.addVariable("colstart", colstart);
+               config.addVariable("colend", colend);
+                       loadTestConfiguration(config);
+               
+                       /* This is for running the junit test the new way, 
i.e., construct the arguments directly */
+                       String LI_HOME = SCRIPT_DIR + TEST_DIR;
+                       fullDMLScriptName = LI_HOME + TEST_NAME + ".dml";
+                       programArgs = new String[]{"-args",  input("A"),
+                               Long.toString(rows), Long.toString(cols),
+                               Long.toString(rowstart), Long.toString(rowend),
+                               Long.toString(colstart), Long.toString(colend),
+                               output("AB"), output("AC"), output("AD"),
+                               input("B"), input("C"), input("D"),
+                               Long.toString(rowend-rowstart+1), 
+                               Long.toString(colend-colstart+1),
+                               Long.toString(cols-colstart+1)};
+                       
+                       fullRScriptName = LI_HOME + TEST_NAME + ".R";
+                       rCmd = "Rscript" + " " + fullRScriptName + " " + 
+                               inputDir() + " " + rowstart + " " + rowend + " 
" + colstart + " " + colend + " " + expectedDir();
+                       
+                       //initialize the frame data.
+                       List<ValueType> lschema = Arrays.asList(schema);
+       
+                       double sparsity=1.0;//rand.nextDouble(); 
+               double[][] A = getRandomMatrix(rows, cols, min, max, sparsity, 
1111 /*\\System.currentTimeMillis()*/);
+               writeInputFrameWithMTD("A", A, true, lschema, 
OutputInfo.BinaryBlockOutputInfo);                
+               
+               sparsity=0.1;//rand.nextDouble();
+               double[][] B = getRandomMatrix((int)(rowend-rowstart+1), 
(int)(colend-colstart+1), min, max, sparsity, 2345 
/*System.currentTimeMillis()*/);
+               List<ValueType> lschemaB = lschema.subList((int)colstart-1, 
(int)colend); 
+               writeInputFrameWithMTD("B", B, true, lschemaB, 
OutputInfo.BinaryBlockOutputInfo);               
+
+               sparsity=0.5;//rand.nextDouble();
+               double[][] C = getRandomMatrix((int)(rowend), 
(int)(cols-colstart+1), min, max, sparsity, 3267 
/*System.currentTimeMillis()*/);
+               List<ValueType> lschemaC = lschema.subList((int)colstart-1, 
(int)cols); 
+               writeInputFrameWithMTD("C", C, true, lschemaC, 
OutputInfo.BinaryBlockOutputInfo);               
+
+               sparsity=0.01;//rand.nextDoublBe();
+               double[][] D = getRandomMatrix(rows, (int)(colend-colstart+1), 
min, max, sparsity, 4856 /*System.currentTimeMillis()*/);
+               writeInputFrameWithMTD("D", D, true, lschemaB, 
OutputInfo.BinaryBlockOutputInfo);               
+       
+               boolean exceptionExpected = false;
+                       int expectedNumberOfJobs = -1;
+                       runTest(true, exceptionExpected, null, 
expectedNumberOfJobs);
+               }
+               catch(Exception ex) {
+                       ex.printStackTrace();
+                       throw new RuntimeException(ex);
+               }
+               finally
+               {
+                       rtplatform = oldRTP;
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+                       LeftIndexingOp.FORCED_LEFT_INDEXING = null;
+               }
+               
+               runRScript(true);
+       
+               for(String file: config.getOutputFiles())
+               {
+                       FrameBlock frameBlock = readDMLFrameFromHDFS(file, 
InputInfo.BinaryBlockInputInfo);
+                       MatrixCharacteristics md = new 
MatrixCharacteristics(frameBlock.getNumRows(), frameBlock.getNumColumns(), -1, 
-1);
+                       FrameBlock frameRBlock = 
readRFrameFromHDFS(file+".csv", InputInfo.CSVInputInfo, md);
+                       verifyFrameData(frameBlock, frameRBlock, schema);
+                       System.out.println("File processed is " + file);
+               }
+       }
+       
+       private void verifyFrameData(FrameBlock frame1, FrameBlock frame2, 
ValueType[] schema) {
+               for ( int i=0; i<frame1.getNumRows(); ++i )
+                       for( int j=0; j<frame1.getNumColumns(); j++ )   {
+                               Object val1 = 
UtilFunctions.stringToObject(schema[j], 
UtilFunctions.objectToString(frame1.get(i, j)));
+                               Object val2 = 
UtilFunctions.stringToObject(schema[j], 
UtilFunctions.objectToString(frame2.get(i, j)));
+                               if( TestUtils.compareToR(schema[j], val1, val2, 
epsilon) != 0)
+                                       Assert.fail("The DML data for cell ("+ 
i + "," + j + ") is " + val1 + 
+                                                       ", not same as the R 
value " + val2);
+                       }
+       }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixCastingTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixCastingTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixCastingTest.java
index 5edada9..fba037d 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixCastingTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixCastingTest.java
@@ -214,7 +214,7 @@ public class FrameMatrixCastingTest extends AutomatedTestBase
                
                //write meta data
                MatrixCharacteristics mc = new MatrixCharacteristics(rows, 
cols, blksize, blksize);
-               MapReduceTool.writeMetaDataFile(fname+".mtd", vt, dt, mc, 
OutputInfo.BinaryBlockOutputInfo);
+               MapReduceTool.writeMetaDataFile(fname+".mtd", vt, null, dt, mc, 
OutputInfo.BinaryBlockOutputInfo);
        
        }
        

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/test/java/org/apache/sysml/test/utils/TestUtils.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/utils/TestUtils.java b/src/test/java/org/apache/sysml/test/utils/TestUtils.java
index 78ebf60..834064b 100644
--- a/src/test/java/org/apache/sysml/test/utils/TestUtils.java
+++ b/src/test/java/org/apache/sysml/test/utils/TestUtils.java
@@ -41,6 +41,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Locale;
 import java.util.Random;
 import java.util.StringTokenizer;
@@ -53,12 +54,19 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.sysml.parser.Expression.ValueType;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.io.FrameWriter;
+import org.apache.sysml.runtime.io.FrameWriterFactory;
 import org.apache.sysml.runtime.io.IOUtilFunctions;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.IJV;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixCell;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.matrix.data.OutputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex;
+import org.apache.sysml.runtime.util.UtilFunctions;
 import org.apache.sysml.test.integration.BinaryMatrixCharacteristics;
 
 
@@ -842,7 +850,67 @@ public class TestUtils
                _AssertOccured = true;
                return false;
        }
-
+       
+       
+       /**
+        * 
+        * @param vt
+        * @param in1
+        * @param in2
+        * @param tolerance
+        * 
+        * @return
+        */
+       public static int compareTo(ValueType vt, Object in1, Object in2, 
double tolerance) {
+               if(in1 == null && in2 == null) return 0;
+               else if(in1 == null) return -1;
+               else if(in2 == null) return 1;
+ 
+               switch( vt ) {
+                       case STRING:  return 
((String)in1).compareTo((String)in2);
+                       case BOOLEAN: return 
((Boolean)in1).compareTo((Boolean)in2);
+                       case INT:     return ((Long)in1).compareTo((Long)in2);
+                       case DOUBLE:  
+                               return (Math.abs((Double)in1-(Double)in2) < 
tolerance)?0:       
+                                       ((Double)in1).compareTo((Double)in2);
+                       default: throw new RuntimeException("Unsupported value 
type: "+vt);
+               }
+       }
+       
+       /**
+        * Compares a value produced by DML against the corresponding value read
+        * back from an R output file, following R's conventions for missing values.
+        * <p>
+        * A null DML value equals a null R value or the R token {@code "NA"}.
+        * Otherwise a null DML value is smaller than any STRING value, and for
+        * BOOLEAN, INT and DOUBLE it is compared as {@code false}, {@code 0} and
+        * {@code 0.0} respectively. A non-null DML value is greater than a null
+        * R value. DOUBLE values that differ by less than {@code tolerance} are
+        * considered equal.
+        * </p>
+        * 
+        * @param vt value type that determines how both operands are cast
+        *           (String, Boolean, Long or Double)
+        * @param in1 value produced by DML, may be null
+        * @param inR value read from the R output, may be null
+        * @param tolerance absolute tolerance, used for DOUBLE values only
+        * @return negative, zero or positive as {@code in1} is less than, equal to,
+        *         or greater than {@code inR}
+        * @throws RuntimeException if {@code vt} is not STRING, BOOLEAN, INT or DOUBLE
+        *         and the result was not already decided by the null checks
+        */
+       public static int compareToR(ValueType vt, Object in1, Object inR, double tolerance) {
+               if(in1 == null && (inR == null || (inR.toString().compareTo("NA")==0))) return 0;
+               else if(in1 == null && vt == ValueType.STRING) return -1;
+               else if(inR == null) return 1;
+ 
+               switch( vt ) {
+                       case STRING:  return ((String)in1).compareTo((String)inR);
+                       case BOOLEAN: 
+                               // a null DML value is compared as false
+                               if(in1 == null)
+                                       return Boolean.FALSE.compareTo((Boolean)inR);
+                               else
+                                       return ((Boolean)in1).compareTo((Boolean)inR);
+                       case INT:     
+                               // a null DML value is compared as 0 (valueOf instead of the boxing constructor)
+                               if(in1 == null)
+                                       return Long.valueOf(0L).compareTo((Long)inR);
+                               else
+                                       return ((Long)in1).compareTo((Long)inR);
+                       case DOUBLE:  
+                               // a null DML value is compared as 0.0
+                               if(in1 == null)
+                                       return Double.valueOf(0d).compareTo((Double)inR);
+                               else
+                                       return (Math.abs((Double)in1-(Double)inR) < tolerance)?0:
+                                               ((Double)in1).compareTo((Double)inR);
+                       default: throw new RuntimeException("Unsupported value type: "+vt);
+               }
+       }
+       
        /**
         * Converts a 2D array into a sparse hashmap matrix.
         * 
@@ -1534,6 +1602,67 @@ public class TestUtils
                writeTestMatrix(file, matrix, false);
        }
 
+       
+       /**
+        * <p>
+        * Writes a frame to a file in the format selected by {@code oi}. The frame
+        * has {@code data.length} rows and {@code schema.size()} columns and is
+        * filled via {@link #initFrameData}.
+        * </p>
+        * <p>
+        * Note: {@code data} is modified in place; each cell is overwritten with
+        * the double value of its type-converted representation.
+        * </p>
+        * 
+        * @param file
+        *            file name
+        * @param data
+        *            frame data, one inner array per row; modified in place
+        * @param schema
+        *            value type of each column
+        * @param oi
+        *            output format, used to create the frame writer
+        * @param isR
+        *            currently ignored by this method
+        * @throws IOException 
+        * @throws DMLRuntimeException 
+        */
+       public static void writeTestFrame(String file, double[][] data, List<ValueType> schema, OutputInfo oi, boolean isR) 
+                       throws DMLRuntimeException, IOException 
+       {
+               FrameWriter writer = FrameWriterFactory.createFrameWriter(oi);
+               FrameBlock frame = new FrameBlock(schema);
+               // converts every cell of data according to the schema (and writes it back into data)
+               initFrameData(frame, data, schema, data.length);
+               writer.writeFrameToHDFS(frame, file, data.length, schema.size());
+       }
+       
+       /**
+        * <p>
+        * Writes a frame to a file in the format selected by {@code oi}; same as
+        * {@code writeTestFrame(file, data, schema, oi, false)}.
+        * </p>
+        * 
+        * @param file
+        *            file name
+        * @param data
+        *            frame data, one inner array per row; modified in place by
+        *            the delegate, which rewrites each cell to its type-converted value
+        * @param schema
+        *            value type of each column
+        * @param oi
+        *            output format, used to create the frame writer
+        * @throws IOException 
+        * @throws DMLRuntimeException 
+        */
+       public static void writeTestFrame(String file, double[][] data, List<ValueType> schema, OutputInfo oi)
+               throws DMLRuntimeException, IOException
+       {
+               writeTestFrame(file, data, schema, oi, false);
+       }
+
+       /**
+        * 
+        * @param frame
+        * @param data
+        * @param lschema
+        */
+       public static void initFrameData(FrameBlock frame, double[][] data, 
List<ValueType> lschema, int rows) {
+               Object[] row1 = new Object[lschema.size()];
+               for( int i=0; i<rows; i++ ) {
+                       for( int j=0; j<lschema.size(); j++ ) {
+                               data[i][j] = 
UtilFunctions.objectToDouble(lschema.get(j), 
+                                               row1[j] = 
UtilFunctions.doubleToObject(lschema.get(j), data[i][j]));
+                               if(row1[j] != null && lschema.get(j) == 
ValueType.STRING)
+                                       row1[j] = "Str" + row1[j];
+                       }
+                       frame.appendRow(row1);
+               }
+       }
+
+       
        /* Write a scalar value to a file */
        public static void writeTestScalar(String file, double value) {
                try {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/test/scripts/functions/frame/FrameIndexingDistTest.R
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/frame/FrameIndexingDistTest.R b/src/test/scripts/functions/frame/FrameIndexingDistTest.R
new file mode 100644
index 0000000..b3b55a9
--- /dev/null
+++ b/src/test/scripts/functions/frame/FrameIndexingDistTest.R
@@ -0,0 +1,42 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+
+# Reference (expected-output) script for the frame left-indexing test; it
+# mirrors the assignments of the DML script FrameIndexingDistTest.dml.
+#
+# Positional arguments:
+#   args[1]  input directory containing A.csv, B.csv, C.csv and D.csv
+#   args[2]  rowstart      args[3]  rowend
+#   args[4]  colstart      args[5]  colend
+#   args[6]  output directory for AB.csv, AC.csv and AD.csv
+args <- commandArgs(TRUE)
+options(digits=22)
+# NOTE(review): nothing from the Matrix package is used below
+library("Matrix")
+
+# Read the inputs as data.frames without a header row; strings stay
+# character vectors instead of factors.
+A=read.csv(paste(args[1], "A.csv", sep=""), header = FALSE, stringsAsFactors=FALSE)
+B=read.csv(paste(args[1], "B.csv", sep=""), header = FALSE, stringsAsFactors=FALSE)
+C=read.csv(paste(args[1], "C.csv", sep=""), header = FALSE, stringsAsFactors=FALSE)
+D=read.csv(paste(args[1], "D.csv", sep=""), header = FALSE, stringsAsFactors=FALSE)
+
+# AB: A[rowstart:rowend, colstart:colend] = B.
+# The bounds are character strings from commandArgs; ':' coerces them to numbers.
+# NOTE(review): the zero assignment is overwritten by B right away; it may still
+# coerce column types (e.g. logical -> numeric) -- confirm before removing.
+A[args[2]:args[3],args[4]:args[5]]=0
+A[args[2]:args[3],args[4]:args[5]]=B
+write.csv(A, paste(args[6], "AB.csv", sep=""), row.names = FALSE, quote = FALSE)
+
+# AC: A[1:rowend, colstart:ncol(A)] = C, applied on top of the AB result.
+A[1:args[3],args[4]:ncol(A)]=0
+A[1:args[3],args[4]:ncol(A)]=C
+write.csv(A, paste(args[6], "AC.csv", sep=""), row.names = FALSE, quote = FALSE)
+
+# AD: A[, colstart:colend] = D, applied on top of the AC result.
+A[,args[4]:args[5]]=0
+A[,args[4]:args[5]]=D
+write.csv(A, paste(args[6], "AD.csv", sep=""), row.names = FALSE, quote = FALSE)

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/test/scripts/functions/frame/FrameIndexingDistTest.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/frame/FrameIndexingDistTest.dml b/src/test/scripts/functions/frame/FrameIndexingDistTest.dml
new file mode 100644
index 0000000..5e998cb
--- /dev/null
+++ b/src/test/scripts/functions/frame/FrameIndexingDistTest.dml
@@ -0,0 +1,32 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+
+# Frame left-indexing test: applies three successive left-indexing
+# assignments to frame A and writes A in binary format after each one.
+#
+# Positional arguments (as built in the Java test's programArgs):
+#   $1  input A            $2  rows of A             $3  cols of A
+#   $4  rowstart           $5  rowend
+#   $6  colstart           $7  colend
+#   $8  output AB          $9  output AC             $10 output AD
+#   $11 input B            $12 input C               $13 input D
+#   $14 rowend-rowstart+1  $15 colend-colstart+1     $16 cols-colstart+1
+A=read($1, data_type="frame", rows=$2, cols=$3, format="binary")
+# B: (rowend-rowstart+1) x (colend-colstart+1)
+B=read($11, data_type="frame", rows=$14, cols=$15, format="binary")
+# C: rowend x (cols-colstart+1)
+C=read($12, data_type="frame", rows=$5, cols=$16, format="binary")
+# D: rows x (colend-colstart+1)
+D=read($13, data_type="frame", rows=$2, cols=$15, format="binary")
+# AB: overwrite the sub-block [rowstart:rowend, colstart:colend]
+A[$4:$5,$6:$7]=B
+write(A, $8, format="binary")
+# AC: overwrite rows 1..rowend, columns colstart..ncol(A), on top of AB
+A[1:$5,$6:ncol(A)]=C
+write(A, $9, format="binary")
+# AD: overwrite all rows of columns colstart..colend, on top of AC
+A[,$6:$7]=D
+write(A, $10, format="binary")


Reply via email to