Repository: systemml
Updated Branches:
  refs/heads/master 53b489ced -> 60723c5b6


[SYSTEMML-2213] Removed unused file-based removeEmpty operations

This patch removes the unused CP_FILE removeEmpty operation (i.e., the
file-based, out-of-core operation executed at the driver node). This
operation was superseded many years ago by the distributed Spark and MR
removeEmpty operations, which are in turn based on distributed cumsum
operations.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/60723c5b
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/60723c5b
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/60723c5b

Branch: refs/heads/master
Commit: 60723c5b69f4edc0908aae37b9889c6a9994ddc9
Parents: 53b489c
Author: Matthias Boehm <[email protected]>
Authored: Sun Mar 25 23:18:59 2018 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Sun Mar 25 23:18:59 2018 -0700

----------------------------------------------------------------------
 .../sysml/hops/ParameterizedBuiltinOp.java      |   6 +-
 .../hops/cost/CostEstimatorStaticRuntime.java   |   5 +-
 .../parfor/opt/OptTreeConverter.java            |   3 +-
 .../instructions/CPInstructionParser.java       |  11 +-
 .../ParameterizedBuiltinCPFileInstruction.java  | 998 -------------------
 5 files changed, 7 insertions(+), 1016 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/60723c5b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java 
b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
index 35ed54b..1010db1 100644
--- a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
+++ b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
@@ -51,8 +51,7 @@ import org.apache.sysml.runtime.util.UtilFunctions;
  * 
  */
 public class ParameterizedBuiltinOp extends Hop implements MultiThreadedHop
-{      
-       private static boolean COMPILE_PARALLEL_REMOVEEMPTY = true;
+{
        public static boolean FORCE_DIST_RM_EMPTY = false;
 
        //operator type
@@ -175,7 +174,6 @@ public class ParameterizedBuiltinOp extends Hop implements 
MultiThreadedHop
                        }
                        case RMEMPTY: {
                                ExecType et = optFindExecType();
-                               et = (et == ExecType.MR && 
!COMPILE_PARALLEL_REMOVEEMPTY ) ? ExecType.CP_FILE : et;
                                constructLopsRemoveEmpty(inputlops, et);
                                break;
                        } 
@@ -406,7 +404,7 @@ public class ParameterizedBuiltinOp extends Hop implements 
MultiThreadedHop
                Hop selectHop = getParameterHop("select");
                Hop emptyRet = getParameterHop("empty.return");
                
-               if( et == ExecType.CP || et == ExecType.CP_FILE )
+               if( et == ExecType.CP )
                {
                        ParameterizedBuiltin pbilop = new 
ParameterizedBuiltin(inputlops,HopsParameterizedBuiltinLops.get(_op), 
getDataType(), getValueType(), et);
                        setOutputDimensions(pbilop);

http://git-wip-us.apache.org/repos/asf/systemml/blob/60723c5b/src/main/java/org/apache/sysml/hops/cost/CostEstimatorStaticRuntime.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/hops/cost/CostEstimatorStaticRuntime.java 
b/src/main/java/org/apache/sysml/hops/cost/CostEstimatorStaticRuntime.java
index 675fdf9..e9578d1 100644
--- a/src/main/java/org/apache/sysml/hops/cost/CostEstimatorStaticRuntime.java
+++ b/src/main/java/org/apache/sysml/hops/cost/CostEstimatorStaticRuntime.java
@@ -1198,7 +1198,7 @@ public class CostEstimatorStaticRuntime extends 
CostEstimator
        
                                case ZeroOut: //opcodes: zeroOut
                                        return   DEFAULT_NFLOP_CP * 
((leftSparse)? d1m*d1n*d1s : d1m*d1n)
-                                          + DEFAULT_NFLOP_CP * ((rightSparse)? 
d2m*d2n*d2s : d2m*d2n );                                                        
        
+                                          + DEFAULT_NFLOP_CP * ((rightSparse)? 
d2m*d2n*d2s : d2m*d2n );
                                        
                                default:
                                        return 0;
@@ -1209,9 +1209,6 @@ public class CostEstimatorStaticRuntime extends 
CostEstimator
                        throw new DMLRuntimeException("CostEstimator: 
unsupported instruction type: "+optype);
                }
                
-               //TODO Parameterized Builtin Functions
-               //String2CPFileInstructionType.put( "rmempty"       , 
CPINSTRUCTION_TYPE.ParameterizedBuiltin);
-               
                return -1; //should never come here.
        }
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/60723c5b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreeConverter.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreeConverter.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreeConverter.java
index e88df23..4041fb5 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreeConverter.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreeConverter.java
@@ -62,7 +62,6 @@ import org.apache.sysml.runtime.instructions.Instruction;
 import org.apache.sysml.runtime.instructions.MRJobInstruction;
 import org.apache.sysml.runtime.instructions.cp.FunctionCallCPInstruction;
 import 
