[SYSTEMML-2293] Fix correctness parfor data partitioning w/ cp_file rix

This patch fixes result correctness for a special case of parfor data
partitioning with nested loops or reuse of partition files across outer
loops. Specifically, for large partitions, we use CP_FILE instead of
CP right indexing, which simply creates a new meta data object around the
specific partition file name and thus avoids unnecessary eviction for
large partitions. However, this intermediate was cleaned up like any other
intermediate, leading to deleted partition files. In case of reuse of
partitioned matrices, this led to silently incorrect results because
empty matrices are created for missing files (to support ultra-sparse
partitioned matrices where empty files are not materialized).


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/45a93396
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/45a93396
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/45a93396

Branch: refs/heads/master
Commit: 45a93396aff1bbaad8f8cd94ad61fd01acef588b
Parents: d74bda4
Author: Matthias Boehm <[email protected]>
Authored: Mon Apr 30 19:44:40 2018 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Mon Apr 30 19:47:45 2018 -0700

----------------------------------------------------------------------
 .../cpfile/MatrixIndexingCPFileInstruction.java | 37 ++++++++++----------
 1 file changed, 18 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/45a93396/src/main/java/org/apache/sysml/runtime/instructions/cpfile/MatrixIndexingCPFileInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/cpfile/MatrixIndexingCPFileInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/cpfile/MatrixIndexingCPFileInstruction.java
index 88a0624..4edd0a7 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/cpfile/MatrixIndexingCPFileInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/cpfile/MatrixIndexingCPFileInstruction.java
@@ -67,12 +67,10 @@ public final class MatrixIndexingCPFileInstruction extends 
IndexingCPInstruction
                                throw new DMLRuntimeException("Invalid number 
of operands in instruction: " + str);
                        }
                } 
-               else if ( parts[0].equalsIgnoreCase(LeftIndex.OPCODE)) 
-               {
-                       throw new DMLRuntimeException("Invalid opcode while 
parsing a MatrixIndexingCPFileInstruction: " + str);        
+               else if ( parts[0].equalsIgnoreCase(LeftIndex.OPCODE)) {
+                       throw new DMLRuntimeException("Invalid opcode while 
parsing a MatrixIndexingCPFileInstruction: " + str);
                }
-               else 
-               {
+               else {
                        throw new DMLRuntimeException("Unknown opcode while 
parsing a MatrixIndexingCPFileInstruction: " + str);
                }
        }
@@ -89,45 +87,46 @@ public final class MatrixIndexingCPFileInstruction extends 
IndexingCPInstruction
                        MatrixCharacteristics mc = 
meta.getMatrixCharacteristics();
                        String pfname = mo.getPartitionFileName( ixrange, 
mc.getRowsPerBlock(), mc.getColsPerBlock());
                        
-                       if( MapReduceTool.existsFileOnHDFS(pfname) )
-                       {
+                       if( MapReduceTool.existsFileOnHDFS(pfname) ) { //default
                                //create output matrix object
                                MatrixObject mobj = new 
MatrixObject(mo.getValueType(), pfname );
                                MatrixCharacteristics mcNew = null;
-                               switch( mo.getPartitionFormat() )
-                               {
+                               switch( mo.getPartitionFormat() ) {
                                        case ROW_WISE:
                                                mcNew = new 
MatrixCharacteristics( 1, mc.getCols(), mc.getRowsPerBlock(), 
mc.getColsPerBlock() );
                                                break;
                                        case ROW_BLOCK_WISE_N:
                                                mcNew = new 
MatrixCharacteristics( mo.getPartitionSize(), mc.getCols(), 
mc.getRowsPerBlock(), mc.getColsPerBlock() );
-                                               break;  
+                                               break;
                                        case COLUMN_WISE:
                                                mcNew = new 
MatrixCharacteristics( mc.getRows(), 1, mc.getRowsPerBlock(), 
mc.getColsPerBlock() );
                                                break;
                                        case COLUMN_BLOCK_WISE_N:
                                                mcNew = new 
MatrixCharacteristics( mc.getRows(), mo.getPartitionSize(), 
mc.getRowsPerBlock(), mc.getColsPerBlock() );
-                                               break;  
+                                               break;
                                        default:
                                                throw new 
DMLRuntimeException("Unsupported partition format for CP_FILE 
"+RightIndex.OPCODE+": "+ mo.getPartitionFormat());
                                }
                                
                                MetaDataFormat metaNew = new 
MetaDataFormat(mcNew,meta.getOutputInfo(),meta.getInputInfo());
-                               mobj.setMetaData(metaNew);       
+                               mobj.setMetaData(metaNew);
+                               
+                               //note: disable cleanup to ensure that the 
partitioning file is not deleted 
+                               //(e.g., for nested loops or reused partitioned 
matrices across loops)
+                               mobj.enableCleanup(false);
                                
                                //put output object into symbol table
                                ec.setVariable(output.getName(), mobj);
                        }
-                       else
-                       {
-                               //will return an empty matrix partition 
+                       else { //empty matrix partition
+                               //note: for binary cell data partitioning empty 
partitions are not materialized
                                MatrixBlock resultBlock = 
mo.readMatrixPartition( ixrange );
                                ec.setMatrixOutput(output.getName(), 
resultBlock, getExtendedOpcode());
                        }
                }
-               else
-               {
-                       throw new DMLRuntimeException("Invalid opcode or index 
predicate for MatrixIndexingCPFileInstruction: " + instString);  
+               else {
+                       throw new DMLRuntimeException("Invalid opcode or index 
predicate "
+                               + "for MatrixIndexingCPFileInstruction: " + 
instString);
                }
        }
-}
\ No newline at end of file
+}

Reply via email to