Repository: incubator-systemml
Updated Branches:
  refs/heads/master 1729d13ae -> 6550c04b9


[SYSTEMML-927] Fix schema handling spark frame append/right indexing

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/6550c04b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/6550c04b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/6550c04b

Branch: refs/heads/master
Commit: 6550c04b9cf9b446a56bd4846a205a4745b20ec6
Parents: 1729d13
Author: Matthias Boehm <mbo...@us.ibm.com>
Authored: Tue Sep 20 13:36:02 2016 -0700
Committer: Matthias Boehm <mbo...@us.ibm.com>
Committed: Tue Sep 20 13:36:02 2016 -0700

----------------------------------------------------------------------
 .../sysml/api/mlcontext/MatrixFormat.java       | 18 +++----------
 .../controlprogram/caching/FrameObject.java     | 27 ++++++++++++++++++++
 .../spark/FrameAppendMSPInstruction.java        |  5 ++++
 .../spark/FrameAppendRSPInstruction.java        |  5 ++++
 .../spark/FrameIndexingSPInstruction.java       |  4 +++
 5 files changed, 45 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/6550c04b/src/main/java/org/apache/sysml/api/mlcontext/MatrixFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MatrixFormat.java b/src/main/java/org/apache/sysml/api/mlcontext/MatrixFormat.java
index 4f4c7f9..0c07dd2 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/MatrixFormat.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/MatrixFormat.java
@@ -63,13 +63,8 @@ public enum MatrixFormat {
         *         otherwise.
         */
        public boolean isVectorBased() {
-               if (this == DF_VECTOR_WITH_INDEX) {
-                       return true;
-               } else if (this == DF_VECTOR) {
-                       return true;
-               } else {
-                       return false;
-               }
+               return (this == DF_VECTOR_WITH_INDEX
+                       || this == DF_VECTOR);
        }
 
        /**
@@ -79,13 +74,8 @@ public enum MatrixFormat {
         *         otherwise.
         */
        public boolean hasIDColumn() {
-               if (this == DF_DOUBLES_WITH_INDEX) {
-                       return true;
-               } else if (this == DF_VECTOR_WITH_INDEX) {
-                       return true;
-               } else {
-                       return false;
-               }
+               return (this == DF_DOUBLES_WITH_INDEX 
+                       || this == DF_VECTOR_WITH_INDEX);
        }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/6550c04b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
index e3d2332..bfccdf1 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
@@ -91,6 +91,33 @@ public class FrameObject extends CacheableData<FrameBlock>
                return _schema;
        }
 
+       /**
+        * 
+        * @param cl column lower bound, inclusive
+        * @param cu column upper bound, inclusive
+        * @return
+        */
+       public List<ValueType> getSchema(int cl, int cu) {
+               return (_schema!=null && _schema.size()>cu) ? _schema.subList(cl, cu+1) :
+                       Collections.nCopies(cu-cl+1, ValueType.STRING);
+       }
+
+       /**
+        * Creates a new collection which contains the schema of the current
+        * frame object concatenated with the schema of the passed frame object.
+        * 
+        * @param fo
+        * @return
+        */
+       public List<ValueType> mergeSchemas(FrameObject fo) {
+               ArrayList<ValueType> ret = new ArrayList<ValueType>();
+               ret.addAll((_schema!=null) ? _schema : 
+                       Collections.nCopies((int)getNumColumns(), ValueType.STRING));
+               ret.addAll((fo.getSchema()!=null) ? fo.getSchema() : 
+                       Collections.nCopies((int)fo.getNumColumns(), ValueType.STRING));
+               return ret;
+       } 
+       
        public void setSchema(String schema) {
                if( schema.equals("*") ) {
                        //populate default schema

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/6550c04b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
index 7aad0bf..236dfb0 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
@@ -68,6 +68,11 @@ public class FrameAppendMSPInstruction extends AppendMSPInstruction
                sec.setRDDHandleForVariable(output.getName(), out);
                sec.addLineageRDD(output.getName(), input1.getName());
                sec.addLineageBroadcast(output.getName(), input2.getName());
+               
+               //update schema of output with merged input schemas
+               sec.getFrameObject(output.getName()).setSchema(
+                       sec.getFrameObject(input1.getName()).mergeSchemas(
+                       sec.getFrameObject(input2.getName())));
        }
        
        /** 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/6550c04b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
index 067769d..ad6ef58 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
@@ -68,6 +68,11 @@ public class FrameAppendRSPInstruction extends AppendRSPInstruction
                sec.setRDDHandleForVariable(output.getName(), out);
                sec.addLineageRDD(output.getName(), input1.getName());
                sec.addLineageRDD(output.getName(), input2.getName());          
+               
+               //update schema of output with merged input schemas
+               sec.getFrameObject(output.getName()).setSchema(
+                       sec.getFrameObject(input1.getName()).mergeSchemas(
+                       sec.getFrameObject(input2.getName())));
        }
        
        /**

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/6550c04b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
index ac45ec0..b4556cf 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
@@ -117,6 +117,10 @@ public class FrameIndexingSPInstruction  extends IndexingSPInstruction
                        //put output RDD handle into symbol table
                        sec.setRDDHandleForVariable(output.getName(), out);
                        sec.addLineageRDD(output.getName(), input1.getName());
+                       
+                       //update schema of output with subset of input schema
+                       sec.getFrameObject(output.getName()).setSchema(
+                               sec.getFrameObject(input1.getName()).getSchema((int)cl, (int)cu));
                }
                //left indexing
                else if ( opcode.equalsIgnoreCase("leftIndex") || opcode.equalsIgnoreCase("mapLeftIndex"))

Reply via email to