http://git-wip-us.apache.org/repos/asf/systemml/blob/583f3448/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java index fb268d4..f898f28 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java @@ -287,13 +287,11 @@ public class SparkExecutionContext extends ExecutionContext * * @param varname variable name * @return JavaPairRDD of MatrixIndexes-MatrixBlocks - * @throws DMLRuntimeException if DMLRuntimeException occurs */ @SuppressWarnings("unchecked") - public JavaPairRDD<MatrixIndexes,MatrixBlock> getBinaryBlockRDDHandleForVariable( String varname ) - throws DMLRuntimeException - { - return (JavaPairRDD<MatrixIndexes,MatrixBlock>) getRDDHandleForVariable( varname, InputInfo.BinaryBlockInputInfo); + public JavaPairRDD<MatrixIndexes,MatrixBlock> getBinaryBlockRDDHandleForVariable( String varname ) { + return (JavaPairRDD<MatrixIndexes,MatrixBlock>) + getRDDHandleForVariable( varname, InputInfo.BinaryBlockInputInfo); } /** @@ -302,19 +300,15 @@ public class SparkExecutionContext extends ExecutionContext * * @param varname variable name * @return JavaPairRDD of Longs-FrameBlocks - * @throws DMLRuntimeException if DMLRuntimeException occurs */ @SuppressWarnings("unchecked") - public JavaPairRDD<Long,FrameBlock> getFrameBinaryBlockRDDHandleForVariable( String varname ) - throws DMLRuntimeException - { - JavaPairRDD<Long,FrameBlock> out = (JavaPairRDD<Long,FrameBlock>) getRDDHandleForVariable( varname, InputInfo.BinaryBlockInputInfo); + public JavaPairRDD<Long,FrameBlock> getFrameBinaryBlockRDDHandleForVariable( String varname ) { + JavaPairRDD<Long,FrameBlock> out = (JavaPairRDD<Long,FrameBlock>) + getRDDHandleForVariable( varname, InputInfo.BinaryBlockInputInfo); return out; } - public JavaPairRDD<?,?> getRDDHandleForVariable( String varname, InputInfo inputInfo ) - throws DMLRuntimeException - { + public JavaPairRDD<?,?> getRDDHandleForVariable( String varname, InputInfo inputInfo ) { Data dat = getVariable(varname); if( dat instanceof MatrixObject ) { MatrixObject mo = getMatrixObject(varname); @@ -336,12 +330,9 @@ public class SparkExecutionContext extends ExecutionContext * @param mo matrix object * @param inputInfo input info * @return JavaPairRDD handle for a matrix object - * @throws DMLRuntimeException if DMLRuntimeException occurs */ @SuppressWarnings("unchecked") - public JavaPairRDD<?,?> getRDDHandleForMatrixObject( MatrixObject mo, InputInfo inputInfo ) - throws DMLRuntimeException - { + public JavaPairRDD<?,?> getRDDHandleForMatrixObject( MatrixObject mo, InputInfo inputInfo ) { //NOTE: MB this logic should be integrated into MatrixObject //However, for now we cannot assume that spark libraries are //always available and hence only store generic references in @@ -425,11 +416,9 @@ public class SparkExecutionContext extends ExecutionContext * @param fo frame object * @param inputInfo input info * @return JavaPairRDD handle for a frame object - * @throws DMLRuntimeException if DMLRuntimeException occurs */ @SuppressWarnings("unchecked") public JavaPairRDD<?,?> getRDDHandleForFrameObject( FrameObject fo, InputInfo inputInfo ) - throws DMLRuntimeException { //NOTE: MB this logic should be integrated into FrameObject //However, for now we cannot assume that spark libraries are @@ -511,7 +500,6 @@ public class SparkExecutionContext extends ExecutionContext @SuppressWarnings("unchecked") public PartitionedBroadcast<MatrixBlock> getBroadcastForVariable( String varname ) - throws DMLRuntimeException { //NOTE: The memory consumption of this method is the in-memory size of the //matrix object plus the partitioned size in 1k-1k blocks. Since the call @@ -589,7 +577,6 @@ public class SparkExecutionContext extends ExecutionContext @SuppressWarnings("unchecked") public PartitionedBroadcast<FrameBlock> getBroadcastForFrameVariable( String varname) - throws DMLRuntimeException { long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; @@ -663,11 +650,8 @@ public class SparkExecutionContext extends ExecutionContext * * @param varname variable name * @param rdd JavaPairRDD handle for variable - * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public void setRDDHandleForVariable(String varname, JavaPairRDD<?,?> rdd) - throws DMLRuntimeException - { + public void setRDDHandleForVariable(String varname, JavaPairRDD<?,?> rdd) { CacheableData<?> obj = getCacheableData(varname); RDDObject rddhandle = new RDDObject(rdd); obj.setRDDHandle( rddhandle ); @@ -681,11 +665,8 @@ public class SparkExecutionContext extends ExecutionContext * @param brlen block row length * @param bclen block column length * @return JavaPairRDD handle to matrix block - * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public static JavaPairRDD<MatrixIndexes,MatrixBlock> toMatrixJavaPairRDD(JavaSparkContext sc, MatrixBlock src, int brlen, int bclen) - throws DMLRuntimeException - { + public static JavaPairRDD<MatrixIndexes,MatrixBlock> toMatrixJavaPairRDD(JavaSparkContext sc, MatrixBlock src, int brlen, int bclen) { long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; List<Tuple2<MatrixIndexes,MatrixBlock>> list = null; @@ -731,9 +712,7 @@ public class SparkExecutionContext extends ExecutionContext } } - public static JavaPairRDD<Long,FrameBlock> toFrameJavaPairRDD(JavaSparkContext sc, FrameBlock src) - throws DMLRuntimeException - { + public static JavaPairRDD<Long,FrameBlock> toFrameJavaPairRDD(JavaSparkContext sc, FrameBlock src) { long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; LinkedList<Tuple2<Long,FrameBlock>> list = new LinkedList<>(); @@ -774,12 +753,9 @@ public class SparkExecutionContext extends ExecutionContext * @param bclen number of columns in a block * @param nnz number of non-zeros * @return matrix block - * @throws DMLRuntimeException if DMLRuntimeException occurs */ @SuppressWarnings("unchecked") - public static MatrixBlock toMatrixBlock(RDDObject rdd, int rlen, int clen, int brlen, int bclen, long nnz) - throws DMLRuntimeException - { + public static MatrixBlock toMatrixBlock(RDDObject rdd, int rlen, int clen, int brlen, int bclen, long nnz) { return toMatrixBlock( (JavaPairRDD<MatrixIndexes, MatrixBlock>) rdd.getRDD(), rlen, clen, brlen, bclen, nnz); @@ -799,12 +775,8 @@ public class SparkExecutionContext extends ExecutionContext * @param bclen number of columns in a block * @param nnz number of non-zeros * @return matrix block - * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public static MatrixBlock toMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, int brlen, int bclen, long nnz) - throws DMLRuntimeException - { - + public static MatrixBlock toMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, int brlen, int bclen, long nnz) { long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; MatrixBlock out = null; @@ -883,9 +855,7 @@ public class SparkExecutionContext extends ExecutionContext } @SuppressWarnings("unchecked") - public static MatrixBlock toMatrixBlock(RDDObject rdd, int rlen, int clen, long nnz) - throws DMLRuntimeException - { + public static MatrixBlock toMatrixBlock(RDDObject rdd, int rlen, int clen, long nnz) { return toMatrixBlock( (JavaPairRDD<MatrixIndexes, MatrixCell>) rdd.getRDD(), rlen, clen, nnz); @@ -900,10 +870,8 @@ public class SparkExecutionContext extends ExecutionContext * @param clen number of columns * @param nnz number of non-zeros * @return matrix block - * @throws DMLRuntimeException if DMLRuntimeException occurs */ public static MatrixBlock toMatrixBlock(JavaPairRDD<MatrixIndexes, MatrixCell> rdd, int rlen, int clen, long nnz) - throws DMLRuntimeException { long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; @@ -945,7 +913,6 @@ public class SparkExecutionContext extends ExecutionContext } public static PartitionedBlock<MatrixBlock> toPartitionedMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, int brlen, int bclen, long nnz) - throws DMLRuntimeException { long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; @@ -971,16 +938,12 @@ public class SparkExecutionContext extends ExecutionContext } @SuppressWarnings("unchecked") - public static FrameBlock toFrameBlock(RDDObject rdd, ValueType[] schema, int rlen, int clen) - throws DMLRuntimeException - { + public static FrameBlock toFrameBlock(RDDObject rdd, ValueType[] schema, int rlen, int clen) { JavaPairRDD<Long,FrameBlock> lrdd = (JavaPairRDD<Long,FrameBlock>) rdd.getRDD(); return toFrameBlock(lrdd, schema, rlen, clen); } - public static FrameBlock toFrameBlock(JavaPairRDD<Long,FrameBlock> rdd, ValueType[] schema, int rlen, int clen) - throws DMLRuntimeException - { + public static FrameBlock toFrameBlock(JavaPairRDD<Long,FrameBlock> rdd, ValueType[] schema, int rlen, int clen) { long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; if(schema == null) @@ -1062,14 +1025,10 @@ public class SparkExecutionContext extends ExecutionContext * * @param varParent parent variable * @param varChild child variable - * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public void addLineageRDD(String varParent, String varChild) - throws DMLRuntimeException - { + public void addLineageRDD(String varParent, String varChild) { RDDObject parent = getCacheableData(varParent).getRDDHandle(); RDDObject child = getCacheableData(varChild).getRDDHandle(); - parent.addLineageChild( child ); } @@ -1078,20 +1037,14 @@ public class SparkExecutionContext extends ExecutionContext * * @param varParent parent variable * @param varChild child variable - * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public void addLineageBroadcast(String varParent, String varChild) - throws DMLRuntimeException - { + public void addLineageBroadcast(String varParent, String varChild) { RDDObject parent = getCacheableData(varParent).getRDDHandle(); BroadcastObject<?> child = getCacheableData(varChild).getBroadcastHandle(); - parent.addLineageChild( child ); } - public void addLineage(String varParent, String varChild, boolean broadcast) - throws DMLRuntimeException - { + public void addLineage(String varParent, String varChild, boolean broadcast) { if( broadcast ) addLineageBroadcast(varParent, varChild); else @@ -1100,7 +1053,6 @@ public class SparkExecutionContext extends ExecutionContext @Override public void cleanupCacheableData( CacheableData<?> mo ) - throws DMLRuntimeException { //NOTE: this method overwrites the default behavior of cleanupMatrixObject //and hence is transparently used by rmvar instructions and other users. The @@ -1215,9 +1167,7 @@ public class SparkExecutionContext extends ExecutionContext } @SuppressWarnings("unchecked") - public void repartitionAndCacheMatrixObject( String var ) - throws DMLRuntimeException - { + public void repartitionAndCacheMatrixObject( String var ) { MatrixObject mo = getMatrixObject(var); MatrixCharacteristics mcIn = mo.getMatrixCharacteristics(); @@ -1264,9 +1214,7 @@ public class SparkExecutionContext extends ExecutionContext } @SuppressWarnings("unchecked") - public void cacheMatrixObject( String var ) - throws DMLRuntimeException - { + public void cacheMatrixObject( String var ) { //get input rdd and default storage level MatrixObject mo = getMatrixObject(var);
http://git-wip-us.apache.org/repos/asf/systemml/blob/583f3448/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitioner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitioner.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitioner.java index 9599993..3d12949 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitioner.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitioner.java @@ -60,15 +60,11 @@ public abstract class DataPartitioner _n = n; } - public MatrixObject createPartitionedMatrixObject( MatrixObject in, String fnameNew ) - throws DMLRuntimeException - { + public MatrixObject createPartitionedMatrixObject( MatrixObject in, String fnameNew ) { return createPartitionedMatrixObject(in, fnameNew, false); } - public MatrixObject createPartitionedMatrixObject( MatrixObject in, String fnameNew, boolean force ) - throws DMLRuntimeException - { + public MatrixObject createPartitionedMatrixObject( MatrixObject in, String fnameNew, boolean force ) { MatrixObject out = new MatrixObject(in.getValueType(), fnameNew); return createPartitionedMatrixObject(in, out, force); } @@ -85,11 +81,8 @@ public abstract class DataPartitioner * @param out output matrix object * @param force if false, try to optimize * @return partitioned matrix object - * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public MatrixObject createPartitionedMatrixObject( MatrixObject in, MatrixObject out, boolean force ) - throws DMLRuntimeException - { + public MatrixObject createPartitionedMatrixObject( MatrixObject in, MatrixObject out, boolean force ) { //check for naive partitioning if( _format == PDataPartitionFormat.NONE ) return in; @@ -172,8 +165,7 @@ public abstract class DataPartitioner _allowBinarycell = false; } - protected abstract void partitionMatrix( MatrixObject in, String fnameNew, InputInfo ii, OutputInfo oi, long rlen, long clen, int brlen, int bclen ) - throws DMLRuntimeException; + protected abstract void partitionMatrix( MatrixObject in, String fnameNew, InputInfo ii, OutputInfo oi, long rlen, long clen, int brlen, int bclen ); public static MatrixBlock createReuseMatrixBlock( PDataPartitionFormat dpf, int rows, int cols ) http://git-wip-us.apache.org/repos/asf/systemml/blob/583f3448/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java index cdb5d4d..ecb5ea0 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java @@ -91,23 +91,17 @@ public class DataPartitionerLocal extends DataPartitioner * @param dpf data partitionformat * @param n ? * @param par -1 for serial otherwise number of threads, can be ignored by implementation - * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public DataPartitionerLocal(PartitionFormat dpf, int par) - throws DMLRuntimeException - { + public DataPartitionerLocal(PartitionFormat dpf, int par) { super(dpf._dpf, dpf._N); - if( dpf.isBlockwise() ) throw new DMLRuntimeException("Data partitioning formt '"+dpf+"' not supported by DataPartitionerLocal" ); - _seq = new IDSequence(); _par = (par > 0) ? par : 1; } @Override protected void partitionMatrix(MatrixObject in, String fnameNew, InputInfo ii, OutputInfo oi, long rlen, long clen, int brlen, int bclen) - throws DMLRuntimeException { //force writing to disk (typically not required since partitioning only applied if dataset exceeds CP size) in.exportData(); //written to disk iff dirty @@ -134,7 +128,6 @@ public class DataPartitionerLocal extends DataPartitioner } private void partitionTextCell( String fname, String fnameStaging, String fnameNew, long rlen, long clen, int brlen, int bclen ) - throws DMLRuntimeException { long row = -1; long col = -1; @@ -227,7 +220,6 @@ public class DataPartitionerLocal extends DataPartitioner @SuppressWarnings("deprecation") private void partitionBinaryCell( String fname, String fnameStaging, String fnameNew, long rlen, long clen, int brlen, int bclen ) - throws DMLRuntimeException { long row = -1; long col = -1; @@ -315,7 +307,6 @@ public class DataPartitionerLocal extends DataPartitioner @SuppressWarnings("deprecation") private void partitionBinaryBlock( String fname, String fnameStaging, String fnameNew, long rlen, long clen, int brlen, int bclen ) - throws DMLRuntimeException { try { @@ -391,7 +382,6 @@ public class DataPartitionerLocal extends DataPartitioner @SuppressWarnings("deprecation") private void partitionBinaryBlock2BinaryCell( String fname, String fnameStaging, String fnameNew, long rlen, long clen, int brlen, int bclen ) - throws DMLRuntimeException { try { @@ -495,7 +485,7 @@ public class DataPartitionerLocal extends DataPartitioner } private void appendBlockToStagingArea( String dir, MatrixBlock mb, long row_offset, long col_offset, long brlen, long bclen ) - throws DMLRuntimeException, IOException + throws IOException { //NOTE: for temporary block we always create dense representations boolean sparse = mb.isInSparseFormat(); @@ -506,7 +496,7 @@ public class DataPartitionerLocal extends DataPartitioner if( _format == PDataPartitionFormat.ROW_WISE ) { - _reuseBlk.reset( 1, (int)cols, sparse, (int) (cols*sparsity) ); + _reuseBlk.reset( 1, (int)cols, sparse, (int) (cols*sparsity) ); for( int i=0; i<rows; i++ ) { String pdir = LocalFileUtils.checkAndCreateStagingDir(dir+"/"+(row_offset+1+i)); @@ -545,7 +535,7 @@ public class DataPartitionerLocal extends DataPartitioner } private void appendCellBufferToStagingArea( String dir, LinkedList<Cell> buffer, int brlen, int bclen ) - throws DMLRuntimeException, IOException + throws IOException { HashMap<Long,LinkedList<Cell>> sortedBuffer = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/systemml/blob/583f3448/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java index b784ae3..c64c5e9 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java @@ -69,7 +69,6 @@ public class DataPartitionerRemoteMR extends DataPartitioner @Override protected void partitionMatrix(MatrixObject in, String fnameNew, InputInfo ii, OutputInfo oi, long rlen, long clen, int brlen, int bclen) - throws DMLRuntimeException { String jobname = "ParFor-DPMR"; long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; http://git-wip-us.apache.org/repos/asf/systemml/blob/583f3448/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java index f7b09b1..b09420b 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java @@ -58,7 +58,6 @@ public class DataPartitionerRemoteSpark extends DataPartitioner @Override @SuppressWarnings("unchecked") protected void partitionMatrix(MatrixObject in, String fnameNew, InputInfo ii, OutputInfo oi, long rlen, long clen, int brlen, int bclen) - throws DMLRuntimeException { String jobname = "ParFor-DPSP"; long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; http://git-wip-us.apache.org/repos/asf/systemml/blob/583f3448/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkMapper.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkMapper.java index ab14ca3..8e62f42 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkMapper.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkMapper.java @@ -57,9 +57,7 @@ public class DataPartitionerRemoteSparkMapper extends ParWorker implements PairF private PDataPartitionFormat _dpf; private final long _n; - public DataPartitionerRemoteSparkMapper(MatrixCharacteristics mc, InputInfo ii, OutputInfo oi, PDataPartitionFormat dpf, int n) - throws DMLRuntimeException - { + public DataPartitionerRemoteSparkMapper(MatrixCharacteristics mc, InputInfo ii, OutputInfo oi, PDataPartitionFormat dpf, int n) { _rlen = mc.getRows(); _clen = mc.getCols(); _brlen = mc.getRowsPerBlock(); http://git-wip-us.apache.org/repos/asf/systemml/blob/583f3448/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ParWorker.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ParWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ParWorker.java index cf5cbcf..339fbcc 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ParWorker.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ParWorker.java @@ -25,7 +25,6 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.sysml.parser.ParForStatementBlock.ResultVar; -import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.LocalVariableMap; import org.apache.sysml.runtime.controlprogram.ProgramBlock; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; @@ -111,9 +110,7 @@ public abstract class ParWorker } } - protected void executeTask( Task task ) - throws DMLRuntimeException - { + protected void executeTask( Task task ) { LOG.trace("EXECUTE PARFOR_WORKER ID="+_workerID+" for task "+task.toCompactString()); switch( task.getType() ) @@ -127,9 +124,7 @@ public abstract class ParWorker } } - private void executeSetTask( Task task ) - throws DMLRuntimeException - { + private void executeSetTask( Task task ) { //monitoring start Timing time1 = null, time2 = null; if( _monitor ) @@ -169,9 +164,7 @@ public abstract class ParWorker } } - private void executeRangeTask( Task task ) - throws DMLRuntimeException - { + private void executeRangeTask( Task task ) { //monitoring start Timing time1 = null, time2 = null; if( _monitor ) http://git-wip-us.apache.org/repos/asf/systemml/blob/583f3448/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java index d5cd518..1356634 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java @@ -162,10 +162,9 @@ public class ProgramConverter * @param ec execution context * @return execution context * @throws CloneNotSupportedException if CloneNotSupportedException occurs - * @throws DMLRuntimeException if DMLRuntimeException occurs */ public static ExecutionContext createDeepCopyExecutionContext(ExecutionContext ec) - throws CloneNotSupportedException, DMLRuntimeException + throws CloneNotSupportedException { ExecutionContext cpec = ExecutionContextFactory.createContext(false, ec.getProgram()); cpec.setVariables((LocalVariableMap) ec.getVariables().clone()); @@ -210,10 +209,8 @@ public class ProgramConverter * @param plain if true, full deep copy without id replacement * @param forceDeepCopy if true, force deep copy * @return list of program blocks - * @throws DMLRuntimeException if DMLRuntimeException occurs */ public static ArrayList<ProgramBlock> rcreateDeepCopyProgramBlocks(ArrayList<ProgramBlock> childBlocks, long pid, int IDPrefix, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) - throws DMLRuntimeException { ArrayList<ProgramBlock> tmp = new ArrayList<>(); @@ -264,9 +261,7 @@ public class ProgramConverter return tmp; } - public static WhileProgramBlock createDeepCopyWhileProgramBlock(WhileProgramBlock wpb, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) - throws DMLRuntimeException - { + public static WhileProgramBlock createDeepCopyWhileProgramBlock(WhileProgramBlock wpb, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) { ArrayList<Instruction> predinst = createDeepCopyInstructionSet(wpb.getPredicate(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true); WhileProgramBlock tmpPB = new WhileProgramBlock(prog, predinst); tmpPB.setStatementBlock( createWhileStatementBlockCopy((WhileStatementBlock) wpb.getStatementBlock(), pid, plain, forceDeepCopy) ); @@ -278,9 +273,7 @@ public class ProgramConverter return tmpPB; } - public static IfProgramBlock createDeepCopyIfProgramBlock(IfProgramBlock ipb, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) - throws DMLRuntimeException - { + public static IfProgramBlock createDeepCopyIfProgramBlock(IfProgramBlock ipb, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) { ArrayList<Instruction> predinst = createDeepCopyInstructionSet(ipb.getPredicate(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true); IfProgramBlock tmpPB = new IfProgramBlock(prog, predinst); tmpPB.setStatementBlock( createIfStatementBlockCopy((IfStatementBlock)ipb.getStatementBlock(), pid, plain, forceDeepCopy ) ); @@ -293,9 +286,7 @@ public class ProgramConverter return tmpPB; } - public static ForProgramBlock createDeepCopyForProgramBlock(ForProgramBlock fpb, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) - throws DMLRuntimeException - { + public static ForProgramBlock createDeepCopyForProgramBlock(ForProgramBlock fpb, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) { ForProgramBlock tmpPB = new ForProgramBlock(prog,fpb.getIterVar()); tmpPB.setStatementBlock( createForStatementBlockCopy((ForStatementBlock)fpb.getStatementBlock(), pid, plain, forceDeepCopy)); tmpPB.setThreadID(pid); @@ -309,9 +300,7 @@ public class ProgramConverter return tmpPB; } - public static ForProgramBlock createShallowCopyForProgramBlock(ForProgramBlock fpb, Program prog ) - throws DMLRuntimeException - { + public static ForProgramBlock createShallowCopyForProgramBlock(ForProgramBlock fpb, Program prog ) { ForProgramBlock tmpPB = new ForProgramBlock(prog,fpb.getIterVar()); tmpPB.setFromInstructions( fpb.getFromInstructions() ); @@ -323,9 +312,7 @@ public class ProgramConverter return tmpPB; } - public static ParForProgramBlock createDeepCopyParForProgramBlock(ParForProgramBlock pfpb, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) - throws DMLRuntimeException - { + public static ParForProgramBlock createDeepCopyParForProgramBlock(ParForProgramBlock pfpb, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) { ParForProgramBlock tmpPB = null; if( IDPrefix == -1 ) //still on master node @@ -366,10 +353,8 @@ public class ProgramConverter * @param fnStack ? * @param fnCreated ? * @param plain ? - * @throws DMLRuntimeException if DMLRuntimeException occurs */ public static void createDeepCopyFunctionProgramBlock(String namespace, String oldName, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain) - throws DMLRuntimeException { //fpb guaranteed to be non-null (checked inside getFunctionProgramBlock) FunctionProgramBlock fpb = prog.getFunctionProgramBlock(namespace, oldName); @@ -429,7 +414,6 @@ public class ProgramConverter } public static FunctionProgramBlock createDeepCopyFunctionProgramBlock(FunctionProgramBlock fpb, HashSet<String> fnStack, HashSet<String> fnCreated) - throws DMLRuntimeException { if( fpb == null ) throw new DMLRuntimeException("Unable to create a deep copy of a non-existing FunctionProgramBlock."); @@ -468,10 +452,8 @@ public class ProgramConverter * @param plain ? * @param cpFunctions ? * @return list of instructions - * @throws DMLRuntimeException if DMLRuntimeException occurs */ public static ArrayList<Instruction> createDeepCopyInstructionSet(ArrayList<Instruction> instSet, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean cpFunctions) - throws DMLRuntimeException { ArrayList<Instruction> tmp = new ArrayList<>(); for( Instruction inst : instSet ) @@ -491,7 +473,6 @@ public class ProgramConverter } public static Instruction cloneInstruction( Instruction oInst, long pid, boolean plain, boolean cpFunctions ) - throws DMLRuntimeException { Instruction inst = null; String tmpString = oInst.toString(); @@ -536,7 +517,6 @@ public class ProgramConverter } public static StatementBlock createStatementBlockCopy( StatementBlock sb, long pid, boolean plain, boolean forceDeepCopy ) - throws DMLRuntimeException { StatementBlock ret = null; @@ -576,7 +556,6 @@ public class ProgramConverter } public static IfStatementBlock createIfStatementBlockCopy( IfStatementBlock sb, long pid, boolean plain, boolean forceDeepCopy ) - throws DMLRuntimeException { IfStatementBlock ret = null; @@ -617,7 +596,6 @@ public class ProgramConverter } public static WhileStatementBlock createWhileStatementBlockCopy( WhileStatementBlock sb, long pid, boolean plain, boolean forceDeepCopy ) - throws DMLRuntimeException { WhileStatementBlock ret = null; @@ -659,7 +637,6 @@ public class ProgramConverter } public static ForStatementBlock createForStatementBlockCopy( ForStatementBlock sb, long pid, boolean plain, boolean forceDeepCopy ) - throws DMLRuntimeException { ForStatementBlock ret = null; @@ -719,12 +696,11 @@ public class ProgramConverter // SERIALIZATION //////////////////////////////// - public static String serializeParForBody( ParForBody body ) throws DMLRuntimeException { + public static String serializeParForBody( ParForBody body ) { return serializeParForBody(body, new HashMap<String, byte[]>()); } public static String serializeParForBody( ParForBody body, HashMap<String,byte[]> clsMap ) - throws DMLRuntimeException { ArrayList<ProgramBlock> pbs = body.getChildBlocks(); ArrayList<ResultVar> rVnames = body.getResultVariables(); @@ -789,21 +765,16 @@ public class ProgramConverter return sb.toString(); } - private static String serializeProgram( Program prog, ArrayList<ProgramBlock> pbs, HashMap<String, byte[]> clsMap ) - throws DMLRuntimeException - { + private static String serializeProgram( Program prog, ArrayList<ProgramBlock> pbs, HashMap<String, byte[]> clsMap ) { //note program contains variables, programblocks and function program blocks //but in order to avoid redundancy, we only serialize function program blocks - HashMap<String, FunctionProgramBlock> fpb = prog.getFunctionProgramBlocks(); HashSet<String> cand = new HashSet<>(); rFindSerializationCandidates(pbs, cand); - return rSerializeFunctionProgramBlocks( fpb, cand, clsMap ); } private static void rFindSerializationCandidates( ArrayList<ProgramBlock> pbs, HashSet<String> cand ) - throws DMLRuntimeException { for( ProgramBlock pb : pbs ) { @@ -844,32 +815,25 @@ public class ProgramConverter } } - private static String serializeVariables (LocalVariableMap vars) - throws DMLRuntimeException - { + private static String serializeVariables (LocalVariableMap vars) { StringBuilder sb = new StringBuilder(); sb.append( PARFOR_VARS_BEGIN ); sb.append( vars.serialize() ); sb.append( PARFOR_VARS_END ); - return sb.toString(); } public static String serializeDataObject(String key, Data dat) - throws DMLRuntimeException { // SCHEMA: <name>|<datatype>|<valuetype>|value // (scalars are serialize by value, matrices by filename) - StringBuilder sb = new StringBuilder(); - //prepare data for serialization String name = key; DataType datatype = dat.getDataType(); ValueType valuetype = dat.getValueType(); String value = null; String[] matrixMetaData = null; - switch( datatype ) { case SCALAR: @@ -917,14 +881,13 @@ public class ProgramConverter return sb.toString(); } - private static String serializeExecutionContext( ExecutionContext ec ) throws DMLRuntimeException { + private static String serializeExecutionContext( ExecutionContext ec ) { return (ec != null) ? serializeVariables( ec.getVariables() ) : EMPTY; } @SuppressWarnings("all") private static String serializeInstructions( ArrayList<Instruction> inst, HashMap<String, byte[]> clsMap ) - throws DMLRuntimeException - { + { StringBuilder sb = new StringBuilder(); int count = 0; for( Instruction linst : inst ) @@ -1078,7 +1041,6 @@ public class ProgramConverter } private static String rSerializeFunctionProgramBlocks(HashMap<String,FunctionProgramBlock> pbs, HashSet<String> cand, HashMap<String, byte[]> clsMap) - throws DMLRuntimeException { StringBuilder sb = new StringBuilder(); @@ -1101,9 +1063,7 @@ public class ProgramConverter return sb.toString(); } - private static String rSerializeProgramBlocks(ArrayList<ProgramBlock> pbs, HashMap<String, byte[]> clsMap) - throws DMLRuntimeException - { + private static String rSerializeProgramBlocks(ArrayList<ProgramBlock> pbs, HashMap<String, byte[]> clsMap) { StringBuilder sb = new StringBuilder(); int count = 0; for( ProgramBlock pb : pbs ) @@ -1120,9 +1080,7 @@ public class ProgramConverter return sb.toString(); } - private static String rSerializeProgramBlock( ProgramBlock pb, HashMap<String, byte[]> clsMap ) - throws DMLRuntimeException - { + private static String rSerializeProgramBlock( ProgramBlock pb, HashMap<String, byte[]> clsMap ) { StringBuilder sb = new StringBuilder(); //handle header @@ -1297,9 +1255,7 @@ public class ProgramConverter // PARSING //////////////////////////////// - public static ParForBody parseParForBody( String in, int id ) - throws DMLRuntimeException - { + public static ParForBody parseParForBody( String in, int id ) { ParForBody body = new ParForBody(); //header elimination @@ -1353,9 +1309,7 @@ public class ProgramConverter return body; } - public static Program parseProgram( String in, int id ) - throws DMLRuntimeException - { + public static Program parseProgram( String in, int id ) { String lin = in.substring( PARFOR_PROG_BEGIN.length(),in.length()-PARFOR_PROG_END.length()).trim(); Program prog = new Program(); @@ -1373,9 +1327,7 @@ public class ProgramConverter return prog; } - private static LocalVariableMap parseVariables(String in) - throws DMLRuntimeException - { + private static LocalVariableMap parseVariables(String in) { LocalVariableMap ret = null; if( in.length()> PARFOR_VARS_BEGIN.length() + PARFOR_VARS_END.length()) @@ -1391,9 +1343,7 @@ public class ProgramConverter return ret; } - private static HashMap<String,FunctionProgramBlock> parseFunctionProgramBlocks( String in, Program prog, int id ) - throws DMLRuntimeException - { + private static HashMap<String,FunctionProgramBlock> parseFunctionProgramBlocks( String in, Program prog, int id ) { HashMap<String,FunctionProgramBlock> ret = new HashMap<>(); HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer( in, ELEMENT_DELIM ); @@ -1411,9 +1361,7 @@ public class ProgramConverter return ret; } - private static ArrayList<ProgramBlock> rParseProgramBlocks(String in, Program prog, int id) - throws DMLRuntimeException - { + private static ArrayList<ProgramBlock> rParseProgramBlocks(String in, Program prog, int id) { ArrayList<ProgramBlock> pbs = new ArrayList<>(); String tmpdata = in.substring(PARFOR_PBS_BEGIN.length(),in.length()-PARFOR_PBS_END.length()); HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(tmpdata, ELEMENT_DELIM); @@ -1427,9 +1375,7 @@ public class ProgramConverter return pbs; } - private static ProgramBlock rParseProgramBlock( String in, Program prog, int id ) - throws DMLRuntimeException - { + private static ProgramBlock rParseProgramBlock( String in, Program prog, int id ) { ProgramBlock pb = null; if( in.startsWith( PARFOR_PB_WHILE ) ) @@ -1452,9 +1398,7 @@ public class ProgramConverter return pb; } - private static WhileProgramBlock rParseWhileProgramBlock( String in, Program prog, int id ) - throws DMLRuntimeException - { + private static WhileProgramBlock rParseWhileProgramBlock( String in, Program prog, int id ) { String lin = in.substring( PARFOR_PB_WHILE.length(),in.length()-PARFOR_PB_END.length()); HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM); @@ -1474,9 +1418,7 @@ public class ProgramConverter return wpb; } - private static ForProgramBlock rParseForProgramBlock( String in, Program prog, int id ) - throws DMLRuntimeException - { + private static ForProgramBlock rParseForProgramBlock( String in, Program prog, int id ) { String lin = in.substring( PARFOR_PB_FOR.length(),in.length()-PARFOR_PB_END.length()); HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM); @@ -1504,9 +1446,7 @@ public class ProgramConverter return fpb; } - private static ParForProgramBlock rParseParForProgramBlock( String in, Program prog, int id ) - throws DMLRuntimeException - { + private static ParForProgramBlock rParseParForProgramBlock( String in, Program prog, int id ) { String lin = in.substring( PARFOR_PB_PARFOR.length(),in.length()-PARFOR_PB_END.length()); HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM); @@ -1537,9 +1477,7 @@ public class ProgramConverter return pfpb; } - private static IfProgramBlock rParseIfProgramBlock( String in, Program prog, int id ) - throws DMLRuntimeException - { + private static IfProgramBlock rParseIfProgramBlock( String in, Program prog, int id ) { String lin = in.substring( PARFOR_PB_IF.length(),in.length()-PARFOR_PB_END.length()); HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM); @@ -1561,9 +1499,7 @@ public class ProgramConverter return ipb; } - private static FunctionProgramBlock rParseFunctionProgramBlock( String in, Program prog, int id ) - throws DMLRuntimeException - { + private static FunctionProgramBlock rParseFunctionProgramBlock( String in, Program prog, int id ) { String lin = in.substring( PARFOR_PB_FC.length(),in.length()-PARFOR_PB_END.length()); HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM); @@ -1586,9 +1522,7 @@ public class ProgramConverter return fpb; } - private static ExternalFunctionProgramBlock rParseExternalFunctionProgramBlock( String in, Program prog, int id ) - throws DMLRuntimeException - { + private static ExternalFunctionProgramBlock rParseExternalFunctionProgramBlock( String in, Program prog, int id ) { String lin = in.substring( PARFOR_PB_EFC.length(),in.length()-PARFOR_PB_END.length()); HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM); @@ -1618,9 +1552,7 @@ public class ProgramConverter return efpb; } - private static ProgramBlock rParseGenericProgramBlock( String in, Program prog, int id ) - throws DMLRuntimeException - { + private static ProgramBlock rParseGenericProgramBlock( String in, Program prog, int id ) { String lin = in.substring( PARFOR_PB_BEGIN.length(),in.length()-PARFOR_PB_END.length()); StringTokenizer st = new StringTokenizer(lin,COMPONENTS_DELIM); @@ -1632,9 +1564,7 @@ public class ProgramConverter return pb; } - private static ArrayList<Instruction> parseInstructions( String in, int id ) - throws DMLRuntimeException - { + private static ArrayList<Instruction> parseInstructions( String in, int id ) { ArrayList<Instruction> insts = new ArrayList<>(); String lin = in.substring( PARFOR_INST_BEGIN.length(),in.length()-PARFOR_INST_END.length()); StringTokenizer st = new StringTokenizer(lin, ELEMENT_DELIM); @@ -1720,11 +1650,8 @@ public class ProgramConverter * * @param in data object as string * @return array of objects - * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public static Object[] parseDataObject(String in) - throws DMLRuntimeException - { + public static Object[] parseDataObject(String in) { Object[] ret = new Object[2]; StringTokenizer st = new StringTokenizer(in, DATA_FIELD_DELIM ); @@ -1787,7 +1714,6 @@ public class ProgramConverter } private static ExecutionContext parseExecutionContext(String in, Program prog) - throws DMLRuntimeException { ExecutionContext ec = null; @@ -1818,10 +1744,8 @@ public class ProgramConverter * @param pattern ? * @param replacement string replacement * @return instruction - * @throws DMLRuntimeException if DMLRuntimeException occurs */ private static Instruction saveReplaceThreadID( Instruction inst, String pattern, String replacement ) - throws DMLRuntimeException { //currently known, relevant instructions: createvar, rand, seq, extfunct, if( inst instanceof MRJobInstruction ) http://git-wip-us.apache.org/repos/asf/systemml/blob/583f3448/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java index 800cdb4..b51df84 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java @@ -72,7 +72,6 @@ public class RemoteDPParForMR public static RemoteParForJobReturn runJob(long pfid, String itervar, String matrixvar, String program, String resultFile, MatrixObject input, PartitionFormat dpf, OutputInfo oi, boolean tSparseCol, //config params boolean enableCPCaching, int numReducers, int replication) //opt params - throws DMLRuntimeException { RemoteParForJobReturn ret = null; String jobname = "ParFor-DPEMR"; @@ -237,12 +236,11 @@ public class RemoteDPParForMR * @param job job configuration * @param fname file name * @return array of local variable maps - * @throws DMLRuntimeException if DMLRuntimeException occurs * @throws IOException if IOException occurs */ @SuppressWarnings("deprecation") public static LocalVariableMap [] readResultFile( JobConf job, String fname ) - throws DMLRuntimeException, IOException + throws IOException { HashMap<Long,LocalVariableMap> tmp = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/systemml/blob/583f3448/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java index 4562b06..b12dc88 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java @@ -39,7 +39,6 @@ import org.apache.spark.util.LongAccumulator; import scala.Tuple2; import org.apache.sysml.api.DMLScript; -import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.LocalVariableMap; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat; @@ -73,7 +72,6 @@ public class RemoteDPParForSpark public static RemoteParForJobReturn runJob(long pfid, String itervar, String matrixvar, String program, HashMap<String, byte[]> clsMap, String resultFile, MatrixObject input, ExecutionContext ec, PartitionFormat dpf, OutputInfo oi, boolean tSparseCol, boolean enableCPCaching, int numReducers ) - throws DMLRuntimeException { String jobname = "ParFor-DPESP"; long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; @@ -124,7 +122,6 @@ public class RemoteDPParForSpark @SuppressWarnings("unchecked") private static JavaPairRDD<Long, Writable> getPartitionedInput(SparkExecutionContext sec, String matrixvar, OutputInfo oi, PartitionFormat dpf) - throws DMLRuntimeException { InputInfo ii = InputInfo.BinaryBlockInputInfo; MatrixObject mo = sec.getMatrixObject(matrixvar); http://git-wip-us.apache.org/repos/asf/systemml/blob/583f3448/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java index 52ac922..4ab90ae 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java @@ -73,7 +73,6 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF public RemoteDPParForSparkWorker(String program, HashMap<String, byte[]> clsMap, String inputVar, String iterVar, boolean cpCaching, MatrixCharacteristics mc, boolean tSparseCol, PartitionFormat dpf, OutputInfo oinfo, LongAccumulator atasks, LongAccumulator aiters) - throws DMLRuntimeException { _prog = program; _clsMap = clsMap; @@ -142,7 +141,7 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF } private void configureWorker( long ID ) - throws DMLRuntimeException, IOException + throws IOException { _workerID = ID; http://git-wip-us.apache.org/repos/asf/systemml/blob/583f3448/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java index b998cba..baec5a2 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java @@ -69,7 +69,6 @@ public class RemoteParForMR public static RemoteParForJobReturn runJob(long pfid, String program, String taskFile, String resultFile, MatrixObject colocatedDPMatrixObj, //inputs boolean enableCPCaching, int numMappers, int replication, int max_retry, long minMem, boolean jvmReuse) //opt params - throws DMLRuntimeException { RemoteParForJobReturn ret = null; String jobname = "ParFor-EMR"; @@ -248,12 +247,11 @@ public class RemoteParForMR * @param job job configuration * @param fname file name * @return array of local variable maps - * @throws DMLRuntimeException if DMLRuntimeException occurs * @throws IOException if IOException occurs */ @SuppressWarnings("deprecation") public static LocalVariableMap [] readResultFile( JobConf job, String fname ) - throws DMLRuntimeException, IOException + throws IOException { HashMap<Long,LocalVariableMap> tmp = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/systemml/blob/583f3448/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java index 10b44a2..eee65f3 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java @@ -30,7 +30,6 @@ import org.apache.spark.util.LongAccumulator; import scala.Tuple2; import org.apache.sysml.api.DMLScript; -import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.LocalVariableMap; import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; @@ -60,7 +59,6 @@ public class RemoteParForSpark public static RemoteParForJobReturn runJob(long pfid, String prog, HashMap<String, byte[]> clsMap, List<Task> tasks, ExecutionContext ec, boolean cpCaching, int numMappers) - throws DMLRuntimeException { String jobname = "ParFor-ESP"; long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; http://git-wip-us.apache.org/repos/asf/systemml/blob/583f3448/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java index 41dc53c..45e3bc7 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java @@ -30,7 +30,6 @@ import java.util.stream.Collectors; import org.apache.spark.TaskContext; import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.util.LongAccumulator; -import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.codegen.CodegenUtils; import org.apache.sysml.runtime.controlprogram.caching.CacheableData; import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; @@ -55,15 +54,12 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun private final LongAccumulator _aTasks; private final LongAccumulator _aIters; - public RemoteParForSparkWorker(long jobid, String program, HashMap<String, byte[]> clsMap, boolean cpCaching, LongAccumulator atasks, LongAccumulator aiters) - throws DMLRuntimeException - { + public RemoteParForSparkWorker(long jobid, String program, HashMap<String, byte[]> clsMap, boolean cpCaching, LongAccumulator atasks, LongAccumulator aiters) { _jobid = jobid; _prog = program; _clsMap = clsMap; _initialized = false; _caching = cpCaching; - //setup spark accumulators _aTasks = atasks; _aIters = aiters; @@ -96,7 +92,7 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun } private void configureWorker(long taskID) - throws DMLRuntimeException, IOException + throws IOException { _workerID = taskID; http://git-wip-us.apache.org/repos/asf/systemml/blob/583f3448/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java index 7c24ae5..e93f0ac 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java @@ -38,7 +38,6 @@ import org.apache.sysml.api.DMLScript; import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.parser.Expression.DataType; import org.apache.sysml.parser.ParForStatementBlock.ResultVar; -import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.LocalVariableMap; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock; import org.apache.sysml.runtime.controlprogram.caching.CacheStatistics; @@ -90,9 +89,7 @@ public class RemoteParForUtils } } - public static void exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<ResultVar> resultVars, OutputCollector<Writable, Writable> out ) - throws DMLRuntimeException, IOException - { + public static void exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<ResultVar> resultVars, OutputCollector<Writable, Writable> out ) throws IOException { exportResultVariables(workerID, vars, resultVars, null, out); } @@ -103,13 +100,11 @@ public class RemoteParForUtils * @param vars local variable map * @param resultVars list of result variables * @param rvarFnames ? - * @param out output collector - * @throws DMLRuntimeException if DMLRuntimeException occurs + * @param out output collectors * @throws IOException if IOException occurs */ public static void exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<ResultVar> resultVars, - HashMap<String,String> rvarFnames, OutputCollector<Writable, Writable> out ) - throws DMLRuntimeException, IOException + HashMap<String,String> rvarFnames, OutputCollector<Writable, Writable> out ) throws IOException { //create key and value for reuse LongWritable okey = new LongWritable( workerID ); @@ -159,11 +154,10 @@ public class RemoteParForUtils * @param vars local variable map * @param resultVars list of result variables * @return list of result variables - * @throws DMLRuntimeException if DMLRuntimeException occurs * @throws IOException if IOException occurs */ public static ArrayList<String> exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<ResultVar> resultVars) - throws DMLRuntimeException, IOException + throws IOException { ArrayList<String> ret = new ArrayList<>(); @@ -218,7 +212,6 @@ public class RemoteParForUtils } public static LocalVariableMap[] getResults( List<Tuple2<Long,String>> out, Log LOG ) - throws DMLRuntimeException { HashMap<Long,LocalVariableMap> tmp = new HashMap<>(); @@ -228,10 +221,10 @@ public class RemoteParForUtils Long key = entry._1(); String val = entry._2(); if( !tmp.containsKey( key ) ) - tmp.put(key, new LocalVariableMap ()); + tmp.put(key, new LocalVariableMap ()); Object[] dat = ProgramConverter.parseDataObject( val ); - tmp.get(key).put((String)dat[0], (Data)dat[1]); - countAll++; + tmp.get(key).put((String)dat[0], (Data)dat[1]); + countAll++; } if( LOG != null ) { http://git-wip-us.apache.org/repos/asf/systemml/blob/583f3448/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java index 7d43475..be87b17 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java @@ -25,7 +25,6 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.matrix.data.DenseBlock; @@ -71,10 +70,8 @@ public abstract class ResultMerge implements Serializable * of one input matrix at a time. * * @return output (merged) matrix - * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public abstract MatrixObject executeSerialMerge() - throws DMLRuntimeException; + public abstract MatrixObject executeSerialMerge(); /** * Merge all given input matrices in parallel into the given output matrix. @@ -83,10 +80,8 @@ public abstract class ResultMerge implements Serializable * * @param par degree of parallelism * @return output (merged) matrix - * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public abstract MatrixObject executeParallelMerge( int par ) - throws DMLRuntimeException; + public abstract MatrixObject executeParallelMerge( int par ); /** * ? @@ -94,9 +89,8 @@ public abstract class ResultMerge implements Serializable * @param out initially empty block * @param in input matrix block * @param appendOnly ? - * @throws DMLRuntimeException if DMLRuntimeException occurs */ - protected void mergeWithoutComp( MatrixBlock out, MatrixBlock in, boolean appendOnly ) throws DMLRuntimeException { + protected void mergeWithoutComp( MatrixBlock out, MatrixBlock in, boolean appendOnly ) { //pass through to matrix block operations if( _isAccum ) out.binaryOperationsInPlace(PLUS, in); @@ -111,10 +105,8 @@ public abstract class ResultMerge implements Serializable * @param out output matrix block * @param in input matrix block * @param compare ? - * @throws DMLRuntimeException if DMLRuntimeException occurs */ protected void mergeWithComp( MatrixBlock out, MatrixBlock in, DenseBlock compare ) - throws DMLRuntimeException { //Notes for result correctness: // * Always iterate over entire block in order to compare all values http://git-wip-us.apache.org/repos/asf/systemml/blob/583f3448/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java index 135cf9c..aa42753 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java @@ -21,7 +21,6 @@ package org.apache.sysml.runtime.controlprogram.parfor; import org.apache.sysml.hops.OptimizerUtils; -import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; import org.apache.sysml.runtime.controlprogram.parfor.opt.OptimizerRuleBased; import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing; @@ -38,9 +37,7 @@ public class ResultMergeLocalAutomatic extends ResultMerge } @Override - public MatrixObject executeSerialMerge() - throws DMLRuntimeException - { + public MatrixObject executeSerialMerge() { Timing time = new Timing(true); MatrixCharacteristics mc = _output.getMatrixCharacteristics(); @@ -60,9 +57,7 @@ public class ResultMergeLocalAutomatic extends ResultMerge } @Override - public MatrixObject executeParallelMerge(int par) - throws DMLRuntimeException - { + public MatrixObject executeParallelMerge(int par) { MatrixCharacteristics mc = _output.getMatrixCharacteristics(); long rows = mc.getRows(); long cols = mc.getCols(); http://git-wip-us.apache.org/repos/asf/systemml/blob/583f3448/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java index b2bd267..4f7fac5 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java @@ -87,9 +87,7 @@ public class ResultMergeLocalFile extends ResultMerge @Override - public MatrixObject executeSerialMerge() - throws DMLRuntimeException - { + public MatrixObject executeSerialMerge() { MatrixObject moNew = null; //always create new matrix object (required for nested parallelism) if( LOG.isTraceEnabled() ) @@ -140,16 +138,12 @@ public class ResultMergeLocalFile extends ResultMerge } @Override - public MatrixObject executeParallelMerge(int par) - throws DMLRuntimeException - { + public MatrixObject executeParallelMerge(int par) { //graceful degradation to serial merge return executeSerialMerge(); } - private MatrixObject createNewMatrixObject(MatrixObject output, ArrayList<MatrixObject> inMO) - throws DMLRuntimeException - { + private MatrixObject createNewMatrixObject(MatrixObject output, ArrayList<MatrixObject> inMO) { MetaDataFormat metadata = (MetaDataFormat) _output.getMetaData(); MatrixObject moNew = new MatrixObject( _output.getValueType(), _outputFName ); @@ -166,7 +160,6 @@ public class ResultMergeLocalFile extends ResultMerge } private void merge( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO ) - throws DMLRuntimeException { OutputInfo oi = ((MetaDataFormat)outMo.getMetaData()).getOutputInfo(); boolean withCompare = ( outMo.getNnz() != 0 ); //if nnz exist or unknown (-1) @@ -195,7 +188,6 @@ public class ResultMergeLocalFile extends ResultMerge } private static void mergeTextCellWithoutComp( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO ) - throws DMLRuntimeException { try { @@ -262,7 +254,6 @@ public class ResultMergeLocalFile extends ResultMerge } private void mergeTextCellWithComp( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO ) - throws DMLRuntimeException { String fnameStaging = LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE); String fnameStagingCompare = LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE); @@ -302,7 +293,6 @@ public class ResultMergeLocalFile extends ResultMerge @SuppressWarnings("deprecation") private static void mergeBinaryCellWithoutComp( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO ) - throws DMLRuntimeException { try { @@ -362,7 +352,6 @@ public class ResultMergeLocalFile extends ResultMerge } private void mergeBinaryCellWithComp( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO ) - throws DMLRuntimeException { String fnameStaging = LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE); String fnameStagingCompare = LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE); @@ -401,7 +390,6 @@ public class ResultMergeLocalFile extends ResultMerge } private void mergeBinaryBlockWithoutComp( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO ) - throws DMLRuntimeException { String fnameStaging = LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE); @@ -431,7 +419,6 @@ public class ResultMergeLocalFile extends ResultMerge } private void mergeBinaryBlockWithComp( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO ) - throws DMLRuntimeException { String fnameStaging = LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE); String fnameStagingCompare = LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE); @@ -609,7 +596,7 @@ public class ResultMergeLocalFile extends ResultMerge } private static void appendCellBufferToStagingArea( String fnameStaging, long ID, LinkedList<Cell> buffer, int brlen, int bclen ) - throws DMLRuntimeException, IOException + throws IOException { HashMap<Long,HashMap<Long,LinkedList<Cell>>> sortedBuffer = new HashMap<>(); long brow, bcol, row_offset, col_offset; @@ -975,7 +962,7 @@ public class ResultMergeLocalFile extends ResultMerge } private static void copyAllFiles( String fnameNew, ArrayList<MatrixObject> inMO ) - throws DMLRuntimeException, IOException + throws IOException { JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); Path path = new Path( fnameNew ); http://git-wip-us.apache.org/repos/asf/systemml/blob/583f3448/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java index 1a74635..e3a2e82 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java @@ -53,7 +53,6 @@ public class ResultMergeLocalMemory extends ResultMerge @Override public MatrixObject executeSerialMerge() - throws DMLRuntimeException { MatrixObject moNew = null; //always create new matrix object (required for nested parallelism) @@ -142,8 +141,7 @@ public class ResultMergeLocalMemory extends ResultMerge @Override public MatrixObject executeParallelMerge( int par ) - throws DMLRuntimeException - { + { MatrixObject moNew = null; //always create new matrix object (required for nested parallelism) if( LOG.isTraceEnabled() ) @@ -225,9 +223,7 @@ public class ResultMergeLocalMemory extends ResultMerge return null; } - private MatrixObject createNewMatrixObject( MatrixBlock data ) - throws DMLRuntimeException - { + private MatrixObject createNewMatrixObject( MatrixBlock data ) { ValueType vt = _output.getValueType(); MetaDataFormat metadata = (MetaDataFormat) _output.getMetaData(); MatrixObject moNew = new MatrixObject( vt, _outputFName ); @@ -246,8 +242,8 @@ public class ResultMergeLocalMemory extends ResultMerge data.examSparsity(); //release new output - moNew.acquireModify(data); - moNew.release(); + moNew.acquireModify(data); + moNew.release(); return moNew; } @@ -263,11 +259,8 @@ public class ResultMergeLocalMemory extends ResultMerge * @param out output matrix block * @param in input matrix block * @param appendOnly ? - * @throws DMLRuntimeException if DMLRuntimeException occurs */ - private void merge( MatrixBlock out, MatrixBlock in, boolean appendOnly ) - throws DMLRuntimeException - { + private void merge( MatrixBlock out, MatrixBlock in, boolean appendOnly ) { if( _compare == null ) mergeWithoutComp(out, in, appendOnly); else http://git-wip-us.apache.org/repos/asf/systemml/blob/583f3448/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java index c20f0db..0a785fd 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java @@ -85,16 +85,13 @@ public class ResultMergeRemoteMR extends ResultMerge } @Override - public MatrixObject executeSerialMerge() - throws DMLRuntimeException - { + public MatrixObject executeSerialMerge() { //graceful degradation to parallel merge return executeParallelMerge( _numMappers ); } @Override public MatrixObject executeParallelMerge(int par) - throws DMLRuntimeException { MatrixObject moNew = null; //always create new matrix object (required for nested parallelism) if( LOG.isTraceEnabled() ) @@ -160,7 +157,6 @@ public class ResultMergeRemoteMR extends ResultMerge @SuppressWarnings({ "unused", "deprecation" }) protected void executeMerge(String fname, String fnameNew, String[] srcFnames, InputInfo ii, OutputInfo oi, long rlen, long clen, int brlen, int bclen) - throws DMLRuntimeException { String jobname = "ParFor-RMMR"; long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; http://git-wip-us.apache.org/repos/asf/systemml/blob/583f3448/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteReducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteReducer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteReducer.java index 6fa6534..41c6b5c 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteReducer.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteReducer.java @@ -277,12 +277,12 @@ public class ResultMergeRemoteReducer } @Override - public MatrixObject executeParallelMerge(int par) throws DMLRuntimeException { + public MatrixObject executeParallelMerge(int par) { throw new DMLRuntimeException("Unsupported operation."); } @Override - public MatrixObject executeSerialMerge() throws DMLRuntimeException { + public MatrixObject executeSerialMerge() { throw new DMLRuntimeException("Unsupported operation."); } http://git-wip-us.apache.org/repos/asf/systemml/blob/583f3448/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java index a68c23e..e801dc8 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java @@ -62,16 +62,13 @@ public class ResultMergeRemoteSpark extends ResultMerge } @Override - public MatrixObject executeSerialMerge() - throws DMLRuntimeException - { + public MatrixObject executeSerialMerge() { //graceful degradation to parallel merge return executeParallelMerge( _numMappers ); } @Override public MatrixObject executeParallelMerge(int par) - throws DMLRuntimeException { MatrixObject moNew = null; //always create new matrix object (required for nested parallelism) @@ -113,7 +110,6 @@ public class ResultMergeRemoteSpark extends ResultMerge @SuppressWarnings("unchecked") protected RDDObject executeMerge(MatrixObject compare, MatrixObject[] inputs, long rlen, long clen, int brlen, int bclen) - throws DMLRuntimeException { String jobname = "ParFor-RMSP"; long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; http://git-wip-us.apache.org/repos/asf/systemml/blob/583f3448/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java index 9a755b2..82a64c0 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java @@ -61,12 +61,12 @@ public class ResultMergeRemoteSparkWCompare extends ResultMerge implements PairF } @Override - public MatrixObject executeSerialMerge() throws DMLRuntimeException { + public MatrixObject executeSerialMerge() { throw new DMLRuntimeException("Unsupported operation."); } @Override - public MatrixObject executeParallelMerge(int par) throws DMLRuntimeException { + public MatrixObject executeParallelMerge(int par) { throw new DMLRuntimeException("Unsupported operation."); } } http://git-wip-us.apache.org/repos/asf/systemml/blob/583f3448/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitioner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitioner.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitioner.java index fe8311f..70f7bc9 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitioner.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitioner.java @@ -21,7 +21,6 @@ package org.apache.sysml.runtime.controlprogram.parfor; import java.util.List; -import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.instructions.cp.IntObject; /** @@ -60,10 +59,8 @@ public abstract class TaskPartitioner * Creates and returns set of all tasks for given problem at once. * * @return list of tasks - * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public abstract List<Task> createTasks() - throws DMLRuntimeException; + public abstract List<Task> createTasks(); /** * Creates set of all tasks for given problem, but streams them directly @@ -72,10 +69,8 @@ public abstract class TaskPartitioner * * @param queue queue of takss * @return ? - * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public abstract long createTasks( LocalTaskQueue<Task> queue ) - throws DMLRuntimeException; + public abstract long createTasks( LocalTaskQueue<Task> queue ); public long getNumIterations() { return _numIter; http://git-wip-us.apache.org/repos/asf/systemml/blob/583f3448/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java index 547e607..6b3f677 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java @@ -52,8 +52,7 @@ public class TaskPartitionerFactoring extends TaskPartitioner } @Override - public List<Task> createTasks() - throws DMLRuntimeException + public List<Task> createTasks() { LinkedList<Task> tasks = new LinkedList<>(); long lFrom = _fromVal.getLongValue(); @@ -107,8 +106,7 @@ public class TaskPartitionerFactoring extends TaskPartitioner @Override public long createTasks(LocalTaskQueue<Task> queue) - throws DMLRuntimeException - { + { long numCreatedTasks = 0; long lFrom = _fromVal.getLongValue(); http://git-wip-us.apache.org/repos/asf/systemml/blob/583f3448/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFixedsize.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFixedsize.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFixedsize.java index f283b69..bcc8348 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFixedsize.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFixedsize.java @@ -44,7 +44,6 @@ public class TaskPartitionerFixedsize extends TaskPartitioner @Override public List<Task> createTasks() - throws DMLRuntimeException { LinkedList<Task> tasks = new LinkedList<>(); @@ -92,7 +91,6 @@ public class TaskPartitionerFixedsize extends TaskPartitioner @Override public long createTasks(LocalTaskQueue<Task> queue) - throws DMLRuntimeException { long numCreatedTasks=0; http://git-wip-us.apache.org/repos/asf/systemml/blob/583f3448/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/mqo/RuntimePiggybacking.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/mqo/RuntimePiggybacking.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/mqo/RuntimePiggybacking.java index bab457b..5f3b04d 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/mqo/RuntimePiggybacking.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/mqo/RuntimePiggybacking.java @@ -67,20 +67,15 @@ public class RuntimePiggybacking // public interface to runtime piggybacking /////// - public static boolean isActive() - { + public static boolean isActive() { return _active; } - public static void start( int par ) - throws DMLRuntimeException - { + public static void start( int par ) { start( DEFAULT_WORKER_TYPE, par ); } - public static void start( PiggybackingType type, int par ) - throws DMLRuntimeException - { + public static void start( PiggybackingType type, int par ) { //activate piggybacking server _active = true; @@ -104,9 +99,7 @@ public class RuntimePiggybacking _worker.start(); } - public static void stop() - throws DMLRuntimeException - { + public static void stop() { try { //deactivate piggybacking server @@ -123,9 +116,7 @@ public class RuntimePiggybacking } } - public static JobReturn submitJob(MRJobInstruction inst) - throws DMLRuntimeException - { + public static JobReturn submitJob(MRJobInstruction inst) { JobReturn ret = null; try http://git-wip-us.apache.org/repos/asf/systemml/blob/583f3448/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimator.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimator.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimator.java index 4af7920..2f5392e 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimator.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimator.java @@ -25,7 +25,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.sysml.lops.LopProperties.ExecType; -import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.parfor.opt.OptNode.ParamType; /** @@ -63,10 +62,8 @@ public abstract class CostEstimator * @param measure ? * @param node internal representation of a plan alternative for program blocks and instructions * @return estimate? - * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public abstract double getLeafNodeEstimate( TestMeasure measure, OptNode node ) - throws DMLRuntimeException; + public abstract double getLeafNodeEstimate( TestMeasure measure, OptNode node ); /** * Main leaf node estimation method - to be overwritten by specific cost estimators @@ -75,10 +72,8 @@ public abstract class CostEstimator * @param node internal representation of a plan alternative for program blocks and instructions * @param et forced execution type for leaf node * @return estimate? - * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public abstract double getLeafNodeEstimate( TestMeasure measure, OptNode node, ExecType et ) - throws DMLRuntimeException; + public abstract double getLeafNodeEstimate( TestMeasure measure, OptNode node, ExecType et ); ///////// @@ -92,22 +87,16 @@ public abstract class CostEstimator * @param node internal representation of a plan alternative for program blocks and instructions * @param inclCondPart including conditional partitioning * @return estimate? - * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public double getEstimate( TestMeasure measure, OptNode node ) - throws DMLRuntimeException - { + public double getEstimate( TestMeasure measure, OptNode node ) { return getEstimate(measure, node, null); } - public double getEstimate( TestMeasure measure, OptNode node, boolean inclCondPart ) - throws DMLRuntimeException - { + public double getEstimate( TestMeasure measure, OptNode node, boolean inclCondPart ) { //temporarily change local flag and get estimate boolean oldInclCondPart = _inclCondPart; _inclCondPart = inclCondPart; double val = getEstimate(measure, node, null); - //reset local flag and return _inclCondPart = oldInclCondPart; return val; @@ -120,11 +109,8 @@ public abstract class CostEstimator * @param node plan opt tree node * @param et execution type * @return estimate - * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public double getEstimate( TestMeasure measure, OptNode node, ExecType et ) - throws DMLRuntimeException - { + public double getEstimate( TestMeasure measure, OptNode node, ExecType et ) { double val = -1; if( node.isLeaf() ) @@ -212,27 +198,21 @@ public abstract class CostEstimator return -1; } - protected double getMaxEstimate( TestMeasure measure, ArrayList<OptNode> nodes, ExecType et ) - throws DMLRuntimeException - { + protected double getMaxEstimate( TestMeasure measure, ArrayList<OptNode> nodes, ExecType et ) { double max = Double.MIN_VALUE; //smallest positive value for( OptNode n : nodes ) max = Math.max(max, getEstimate(measure, n, et)); return max; } - protected double getSumEstimate( TestMeasure measure, ArrayList<OptNode> nodes, ExecType et ) - throws DMLRuntimeException - { + protected double getSumEstimate( TestMeasure measure, ArrayList<OptNode> nodes, ExecType et ) { double sum = 0; for( OptNode n : nodes ) sum += getEstimate( measure, n, et ); - return sum; + return sum; } - protected double getWeightedEstimate( TestMeasure measure, ArrayList<OptNode> nodes, ExecType et ) - throws DMLRuntimeException - { + protected double getWeightedEstimate( TestMeasure measure, ArrayList<OptNode> nodes, ExecType et ) { double ret = 0; int len = nodes.size(); for( OptNode n : nodes ) http://git-wip-us.apache.org/repos/asf/systemml/blob/583f3448/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimatorHops.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimatorHops.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimatorHops.java index 9646b09..55d9c0c 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimatorHops.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimatorHops.java @@ -48,7 +48,6 @@ public class CostEstimatorHops extends CostEstimator @Override public double getLeafNodeEstimate(TestMeasure measure, OptNode node) - throws DMLRuntimeException { if( node.getNodeType() != NodeType.HOP ) return 0; //generic optnode but no childs (e.g., PB for rmvar inst) @@ -111,7 +110,6 @@ public class CostEstimatorHops extends CostEstimator @Override public double getLeafNodeEstimate(TestMeasure measure, OptNode node, ExecType et) - throws DMLRuntimeException { if( node.getNodeType() != NodeType.HOP ) return 0; //generic optnode but no childs (e.g., PB for rmvar inst) http://git-wip-us.apache.org/repos/asf/systemml/blob/583f3448/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimatorRuntime.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimatorRuntime.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimatorRuntime.java index 442ad48..9a842a7 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimatorRuntime.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimatorRuntime.java @@ -23,7 +23,6 @@ package org.apache.sysml.runtime.controlprogram.parfor.opt; import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.hops.cost.CostEstimationWrapper; import org.apache.sysml.lops.LopProperties.ExecType; -import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.LocalVariableMap; import org.apache.sysml.runtime.controlprogram.ProgramBlock; import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; @@ -52,9 +51,7 @@ public class CostEstimatorRuntime extends CostEstimator } @Override - public double getLeafNodeEstimate( TestMeasure measure, OptNode node ) - throws DMLRuntimeException - { + public double getLeafNodeEstimate( TestMeasure measure, OptNode node ) { //use CostEstimatorHops to get the memory estimate if( measure == TestMeasure.MEMORY_USAGE ) return _costMem.getLeafNodeEstimate(measure, node); @@ -65,9 +62,7 @@ public class CostEstimatorRuntime extends CostEstimator } @Override - public double getLeafNodeEstimate( TestMeasure measure, OptNode node, ExecType et ) - throws DMLRuntimeException - { + public double getLeafNodeEstimate( TestMeasure measure, OptNode node, ExecType et ) { //use CostEstimatorHops to get the memory estimate if( measure == TestMeasure.MEMORY_USAGE ) return _costMem.getLeafNodeEstimate(measure, node, et);
