[SYSTEMML-2293] Fix correctness parfor data partitioning w/ cp_file rix This patch fixes result correctness for a special case of parfor data partitioning with nested loops or reuse of partition files across outer loops. Specifically, for large partitions, we use CP_FILE instead of CP right indexing, which simply creates a new metadata object around the specific partition file name and thus avoids unnecessary eviction for large partitions. However, this intermediate was cleaned up like any other intermediate, which deleted the underlying partition files. In case of reuse of partitioned matrices, this led to silently incorrect results because empty matrices are created for missing files (to support ultra-sparse partitioned matrices where empty files are not materialized).
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/45a93396 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/45a93396 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/45a93396 Branch: refs/heads/master Commit: 45a93396aff1bbaad8f8cd94ad61fd01acef588b Parents: d74bda4 Author: Matthias Boehm <[email protected]> Authored: Mon Apr 30 19:44:40 2018 -0700 Committer: Matthias Boehm <[email protected]> Committed: Mon Apr 30 19:47:45 2018 -0700 ---------------------------------------------------------------------- .../cpfile/MatrixIndexingCPFileInstruction.java | 37 ++++++++++---------- 1 file changed, 18 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/45a93396/src/main/java/org/apache/sysml/runtime/instructions/cpfile/MatrixIndexingCPFileInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cpfile/MatrixIndexingCPFileInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cpfile/MatrixIndexingCPFileInstruction.java index 88a0624..4edd0a7 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/cpfile/MatrixIndexingCPFileInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/cpfile/MatrixIndexingCPFileInstruction.java @@ -67,12 +67,10 @@ public final class MatrixIndexingCPFileInstruction extends IndexingCPInstruction throw new DMLRuntimeException("Invalid number of operands in instruction: " + str); } } - else if ( parts[0].equalsIgnoreCase(LeftIndex.OPCODE)) - { - throw new DMLRuntimeException("Invalid opcode while parsing a MatrixIndexingCPFileInstruction: " + str); + else if ( parts[0].equalsIgnoreCase(LeftIndex.OPCODE)) { + throw new DMLRuntimeException("Invalid opcode while parsing a MatrixIndexingCPFileInstruction: " + str); } - 
else - { + else { throw new DMLRuntimeException("Unknown opcode while parsing a MatrixIndexingCPFileInstruction: " + str); } } @@ -89,45 +87,46 @@ public final class MatrixIndexingCPFileInstruction extends IndexingCPInstruction MatrixCharacteristics mc = meta.getMatrixCharacteristics(); String pfname = mo.getPartitionFileName( ixrange, mc.getRowsPerBlock(), mc.getColsPerBlock()); - if( MapReduceTool.existsFileOnHDFS(pfname) ) - { + if( MapReduceTool.existsFileOnHDFS(pfname) ) { //default //create output matrix object MatrixObject mobj = new MatrixObject(mo.getValueType(), pfname ); MatrixCharacteristics mcNew = null; - switch( mo.getPartitionFormat() ) - { + switch( mo.getPartitionFormat() ) { case ROW_WISE: mcNew = new MatrixCharacteristics( 1, mc.getCols(), mc.getRowsPerBlock(), mc.getColsPerBlock() ); break; case ROW_BLOCK_WISE_N: mcNew = new MatrixCharacteristics( mo.getPartitionSize(), mc.getCols(), mc.getRowsPerBlock(), mc.getColsPerBlock() ); - break; + break; case COLUMN_WISE: mcNew = new MatrixCharacteristics( mc.getRows(), 1, mc.getRowsPerBlock(), mc.getColsPerBlock() ); break; case COLUMN_BLOCK_WISE_N: mcNew = new MatrixCharacteristics( mc.getRows(), mo.getPartitionSize(), mc.getRowsPerBlock(), mc.getColsPerBlock() ); - break; + break; default: throw new DMLRuntimeException("Unsupported partition format for CP_FILE "+RightIndex.OPCODE+": "+ mo.getPartitionFormat()); } MetaDataFormat metaNew = new MetaDataFormat(mcNew,meta.getOutputInfo(),meta.getInputInfo()); - mobj.setMetaData(metaNew); + mobj.setMetaData(metaNew); + + //note: disable cleanup to ensure that the partitioning file is not deleted + //(e.g., for nested loops or reused partitioned matrices across loops) + mobj.enableCleanup(false); //put output object into symbol table ec.setVariable(output.getName(), mobj); } - else - { - //will return an empty matrix partition + else { //empty matrix partition + //note: for binary cell data partitioning empty partitions are not materialized MatrixBlock 
resultBlock = mo.readMatrixPartition( ixrange ); ec.setMatrixOutput(output.getName(), resultBlock, getExtendedOpcode()); } } - else - { - throw new DMLRuntimeException("Invalid opcode or index predicate for MatrixIndexingCPFileInstruction: " + instString); + else { + throw new DMLRuntimeException("Invalid opcode or index predicate " + + "for MatrixIndexingCPFileInstruction: " + instString); } } -} \ No newline at end of file +}