org.apache.sysml.runtime.instructions.cpfile.MatrixIndexingCPFileInstruction;
-import 
org.apache.sysml.runtime.instructions.cpfile.ParameterizedBuiltinCPFileInstruction;
 import org.apache.sysml.runtime.instructions.spark.SPInstruction;
 
 /**
@@ -627,7 +626,7 @@ public class OptTreeConverter
        public static boolean containsMRJobInstruction( ArrayList<Instruction> 
instSet, boolean inclCPFile, boolean inclSpark ) {
                return instSet.stream().anyMatch(inst -> inst instanceof 
MRJobInstruction
                        || (inclSpark && inst instanceof SPInstruction)
-                       || (inclCPFile && (inst instanceof 
MatrixIndexingCPFileInstruction || inst instanceof 
ParameterizedBuiltinCPFileInstruction)));
+                       || (inclCPFile && inst instanceof 
MatrixIndexingCPFileInstruction));
        }
 
        public static boolean containsFunctionCallInstruction( ProgramBlock pb 
) {

http://git-wip-us.apache.org/repos/asf/systemml/blob/60723c5b/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java 
b/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java
index 24424ae..6b875d2 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java
@@ -64,7 +64,6 @@ import 
org.apache.sysml.runtime.instructions.cp.UaggOuterChainCPInstruction;
 import org.apache.sysml.runtime.instructions.cp.UnaryCPInstruction;
 import org.apache.sysml.runtime.instructions.cp.VariableCPInstruction;
 import 
org.apache.sysml.runtime.instructions.cpfile.MatrixIndexingCPFileInstruction;
-import 
org.apache.sysml.runtime.instructions.cpfile.ParameterizedBuiltinCPFileInstruction;
 
 public class CPInstructionParser extends InstructionParser 
 {
@@ -364,14 +363,10 @@ public class CPInstructionParser extends InstructionParser
                                
                        case External:
                                return 
FunctionCallCPInstruction.parseInstruction(str);
-                               
+                       
                        case ParameterizedBuiltin: 
-                               execType = ExecType.valueOf( 
str.split(Instruction.OPERAND_DELIM)[0] ); 
-                               if( execType == ExecType.CP )
-                                       return 
ParameterizedBuiltinCPInstruction.parseInstruction(str);
-                               else //exectype CP_FILE
-                                       return 
ParameterizedBuiltinCPFileInstruction.parseInstruction(str);
-       
+                               return 
ParameterizedBuiltinCPInstruction.parseInstruction(str);
+                       
                        case MultiReturnParameterizedBuiltin:
                                return 
MultiReturnParameterizedBuiltinCPInstruction.parseInstruction(str);
                                

http://git-wip-us.apache.org/repos/asf/systemml/blob/60723c5b/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java
deleted file mode 100644
index f9596b0..0000000
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java
+++ /dev/null
@@ -1,998 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.instructions.cpfile;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.Map.Entry;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.sysml.conf.ConfigurationManager;
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
-import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysml.runtime.controlprogram.parfor.util.Cell;
-import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler;
-import org.apache.sysml.runtime.controlprogram.parfor.util.StagingFileUtils;
-import org.apache.sysml.runtime.functionobjects.ParameterizedBuiltin;
-import org.apache.sysml.runtime.functionobjects.ValueFunction;
-import org.apache.sysml.runtime.instructions.InstructionUtils;
-import org.apache.sysml.runtime.instructions.cp.CPOperand;
-import 
org.apache.sysml.runtime.instructions.cp.ParameterizedBuiltinCPInstruction;
-import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.io.MatrixWriter;
-import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
-import org.apache.sysml.runtime.matrix.MetaDataFormat;
-import org.apache.sysml.runtime.matrix.data.InputInfo;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixCell;
-import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
-import org.apache.sysml.runtime.matrix.data.OutputInfo;
-import org.apache.sysml.runtime.matrix.operators.Operator;
-import org.apache.sysml.runtime.matrix.operators.SimpleOperator;
-import org.apache.sysml.runtime.util.FastStringTokenizer;
-import org.apache.sysml.runtime.util.LocalFileUtils;
-import org.apache.sysml.runtime.util.MapReduceTool;
-
-/**
- * File-based (out-of-core) realization of remove empty for robustness because 
there is no
- * parallel version due to data-dependent row- and column dependencies.
- * 
- */
-public class ParameterizedBuiltinCPFileInstruction extends 
ParameterizedBuiltinCPInstruction {
-
-       private ParameterizedBuiltinCPFileInstruction(Operator op, 
HashMap<String, String> paramsMap, CPOperand out,
-                       String opcode, String istr) {
-               super(op, paramsMap, out, opcode, istr);
-       }
-
-       public static ParameterizedBuiltinCPFileInstruction parseInstruction( 
String str ) {
-               String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
-               // first part is always the opcode
-               String opcode = parts[0];
-               // last part is always the output
-               CPOperand out = new CPOperand( parts[parts.length-1] ); 
-
-               // process remaining parts and build a hash map
-               HashMap<String,String> paramsMap = constructParameterMap(parts);
-
-               // determine the appropriate value function
-               ValueFunction func = null;
-               if ( opcode.equalsIgnoreCase("rmempty") ) {
-                       func = 
ParameterizedBuiltin.getParameterizedBuiltinFnObject(opcode);
-                       return new ParameterizedBuiltinCPFileInstruction(new 
SimpleOperator(func), paramsMap, out, opcode, str);
-               }
-               else {
-                       throw new DMLRuntimeException("Unknown opcode (" + 
opcode + ") for ParameterizedBuiltin Instruction.");
-               }
-       }
-       
-       @Override 
-       public void processInstruction(ExecutionContext ec) {
-               String opcode = getOpcode();
-               
-               if ( opcode.equalsIgnoreCase("rmempty") ) 
-               {
-                       // get inputs
-                       MatrixObject src = ec.getMatrixObject( 
params.get("target") );
-                       MatrixObject out = ec.getMatrixObject( output.getName() 
);
-                       String margin = params.get("margin");
-                       
-                       // export input matrix (if necessary)
-                       src.exportData();
-                       
-                       //core execution
-                       RemoveEmpty rm = new RemoveEmpty( margin, src, out );
-                       out = rm.execute();
-               
-                       //put output
-                       ec.setVariable(output.getName(), out);
-               }
-               else {
-                       throw new DMLRuntimeException("Unknown opcode : " + 
opcode);
-               }
-       }
-
-       /**
-        * Remove empty rows as a inner class in order to allow testing 
independent of the
-        * overall SystemML instruction framework.
-        * 
-        */
-       public static class RemoveEmpty
-       {
-               private String _margin = null;
-               private MatrixObject _src = null;
-               private MatrixObject _out = null;
-               
-               public RemoveEmpty( String margin, MatrixObject src, 
MatrixObject out )
-               {
-                       _margin = margin;
-                       _src = src;
-                       _out = out;
-               }
-
-               public MatrixObject execute() {
-                       //initial setup
-                       String fnameOld = _src.getFileName();
-                       String fnameNew = _out.getFileName();
-                       InputInfo ii = 
((MetaDataFormat)_src.getMetaData()).getInputInfo();
-                       MatrixCharacteristics mc = 
_src.getMatrixCharacteristics();
-                       
-                       String stagingDir = 
LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_WORK);
-                       LocalFileUtils.createLocalFileIfNotExist(stagingDir);
-                       
-                       long ret = -1;
-                       try
-                       {
-                               boolean diagBlocks = false;
-                               
-                               //Phase 1: write file to staging 
-                               if( ii == InputInfo.TextCellInputInfo )
-                                       createTextCellStagingFile( fnameOld, 
stagingDir );
-                               else if( ii == InputInfo.BinaryCellInputInfo )
-                                       createBinaryCellStagingFile( fnameOld, 
stagingDir );
-                               else if( ii == InputInfo.BinaryBlockInputInfo )
-                                       diagBlocks = 
createBinaryBlockStagingFile( fnameOld, stagingDir );
-                               
-                               //Phase 2: scan empty rows/cols
-                               if( diagBlocks )
-                                       ret = createKeyMappingDiag(stagingDir, 
mc.getRows(), mc.getCols(), mc.getRowsPerBlock(), mc.getColsPerBlock(), ii);
-                               else
-                                       ret = createKeyMapping(stagingDir, 
mc.getRows(), mc.getCols(), mc.getRowsPerBlock(), mc.getColsPerBlock(), ii);
-                               
-                               //Phase 3: create output files
-                               MapReduceTool.deleteFileIfExistOnHDFS(fnameNew);
-                               if(   ii == InputInfo.TextCellInputInfo 
-                                  || ii == InputInfo.BinaryCellInputInfo )
-                               {
-                                       createCellResultFile( fnameNew, 
stagingDir, mc.getRows(), mc.getCols(), mc.getRowsPerBlock(), 
mc.getColsPerBlock(), ii );
-                               }
-                               else if( ii == InputInfo.BinaryBlockInputInfo )
-                               {
-                                       if( diagBlocks )
-                                               createBlockResultFileDiag( 
fnameNew, stagingDir, mc.getRows(), mc.getCols(), ret, mc.getNonZeros(), 
mc.getRowsPerBlock(), mc.getColsPerBlock(), ii );
-                                       else
-                                               createBlockResultFile( 
fnameNew, stagingDir, mc.getRows(), mc.getCols(), ret, mc.getNonZeros(), 
mc.getRowsPerBlock(), mc.getColsPerBlock(), ii );
-                               }
-                       }
-                       catch( IOException ioe ) {
-                               throw new DMLRuntimeException( ioe );
-                       }
-                       
-                       //final cleanup
-                       LocalFileUtils.cleanupWorkingDirectory(stagingDir);
-                       
-                       //create and return new output object
-                       if( _margin.equals("rows") )
-                               return createNewOutputObject(_src, _out, ret, 
mc.getCols());
-                       else
-                               return createNewOutputObject(_src, _out, 
mc.getRows(), ret );
-               }
-
-               private static MatrixObject createNewOutputObject( MatrixObject 
src, MatrixObject out, long rows, long cols ) {
-                       String fName = out.getFileName();
-                       MetaDataFormat metadata = (MetaDataFormat) 
src.getMetaData();
-                       MatrixObject moNew = new 
MatrixObject(src.getValueType(), fName);
-                       
-                       //handle empty output block (ensure valid dimensions)
-                       if( rows==0 || cols ==0 ){
-                               rows = Math.max(rows, 1);
-                               cols = Math.max(cols, 1);
-                               moNew.acquireModify(new MatrixBlock((int)rows, 
(int) cols, true));
-                               moNew.release();
-                       }
-                       
-                       //create deep copy of metadata obj
-                       MatrixCharacteristics mcOld = 
metadata.getMatrixCharacteristics();
-                       OutputInfo oiOld = metadata.getOutputInfo();
-                       InputInfo iiOld = metadata.getInputInfo();
-                       MatrixCharacteristics mc = new MatrixCharacteristics( 
rows, cols,
-                               mcOld.getRowsPerBlock(), 
mcOld.getColsPerBlock(), mcOld.getNonZeros());
-                       MetaDataFormat meta = new 
MetaDataFormat(mc,oiOld,iiOld);
-                       moNew.setMetaData( meta );
-
-                       return moNew;
-               }
-
-               public void createTextCellStagingFile( String fnameOld, String 
stagingDir ) 
-                       throws IOException, DMLRuntimeException
-               {       
-                       //prepare input
-                       JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     
-                       Path path = new Path(fnameOld);
-                       FileSystem fs = IOUtilFunctions.getFileSystem(path, 
job);
-                       if( !fs.exists(path) )  
-                               throw new IOException("File "+fnameOld+" does 
not exist on HDFS.");
-                       FileInputFormat.addInputPath(job, path); 
-                       TextInputFormat informat = new TextInputFormat();
-                       informat.configure(job);
-                       InputSplit[] splits = informat.getSplits(job, 1);
-               
-                       LinkedList<Cell> buffer = new LinkedList<>();
-                       
-                       LongWritable key = new LongWritable();
-                       Text value = new Text();
-                       FastStringTokenizer st = new FastStringTokenizer(' ');  
        
-                       
-                       for(InputSplit split: splits)
-                       {
-                               RecordReader<LongWritable,Text> reader = 
informat.getRecordReader(split, job, Reporter.NULL);                           
-                               try
-                               {
-                                       while( reader.next(key, value) )
-                                       {
-                                               st.reset( value.toString() ); 
//reset tokenizer
-                                               long row = st.nextLong();
-                                               long col = st.nextLong();
-                                               double lvalue = st.nextDouble();
-                                               
-                                               buffer.add(new 
Cell(row,col,lvalue));
-                                               
-                                               if( buffer.size() > 
StagingFileUtils.CELL_BUFFER_SIZE )
-                                               {
-                                                       
appendCellBufferToStagingArea(stagingDir, buffer, 
ConfigurationManager.getBlocksize(), ConfigurationManager.getBlocksize());
-                                                       buffer.clear();
-                                               }
-                                       }
-                                       
-                                       if( !buffer.isEmpty() )
-                                       {
-                                               
appendCellBufferToStagingArea(stagingDir, buffer, 
ConfigurationManager.getBlocksize(), ConfigurationManager.getBlocksize());
-                                               buffer.clear();
-                                       }
-                               }
-                               finally {
-                                       IOUtilFunctions.closeSilently(reader);
-                               }
-                       }
-               }               
-
-               @SuppressWarnings("deprecation")
-               public void createBinaryCellStagingFile( String fnameOld, 
String stagingDir ) 
-                       throws IOException, DMLRuntimeException
-               {
-                       //prepare input
-                       JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     
-                       Path path = new Path(fnameOld);
-                       FileSystem fs = IOUtilFunctions.getFileSystem(path, 
job);
-                       if( !fs.exists(path) )  
-                               throw new IOException("File "+fnameOld+" does 
not exist on HDFS.");
-                       
-                       LinkedList<Cell> buffer = new LinkedList<>();
-                       
-                       MatrixIndexes key = new MatrixIndexes();
-                       MatrixCell value = new MatrixCell();
-
-                       for(Path lpath: 
IOUtilFunctions.getSequenceFilePaths(fs, path))
-                       {
-                               SequenceFile.Reader reader = new 
SequenceFile.Reader(fs,lpath,job);
-                               try
-                               {
-                                       while(reader.next(key, value))
-                                       {
-                                               long row = key.getRowIndex();
-                                               long col = key.getColumnIndex();
-                                               double lvalue = 
value.getValue();
-                                               
-                                               buffer.add(new 
Cell(row,col,lvalue));
-                                               
-                                               if( buffer.size() > 
StagingFileUtils.CELL_BUFFER_SIZE )
-                                               {
-                                                       
appendCellBufferToStagingArea(stagingDir, buffer, 
ConfigurationManager.getBlocksize(), ConfigurationManager.getBlocksize());
-                                                       buffer.clear();
-                                               }
-                                       }
-                                       
-                                       if( !buffer.isEmpty() )
-                                       {
-                                               
appendCellBufferToStagingArea(stagingDir, buffer, 
ConfigurationManager.getBlocksize(), ConfigurationManager.getBlocksize());
-                                               buffer.clear();
-                                       }
-                               }
-                               finally {
-                                       IOUtilFunctions.closeSilently(reader);
-                               }
-                       }
-               }
-
-               /**
-                * Creates a binary block staging file and returns if the input 
matrix is a diag,
-                * because diag is the primary usecase and there is lots of 
optimization potential.
-                * 
-                * @param fnameOld old filename
-                * @param stagingDir staging directory
-                * @return true if diag
-                * @throws IOException if IOException occurs
-                */
-               @SuppressWarnings("deprecation")
-               public boolean createBinaryBlockStagingFile( String fnameOld, 
String stagingDir ) 
-                       throws IOException
-               {
-                       //prepare input
-                       JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     
-                       Path path = new Path(fnameOld);
-                       FileSystem fs = IOUtilFunctions.getFileSystem(path, 
job);
-                       if( !fs.exists(path) )  
-                               throw new IOException("File "+fnameOld+" does 
not exist on HDFS.");
-                       
-                       MatrixIndexes key = new MatrixIndexes(); 
-                       MatrixBlock value = new MatrixBlock();
-                       boolean diagBlocks = true;
-                       
-                       for(Path lpath : 
IOUtilFunctions.getSequenceFilePaths(fs, path))
-                       {
-                               SequenceFile.Reader reader = new 
SequenceFile.Reader(fs,lpath,job);
-                               
-                               try
-                               {
-                                       while( reader.next(key, value) )
-                                       {
-                                               if( !value.isEmptyBlock() ) 
//skip empty blocks (important for diag)
-                                               {
-                                                       String fname = 
stagingDir +"/"+key.getRowIndex()+"_"+key.getColumnIndex();
-                                                       
LocalFileUtils.writeMatrixBlockToLocal(fname, value);
-                                                       diagBlocks &= 
(key.getRowIndex()==key.getColumnIndex());
-                                               }
-                                       }       
-                               }
-                               finally {
-                                       IOUtilFunctions.closeSilently(reader);
-                               }
-                       }
-                       
-                       return diagBlocks;
-               }
-
-               private static void appendCellBufferToStagingArea( String dir, 
LinkedList<Cell> buffer, int brlen, int bclen ) throws IOException {
-                       HashMap<String,LinkedList<Cell>> sortedBuffer = new 
HashMap<>();
-                       //sort cells in buffer wrt key
-                       String key = null;
-                       for( Cell c : buffer ) {
-                               key = (c.getRow()/brlen+1) 
+"_"+(c.getCol()/bclen+1);
-                               if( !sortedBuffer.containsKey(key) )
-                                       sortedBuffer.put(key, new 
LinkedList<Cell>());
-                               sortedBuffer.get(key).addLast(c);
-                       }
-                       //write lists of cells to local files
-                       for( Entry<String,LinkedList<Cell>> e : 
sortedBuffer.entrySet() ) {
-                               String pfname = dir + "/" + e.getKey();
-                               StagingFileUtils.writeCellListToLocal(pfname, 
e.getValue());
-                       }
-               }
-
-               private long createKeyMapping( String stagingDir, long rlen, 
long clen, int brlen, int bclen, InputInfo ii) 
-                       throws FileNotFoundException, IOException 
-               {
-                       String metaOut = stagingDir+"/meta";
-                       
-                       long len = 0;
-                       long lastKey = 0;
-                       
-                       if(_margin.equals("rows"))
-                       {
-                               for(int blockRow = 0; blockRow < 
(int)Math.ceil(rlen/(double)brlen); blockRow++)
-                               {       
-                                       boolean[] flags = new boolean[brlen];
-                                       for( int k=0; k<brlen; k++ )
-                                               flags[k] = true;
-                                       
-                                       //scan for empty rows
-                                       for(int blockCol = 0; blockCol < 
(int)Math.ceil(clen/(double)bclen); blockCol++)
-                                       {
-                                               String fname = 
stagingDir+"/"+(blockRow+1)+"_"+(blockCol+1);
-                                               if( ii == 
InputInfo.BinaryBlockInputInfo ){
-                                                       if( 
!LocalFileUtils.isExisting(fname) )
-                                                               continue;
-                                                       MatrixBlock buffer = 
LocalFileUtils.readMatrixBlockFromLocal(fname);
-                                                       for( int i=0; 
i<buffer.getNumRows(); i++ )
-                                                               for( int j=0; 
j<buffer.getNumColumns(); j++ )
-                                                               {
-                                                                       double 
lvalue = buffer.quickGetValue(i, j);
-                                                                       if( 
lvalue != 0 )
-                                                                               
flags[ i ] = false;
-                                                               }
-                                               }
-                                               else{
-                                                       LinkedList<Cell> buffer 
= StagingFileUtils.readCellListFromLocal(fname);
-                                                       for( Cell c : buffer )
-                                                               flags[ 
(int)c.getRow()-blockRow*brlen-1 ] = false;
-                                               }
-                                       } 
-                       
-                                       //create and append key mapping
-                                       LinkedList<long[]> keyMapping = new 
LinkedList<>();
-                                       for( int i = 0; i<flags.length; i++ )
-                                               if( !flags[i] )
-                                                       keyMapping.add(new 
long[]{blockRow*brlen+i, lastKey++});
-                                       len += keyMapping.size();
-                                       
StagingFileUtils.writeKeyMappingToLocal(metaOut, keyMapping.toArray(new 
long[0][0]));
-                               }
-                       }
-                       else
-                       {
-                               for(int blockCol = 0; blockCol < 
(int)Math.ceil(clen/(double)bclen); blockCol++)
-                               {       
-                                       boolean[] flags = new boolean[bclen];
-                                       for( int k=0; k<bclen; k++ )
-                                               flags[k] = true;
-                                       
-                                       //scan for empty rows
-                                       for(int blockRow = 0; blockRow < 
(int)Math.ceil(rlen/(double)brlen); blockRow++)
-                                       {
-                                               String fname = 
stagingDir+"/"+(blockRow+1)+"_"+(blockCol+1);
-                                               if( ii == 
InputInfo.BinaryBlockInputInfo ){
-                                                       if( 
!LocalFileUtils.isExisting(fname) )
-                                                               continue;
-                                                       MatrixBlock buffer = 
LocalFileUtils.readMatrixBlockFromLocal(fname);
-                                                       for( int i=0; 
i<buffer.getNumRows(); i++ )
-                                                               for( int j=0; 
j<buffer.getNumColumns(); j++ )
-                                                               {
-                                                                       double 
lvalue = buffer.quickGetValue(i, j);
-                                                                       if( 
lvalue != 0 )
-                                                                               
flags[ j ] = false;
-                                                               }
-                                               }
-                                               else{
-                                                       LinkedList<Cell> buffer 
= StagingFileUtils.readCellListFromLocal(fname);
-                                                       for( Cell c : buffer )
-                                                               flags[ 
(int)c.getCol()-blockCol*bclen-1 ] = false;
-                                               }
-                                       } 
-                       
-                                       //create and append key mapping
-                                       LinkedList<long[]> keyMapping = new 
LinkedList<>();
-                                       for( int i = 0; i<flags.length; i++ )
-                                               if( !flags[i] )
-                                                       keyMapping.add(new 
long[]{blockCol*bclen+i, lastKey++});
-                                       len += keyMapping.size();
-                                       
StagingFileUtils.writeKeyMappingToLocal(metaOut, keyMapping.toArray(new 
long[0][0]));
-                               }
-                       }
-                       
-                       //final validation (matrices with dimensions 0x0 not 
allowed)
-                       if( len <= 0 )
-                               throw new DMLRuntimeException("Matrices with 
dimensions [0,0] not supported.");
-                       
-                       return len;
-               }
-
-               private long createKeyMappingDiag( String stagingDir, long 
rlen, long clen, int brlen, int bclen, InputInfo ii) 
-                       throws FileNotFoundException, IOException, 
DMLRuntimeException 
-               {
-                       String metaOut = stagingDir+"/meta";
-                       
-                       long len = 0;
-                       long lastKey = 0;
-                       
-                       if(_margin.equals("rows"))
-                       {
-                               for(int blockRow = 0; blockRow < 
(int)Math.ceil(rlen/(double)brlen); blockRow++)
-                               {       
-                                       boolean[] flags = new boolean[brlen];
-                                       for( int k=0; k<brlen; k++ )
-                                               flags[k] = true;
-                                       
-                                       //scan for empty rows
-                                       String fname = 
stagingDir+"/"+(blockRow+1)+"_"+(blockRow+1);
-                                       if( ii == 
InputInfo.BinaryBlockInputInfo ){
-                                               if( 
!LocalFileUtils.isExisting(fname) )
-                                                       continue;
-                                               MatrixBlock buffer = 
LocalFileUtils.readMatrixBlockFromLocal(fname);
-                                               for( int i=0; 
i<buffer.getNumRows(); i++ )
-                                                       for( int j=0; 
j<buffer.getNumColumns(); j++ )
-                                                       {
-                                                               double lvalue = 
buffer.quickGetValue(i, j);
-                                                               if( lvalue != 0 
)
-                                                                       flags[ 
i ] = false;
-                                                       }
-                                       }
-                                       else{
-                                               LinkedList<Cell> buffer = 
StagingFileUtils.readCellListFromLocal(fname);
-                                               for( Cell c : buffer )
-                                                       flags[ 
(int)c.getRow()-blockRow*brlen-1 ] = false;
-                                       }
-                                        
-                       
-                                       //create and append key mapping
-                                       LinkedList<long[]> keyMapping = new 
LinkedList<>();
-                                       for( int i = 0; i<flags.length; i++ )
-                                               if( !flags[i] )
-                                                       keyMapping.add(new 
long[]{blockRow*brlen+i, lastKey++});
-                                       len += keyMapping.size();
-                                       
StagingFileUtils.writeKeyMappingToLocal(metaOut, keyMapping.toArray(new 
long[0][0]));
-                               }
-                       }
-                       else
-                       {
-                               for(int blockCol = 0; blockCol < 
(int)Math.ceil(clen/(double)bclen); blockCol++)
-                               {       
-                                       boolean[] flags = new boolean[bclen];
-                                       for( int k=0; k<bclen; k++ )
-                                               flags[k] = true;
-                                       
-                                       //scan for empty rows
-                                       String fname = 
stagingDir+"/"+(blockCol+1)+"_"+(blockCol+1);
-                                       if( ii == 
InputInfo.BinaryBlockInputInfo ){
-                                               if( 
!LocalFileUtils.isExisting(fname) )
-                                                       continue;
-                                               MatrixBlock buffer = 
LocalFileUtils.readMatrixBlockFromLocal(fname);
-                                               for( int i=0; 
i<buffer.getNumRows(); i++ )
-                                                       for( int j=0; 
j<buffer.getNumColumns(); j++ )
-                                                       {
-                                                               double lvalue = 
buffer.quickGetValue(i, j);
-                                                               if( lvalue != 0 
)
-                                                                       flags[ 
j ] = false;
-                                                       }
-                                       }
-                                       else{
-                                               LinkedList<Cell> buffer = 
StagingFileUtils.readCellListFromLocal(fname);
-                                               for( Cell c : buffer )
-                                                       flags[ 
(int)c.getCol()-blockCol*bclen-1 ] = false;
-                                       }
-                                        
-                       
-                                       //create and append key mapping
-                                       LinkedList<long[]> keyMapping = new 
LinkedList<>();
-                                       for( int i = 0; i<flags.length; i++ )
-                                               if( !flags[i] )
-                                                       keyMapping.add(new 
long[]{blockCol*bclen+i, lastKey++});
-                                       len += keyMapping.size();
-                                       
StagingFileUtils.writeKeyMappingToLocal(metaOut, keyMapping.toArray(new 
long[0][0]));
-                               }
-                       }
-                       
-                       //final validation (matrices with dimensions 0x0 not 
allowed)
-                       if( len <= 0 )
-                               throw new DMLRuntimeException("Matrices with 
dimensions [0,0] not supported.");
-                       
-                       return len;
-               }
-
-               @SuppressWarnings("deprecation")
-               public void createCellResultFile( String fnameNew, String 
stagingDir, long rlen, long clen, int brlen, int bclen, InputInfo ii ) 
-                       throws IOException, DMLRuntimeException
-               {
-                       //prepare input
-                       JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     
-                       Path path = new Path(fnameNew);
-                       FileSystem fs = IOUtilFunctions.getFileSystem(path, 
job);
-                       String metaOut = stagingDir+"/meta";
-
-                       //prepare output
-                       BufferedWriter twriter = null;                  
-                       SequenceFile.Writer bwriter = null; 
-                       if( ii == InputInfo.TextCellInputInfo )
-                               twriter = new BufferedWriter(new 
OutputStreamWriter(fs.create(path,true)));     
-                       else if( ii == InputInfo.BinaryCellInputInfo )
-                               bwriter = new SequenceFile.Writer(fs, job, 
path, MatrixIndexes.class, MatrixCell.class);
-                       else
-                               throw new DMLRuntimeException("Unsupported cell 
input info: "+InputInfo.inputInfoToString(ii));
-                       
-                       StringBuilder sb = new StringBuilder();
-                       MatrixIndexes key = new MatrixIndexes();
-                       MatrixCell value = new MatrixCell();
-
-                       HashMap<Integer,HashMap<Long,Long>> keyMap = new 
HashMap<>();
-                       BufferedReader fkeyMap = 
StagingFileUtils.openKeyMap(metaOut);
-                       try
-                       {
-                               if( _margin.equals("rows") )
-                               {
-                                       for(int blockRow = 0; blockRow < 
(int)Math.ceil(rlen/(double)brlen); blockRow++)
-                                       {
-                                               
StagingFileUtils.nextKeyMap(fkeyMap, keyMap, blockRow, brlen);          
-                                               for(int blockCol = 0; blockCol 
< (int)Math.ceil(clen/(double)bclen); blockCol++)
-                                               {
-                                                       String fname = 
stagingDir+"/"+(blockRow+1)+"_"+(blockCol+1);
-                                                       LinkedList<Cell> buffer 
= StagingFileUtils.readCellListFromLocal(fname);
-                                                       if( ii == 
InputInfo.TextCellInputInfo )
-                                                               for( Cell c : 
buffer )
-                                                               {
-                                                                       
sb.append(keyMap.get(blockRow).get(c.getRow()-1)+1);
-                                                                       
sb.append(' ');
-                                                                       
sb.append(c.getCol());
-                                                                       
sb.append(' ');
-                                                                       
sb.append(c.getValue());
-                                                                       
sb.append('\n');
-                                                                       
twriter.write( sb.toString() ); 
-                                                                       
sb.setLength(0);
-                                                               }
-                                                       else if( ii == 
InputInfo.BinaryCellInputInfo )
-                                                               for( Cell c : 
buffer )
-                                                               {
-                                                                       
key.setIndexes(keyMap.get(blockRow).get(c.getRow()-1)+1, c.getCol());
-                                                                       
value.setValue(c.getValue());
-                                                                       
bwriter.append(key, value);     
-                                                               }
-                                               }
-                                               keyMap.remove(blockRow);
-                                       }
-                               }
-                               else
-                               {
-                                       for(int blockCol = 0; blockCol < 
(int)Math.ceil(clen/(double)bclen); blockCol++)
-                                       {
-                                               
StagingFileUtils.nextKeyMap(fkeyMap, keyMap, blockCol, bclen);          
-                                               for(int blockRow = 0; blockRow 
< (int)Math.ceil(rlen/(double)brlen); blockRow++)
-                                               {
-                                                       String fname = 
stagingDir+"/"+(blockRow+1)+"_"+(blockCol+1);
-                                                       LinkedList<Cell> buffer 
= StagingFileUtils.readCellListFromLocal(fname);
-                                                       if( ii == 
InputInfo.TextCellInputInfo )
-                                                               for( Cell c : 
buffer )
-                                                               {
-                                                                       
sb.append(c.getRow());
-                                                                       
sb.append(' ');
-                                                                       
sb.append(keyMap.get(blockCol).get(c.getCol()-1)+1);
-                                                                       
sb.append(' ');
-                                                                       
sb.append(c.getValue());
-                                                                       
sb.append('\n');
-                                                                       
twriter.write( sb.toString() ); 
-                                                                       
sb.setLength(0);
-                                                               }
-                                                       else if( ii == 
InputInfo.BinaryCellInputInfo )
-                                                               for( Cell c : 
buffer )
-                                                               {
-                                                                       
key.setIndexes(c.getRow(), keyMap.get(blockCol).get(c.getCol()-1)+1);
-                                                                       
value.setValue(c.getValue());
-                                                                       
bwriter.append(key, value);     
-                                                               }
-                                               }
-                                               keyMap.remove(blockCol);
-                                       }
-                               }
-
-                               //Note: no need to handle empty result
-                       }
-                       finally {
-                               IOUtilFunctions.closeSilently(fkeyMap);
-                               IOUtilFunctions.closeSilently(twriter);
-                               IOUtilFunctions.closeSilently(bwriter);
-                       }
-               }
-
-               @SuppressWarnings("deprecation")
-               public void createBlockResultFile( String fnameNew, String 
stagingDir, long rlen, long clen, long newlen, long nnz, int brlen, int bclen, 
InputInfo ii ) 
-                       throws IOException, DMLRuntimeException
-               {
-                       //prepare input
-                       JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     
-                       Path path = new Path(fnameNew);
-                       FileSystem fs = IOUtilFunctions.getFileSystem(path, 
job);
-                       String metaOut = stagingDir+"/meta";
-       
-                       //prepare output
-                       SequenceFile.Writer writer = new 
SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class);
-                       
-                       MatrixIndexes key = new MatrixIndexes(); 
-                       
-                       try
-                       {
-                               if( _margin.equals("rows") ) 
-                               {
-                                       MatrixBlock[] blocks = 
MatrixWriter.createMatrixBlocksForReuse(newlen, clen, brlen, bclen, 
-                                                                            
MatrixBlock.evalSparseFormatInMemory(rlen, clen, nnz), nnz);  
-                                       
-                                       for(int blockCol = 0; blockCol < 
(int)Math.ceil(clen/(double)bclen); blockCol++)
-                                       {
-                                               
HashMap<Integer,HashMap<Long,Long>> keyMap = new HashMap<>();
-                                               BufferedReader fkeyMap = 
StagingFileUtils.openKeyMap(metaOut);
-                                               int maxCol = 
(int)(((long)blockCol*bclen + bclen < clen) ? bclen : clen - 
(long)blockCol*bclen);
-                                               
-                                               int blockRowOut = 0;
-                                               int currentSize = -1;
-                                               while( (currentSize = 
StagingFileUtils.nextSizedKeyMap(fkeyMap, keyMap, brlen, brlen)) > 0  )
-                                               {
-                                                       int maxRow = 
currentSize;
-                                                       
-                                                       //get reuse matrix block
-                                                       MatrixBlock block = 
MatrixWriter.getMatrixBlockForReuse(blocks, maxRow, maxCol, brlen, bclen);
-                                                       block.reset(maxRow, 
maxCol);
-                                                       
-                                                       int rowPos = 0;
-                                                       int blockRow = 
Collections.min(keyMap.keySet());
-                                                       
-                                                       for( ; blockRow < 
(int)Math.ceil(rlen/(double)brlen) && rowPos<brlen ; blockRow++)
-                                                       {
-                                                               if( 
keyMap.containsKey(blockRow) )
-                                                               {
-                                                                       String 
fname = stagingDir+"/"+(blockRow+1)+"_"+(blockCol+1);
-                                                                       
-                                                                       if( 
LocalFileUtils.isExisting(fname) ) 
-                                                                       {       
-                                                                               
MatrixBlock tmp = LocalFileUtils.readMatrixBlockFromLocal(fname);
-                                                                               
-                                                                               
HashMap<Long,Long> lkeyMap = keyMap.get(blockRow);
-                                                                               
long row_offset = (long)blockRow*brlen;
-                                                                               
for( int i=0; i<tmp.getNumRows(); i++ )
-                                                                               
        if( lkeyMap.containsKey(row_offset+i) ) {       
-                                                                               
                //copy row
-                                                                               
                for( int j=0; j<tmp.getNumColumns(); j++ ) {
-                                                                               
                        double lvalue = tmp.quickGetValue(i, j);
-                                                                               
                        if( lvalue != 0 )
-                                                                               
                                block.quickSetValue(rowPos, j, lvalue);
-                                                                               
                }
-                                                                               
                rowPos++;
-                                                                               
        }
-                                                                       }
-                                                                       else
-                                                                       {
-                                                                               
HashMap<Long,Long> lkeyMap = keyMap.get(blockRow);
-                                                                               
rowPos+=lkeyMap.size();
-                                                                       }
-                                                               }               
                
-                                                               
keyMap.remove(blockRow);
-                                                       }
-                                                       
-                                                       
key.setIndexes(blockRowOut+1, blockCol+1);
-                                                       writer.append(key, 
block);
-                                                       blockRowOut++;
-                                               }
-                                               
-                                               
IOUtilFunctions.closeSilently(fkeyMap);
-                                       }
-                               }
-                               else
-                               {
-                                       MatrixBlock[] blocks = 
MatrixWriter.createMatrixBlocksForReuse(rlen, newlen, brlen, bclen, 
-                                                                           
MatrixBlock.evalSparseFormatInMemory(rlen, clen, nnz), nnz);  
-                                       
-                                       for(int blockRow = 0; blockRow < 
(int)Math.ceil(rlen/(double)brlen); blockRow++)
-                                       {
-                                               
HashMap<Integer,HashMap<Long,Long>> keyMap = new HashMap<>();
-                                               BufferedReader fkeyMap = 
StagingFileUtils.openKeyMap(metaOut);
-                                               int maxRow = 
(int)(((long)blockRow*brlen + brlen < rlen) ? brlen : rlen - 
(long)blockRow*brlen);
-                                               
-                                               int blockColOut = 0;
-                                               int currentSize = -1;
-                                               while( (currentSize = 
StagingFileUtils.nextSizedKeyMap(fkeyMap, keyMap, bclen, bclen)) > 0  )
-                                               {
-                                                       int maxCol = 
currentSize;
-                                                       
-                                                       //get reuse matrix block
-                                                       MatrixBlock block = 
MatrixWriter.getMatrixBlockForReuse(blocks, maxRow, maxCol, brlen, bclen);
-                                                       block.reset(maxRow, 
maxCol);
-                                                       int colPos = 0;
-                                                       
-                                                       int blockCol = 
Collections.min(keyMap.keySet());
-                                                       for( ; blockCol < 
(int)Math.ceil(clen/(double)bclen) && colPos<bclen ; blockCol++)
-                                                       {
-                                                               if( 
keyMap.containsKey(blockCol) )
-                                                               {
-                                                                       String 
fname = stagingDir+"/"+(blockRow+1)+"_"+(blockCol+1);
-                                                                       
-                                                                       if( 
LocalFileUtils.isExisting(fname) ) 
-                                                                       {
-                                                                               
MatrixBlock tmp = LocalFileUtils.readMatrixBlockFromLocal(fname);
-                                                                               
-                                                                               
HashMap<Long,Long> lkeyMap = keyMap.get(blockCol);
-                                                                               
long col_offset = blockCol*bclen;
-                                                                               
for( int j=0; j<tmp.getNumColumns(); j++ )
-                                                                               
        if( lkeyMap.containsKey(col_offset+j) ) {       
-                                                                               
                //copy column
-                                                                               
                for( int i=0; i<tmp.getNumRows(); i++ ){
-                                                                               
                        double lvalue = tmp.quickGetValue(i, j);
-                                                                               
                        if( lvalue != 0 )
-                                                                               
                                block.quickSetValue(i, colPos, lvalue);
-                                                                               
                }
-                                                                               
                colPos++;
-                                                                               
        }
-                                                                       }
-                                                                       else
-                                                                       {
-                                                                               
HashMap<Long,Long> lkeyMap = keyMap.get(blockCol);
-                                                                               
colPos+=lkeyMap.size();
-                                                                       }
-                                                               }               
                                        
-                                                               
keyMap.remove(blockCol);
-                                                       }
-                                                       
-                                                       
key.setIndexes(blockRow+1, blockColOut+1);
-                                                       writer.append(key, 
block);
-                                                       blockColOut++;
-                                               }
-                                               
IOUtilFunctions.closeSilently(fkeyMap);
-                                       }
-                               }
-                               
-                               //Note: no handling of empty matrices necessary
-                       }
-                       finally {
-                               IOUtilFunctions.closeSilently(writer);
-                       }
-               }
-
-               @SuppressWarnings("deprecation")
-               public void createBlockResultFileDiag( String fnameNew, String 
stagingDir, long rlen, long clen, long newlen, long nnz, int brlen, int bclen, 
InputInfo ii ) 
-                       throws IOException, DMLRuntimeException
-               {
-                       //prepare input
-                       JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     
-                       Path path = new Path(fnameNew);
-                       FileSystem fs = IOUtilFunctions.getFileSystem(path, 
job);
-                       String metaOut = stagingDir+"/meta";
-       
-                       //prepare output
-                       SequenceFile.Writer writer = new 
SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class);
-                       MatrixIndexes key = new MatrixIndexes(); 
-                       HashSet<Long> writtenBlocks = new HashSet<>();
-                       
-                       try
-                       {
-                               if( _margin.equals("rows") ) 
-                               {
-                                       MatrixBlock[] blocks = 
MatrixWriter.createMatrixBlocksForReuse(newlen, clen, brlen, bclen, 
-                                                                              
MatrixBlock.evalSparseFormatInMemory(rlen, clen, nnz), nnz);  
-                                       HashMap<Integer,HashMap<Long,Long>> 
keyMap = new HashMap<>();
-                                       BufferedReader fkeyMap = 
StagingFileUtils.openKeyMap(metaOut);
-                                       int currentSize = -1;
-                                       int blockRowOut = 0;
-                                       
-                                       while( (currentSize = 
StagingFileUtils.nextSizedKeyMap(fkeyMap, keyMap, brlen, brlen)) > 0  )
-                                       {
-                                               int rowPos = 0;
-                                               int blockRow = 
Collections.min(keyMap.keySet()); 
-                                               int maxRow = currentSize;
-                                               for( ; blockRow < 
(int)Math.ceil(rlen/(double)brlen); blockRow++)
-                                               {
-                                                       int blockCol = 
blockRow; // for diag known to be equivalent
-                                                       int maxCol = 
(int)(((long)blockCol*bclen + bclen < clen) ? bclen : clen - 
(long)blockCol*bclen);
-                                                       
-                                                       //get reuse matrix block
-                                                       MatrixBlock block = 
MatrixWriter.getMatrixBlockForReuse(blocks, maxRow, maxCol, brlen, bclen);
-                                                       block.reset(maxRow, 
maxCol);
-                                                       
-                                                       if( 
keyMap.containsKey(blockRow) )
-                                                       {
-                                                               String fname = 
stagingDir+"/"+(blockRow+1)+"_"+(blockCol+1);
-                                                               MatrixBlock tmp 
= LocalFileUtils.readMatrixBlockFromLocal(fname);
-                                                               
-                                                               
HashMap<Long,Long> lkeyMap = keyMap.get(blockRow);
-                                                               long row_offset 
= blockRow*brlen;
-                                                               for( int i=0; 
i<tmp.getNumRows(); i++ )
-                                                                       if( 
lkeyMap.containsKey(row_offset+i) ) {       
-                                                                               
//copy row
-                                                                               
for( int j=0; j<tmp.getNumColumns(); j++ ) {
-                                                                               
        double lvalue = tmp.quickGetValue(i, j);
-                                                                               
        if( lvalue != 0 )
-                                                                               
                block.quickSetValue(rowPos, j, lvalue);
-                                                                               
}
-                                                                               
rowPos++;
-                                                                       }
-                                                       }
-                                                       
-                                                       //output current block 
(by def of diagBlocks, no additional rows)
-                                                       
key.setIndexes(blockRowOut+1, blockCol+1);
-                                                       writer.append(key, 
block);
-                                                       
writtenBlocks.add(IDHandler.concatIntIDsToLong(blockRowOut+1, blockCol+1));
-                                                       
-                                                       //finished block
-                                                       if( rowPos == maxRow )
-                                                       {
-                                                               
keyMap.remove(blockRow);        
-                                                               blockRowOut++;
-                                                               break;
-                                                       }
-                                               }
-                                       }
-                                       IOUtilFunctions.closeSilently(fkeyMap);
-                               }
-                               else //cols
-                               {
-                                       MatrixBlock[] blocks = 
MatrixWriter.createMatrixBlocksForReuse(rlen, newlen, brlen, bclen, 
-                                                                            
MatrixBlock.evalSparseFormatInMemory(rlen, clen, nnz), nnz);  
-                                       HashMap<Integer,HashMap<Long,Long>> 
keyMap = new HashMap<>();
-                                       BufferedReader fkeyMap = 
StagingFileUtils.openKeyMap(metaOut);
-                                       int currentSize = -1;
-                                       int blockColOut = 0;
-                                       
-                                       while( (currentSize = 
StagingFileUtils.nextSizedKeyMap(fkeyMap, keyMap, bclen, bclen)) > 0  )
-                                       {
-                                               int colPos = 0;
-                                               int blockCol = 
Collections.min(keyMap.keySet()); 
-                                               int maxCol = currentSize;
-                                               for( ; blockCol < 
(int)Math.ceil(clen/(double)bclen); blockCol++)
-                                               {
-                                                       int blockRow = 
blockCol; // for diag known to be equivalent
-                                                       int maxRow = 
(int)((blockRow*brlen + brlen < rlen) ? brlen : rlen - blockRow*brlen);
-                                                       
-                                                       //get reuse matrix block
-                                                       MatrixBlock block = 
MatrixWriter.getMatrixBlockForReuse(blocks, maxRow, maxCol, brlen, bclen);
-                                                       block.reset(maxRow, 
maxCol);
-                                               
-                                                       if( 
keyMap.containsKey(blockCol) )
-                                                       {
-                                                               String fname = 
stagingDir+"/"+(blockRow+1)+"_"+(blockCol+1);
-                                                               MatrixBlock tmp 
= LocalFileUtils.readMatrixBlockFromLocal(fname);
-                                                               
-                                                               
HashMap<Long,Long> lkeyMap = keyMap.get(blockCol);
-                                                               long col_offset 
= blockCol*bclen;
-                                                               for( int j=0; 
j<tmp.getNumColumns(); j++ )
-                                                                       if( 
lkeyMap.containsKey(col_offset+j) ) {       
-                                                                               
//copy column
-                                                                               
for( int i=0; i<tmp.getNumRows(); i++ ){
-                                                                               
        double lvalue = tmp.quickGetValue(i, j);
-                                                                               
        if( lvalue != 0 )
-                                                                               
                block.quickSetValue(i, colPos, lvalue);
-                                                                               
}
-                                                                               
colPos++;
-                                                                       }
-                                                       }
-                                                               
-                                                       //output current block 
(by def of diagBlocks, no additional cols)
-                                                       
key.setIndexes(blockRow+1, blockColOut+1);
-                                                       writer.append(key, 
block);
-                                                       
writtenBlocks.add(IDHandler.concatIntIDsToLong(blockRow+1, blockColOut+1));
-                                                       
-                                                       //finished block
-                                                       if( colPos == maxCol )
-                                                       {
-                                                               
keyMap.remove(blockCol);        
-                                                               blockColOut++;
-                                                               break;
-                                                       }
-                                               }
-                                       }
-                                       IOUtilFunctions.closeSilently(fkeyMap);
-                               }
-                               
-                               //write remaining empty blocks
-                               MatrixBlock empty = new MatrixBlock(1,1,true);
-                               long rows = _margin.equals("rows") ? newlen : 
rlen;
-                               long cols = _margin.equals("cols") ? newlen : 
clen;
-                               int countBlk1 = 
(int)Math.ceil(rows/(double)brlen)*(int)Math.ceil(cols/(double)bclen);
-                               int countBlk2 = writtenBlocks.size();
-                               for( int i=0; 
i<(int)Math.ceil(rows/(double)brlen); i++)
-                                       for(int j=0; 
j<(int)Math.ceil(cols/(double)bclen); j++ )
-                                               if( 
!writtenBlocks.contains(IDHandler.concatIntIDsToLong(i+1, j+1)) )
-                                               {
-                                                       int maxRow = 
(int)((i*brlen + brlen < rows) ? brlen : rows - i*brlen);
-                                                       int maxCol = 
(int)((j*bclen + bclen < cols) ? bclen : cols - j*bclen);
-                                                       empty.reset(maxRow, 
maxCol);
-                                                       key.setIndexes(i+1, 
j+1);
-                                                       writer.append(key, 
empty);
-                                                       countBlk2++;
-                                               }
-                               
-                               if( countBlk1 != countBlk2 )
-                                       throw new DMLRuntimeException("Wrong 
number of written result blocks: "+countBlk1+" vs "+countBlk2+".");
-                       }
-                       finally {
-                               IOUtilFunctions.closeSilently(writer);
-                       }
-               }
-       }
-}

Reply via email to