http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/7ba17c7f/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java index 1dd3600..06a2005 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -84,8 +84,8 @@ public class SparkExecutionContext extends ExecutionContext { private static final Log LOG = LogFactory.getLog(SparkExecutionContext.class.getName()); private static final boolean LDEBUG = false; //local debug flag - - //internal configurations + + //internal configurations private static boolean LAZY_SPARKCTX_CREATION = true; private static boolean ASYNCHRONOUS_VAR_DESTROY = true; @@ -93,16 +93,16 @@ public class SparkExecutionContext extends ExecutionContext //executor memory and relative fractions as obtained from the spark configuration private static SparkClusterConfig _sconf = null; - - //singleton spark context (as there can be only one spark context per JVM) - private static JavaSparkContext _spctx = null; - - //registry of parallelized RDDs to enforce that at any time, we spent at most + + //singleton spark context (as there can be only one spark context per JVM) + private static JavaSparkContext _spctx = null; + + //registry of parallelized RDDs to enforce that at any time, we spent at most //10% of JVM max heap size for parallelized RDDs; if this is not sufficient, //matrices or frames are exported to HDFS and the RDDs are created from files. //TODO unify memory management for CP, par RDDs, and potentially broadcasts private static MemoryManagerParRDDs _parRDDs = new MemoryManagerParRDDs(0.1); - + static { // for internal debugging only if( LDEBUG ) { @@ -111,31 +111,31 @@ public class SparkExecutionContext extends ExecutionContext } } - protected SparkExecutionContext(boolean allocateVars, Program prog) + protected SparkExecutionContext(boolean allocateVars, Program prog) { //protected constructor to force use of ExecutionContextFactory super( allocateVars, prog ); - + //spark context creation via internal initializer if( !(LAZY_SPARKCTX_CREATION && OptimizerUtils.isHybridExecutionMode()) ) { initSparkContext(); } } - + /** * Returns the used singleton spark context. In case of lazy spark context * creation, this methods blocks until the spark context is created. - * + * * @return java spark context */ public JavaSparkContext getSparkContext() { - //lazy spark context creation on demand (lazy instead of asynchronous + //lazy spark context creation on demand (lazy instead of asynchronous //to avoid wait for uninitialized spark context on close) if( LAZY_SPARKCTX_CREATION ) { initSparkContext(); } - + //return the created spark context return _spctx; } @@ -144,11 +144,11 @@ public class SparkExecutionContext extends ExecutionContext initSparkContext(); return _spctx; } - + /** * Indicates if the spark context has been created or has * been passed in from outside. - * + * * @return true if spark context created */ public synchronized static boolean isSparkContextCreated() { @@ -159,26 +159,25 @@ public class SparkExecutionContext extends ExecutionContext _spctx = null; } - public void close() + public void close() { synchronized( SparkExecutionContext.class ) { - if( _spctx != null ) + if( _spctx != null ) { //stop the spark context if existing _spctx.stop(); - + //make sure stopped context is never used again - _spctx = null; + _spctx = null; } - + } } - + public static boolean isLazySparkContextCreation(){ return LAZY_SPARKCTX_CREATION; } - @SuppressWarnings("deprecation") private synchronized static void initSparkContext() { //check for redundant spark context init @@ -186,24 +185,19 @@ public class SparkExecutionContext extends ExecutionContext return; long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; - + //create a default spark context (master, appname, etc refer to system properties //as given in the spark configuration or during spark-submit) - + Object mlCtxObj = MLContextProxy.getActiveMLContext(); - if(mlCtxObj != null) + if(mlCtxObj != null) { // This is when DML is called through spark shell // Will clean the passing of static variables later as this involves minimal change to DMLScript - if (mlCtxObj instanceof org.apache.sysml.api.MLContext) { - org.apache.sysml.api.MLContext mlCtx = (org.apache.sysml.api.MLContext) mlCtxObj; - _spctx = new JavaSparkContext(mlCtx.getSparkContext()); - } else if (mlCtxObj instanceof org.apache.sysml.api.mlcontext.MLContext) { - org.apache.sysml.api.mlcontext.MLContext mlCtx = (org.apache.sysml.api.mlcontext.MLContext) mlCtxObj; - _spctx = MLContextUtil.getJavaSparkContext(mlCtx); - } + org.apache.sysml.api.mlcontext.MLContext mlCtx = (org.apache.sysml.api.mlcontext.MLContext) mlCtxObj; + _spctx = MLContextUtil.getJavaSparkContext(mlCtx); } - else + else { if(DMLScript.USE_LOCAL_SPARK_CONFIG) { // For now set 4 cores for integration testing :) @@ -220,128 +214,128 @@ public class SparkExecutionContext extends ExecutionContext SparkConf conf = createSystemMLSparkConf(); _spctx = new JavaSparkContext(conf); } - + _parRDDs.clear(); } - - // Set warning if spark.driver.maxResultSize is not set. It needs to be set before starting Spark Context for CP collect + + // Set warning if spark.driver.maxResultSize is not set. It needs to be set before starting Spark Context for CP collect String strDriverMaxResSize = _spctx.getConf().get("spark.driver.maxResultSize", "1g"); - long driverMaxResSize = UtilFunctions.parseMemorySize(strDriverMaxResSize); + long driverMaxResSize = UtilFunctions.parseMemorySize(strDriverMaxResSize); if (driverMaxResSize != 0 && driverMaxResSize<OptimizerUtils.getLocalMemBudget() && !DMLScript.USE_LOCAL_SPARK_CONFIG) LOG.warn("Configuration parameter spark.driver.maxResultSize set to " + UtilFunctions.formatMemorySize(driverMaxResSize) + "." - + " You can set it through Spark default configuration setting either to 0 (unlimited) or to available memory budget of size " + + " You can set it through Spark default configuration setting either to 0 (unlimited) or to available memory budget of size " + UtilFunctions.formatMemorySize((long)OptimizerUtils.getLocalMemBudget()) + "."); - + //globally add binaryblock serialization framework for all hdfs read/write operations - //TODO if spark context passed in from outside (mlcontext), we need to clean this up at the end + //TODO if spark context passed in from outside (mlcontext), we need to clean this up at the end if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION ) MRJobConfiguration.addBinaryBlockSerializationFramework( _spctx.hadoopConfiguration() ); - + //statistics maintenance if( DMLScript.STATISTICS ){ Statistics.setSparkCtxCreateTime(System.nanoTime()-t0); } - } - + } + /** * Sets up a SystemML-preferred Spark configuration based on the implicit * default configuration (as passed via configurations from outside). - * + * * @return spark configuration */ public static SparkConf createSystemMLSparkConf() { SparkConf conf = new SparkConf(); - + //always set unlimited result size (required for cp collect) conf.set("spark.driver.maxResultSize", "0"); - + //always use the fair scheduler (for single jobs, it's equivalent to fifo //but for concurrent jobs in parfor it ensures better data locality because //round robin assignment mitigates the problem of 'sticky slots') if( FAIR_SCHEDULER_MODE ) { conf.set("spark.scheduler.mode", "FAIR"); } - + //increase scheduler delay (usually more robust due to better data locality) if( !conf.contains("spark.locality.wait") ) { //default 3s conf.set("spark.locality.wait", "5s"); } - + return conf; } /** * Spark instructions should call this for all matrix inputs except broadcast * variables. - * + * * @param varname variable name * @return JavaPairRDD of MatrixIndexes-MatrixBlocks * @throws DMLRuntimeException if DMLRuntimeException occurs */ @SuppressWarnings("unchecked") - public JavaPairRDD<MatrixIndexes,MatrixBlock> getBinaryBlockRDDHandleForVariable( String varname ) - throws DMLRuntimeException + public JavaPairRDD<MatrixIndexes,MatrixBlock> getBinaryBlockRDDHandleForVariable( String varname ) + throws DMLRuntimeException { return (JavaPairRDD<MatrixIndexes,MatrixBlock>) getRDDHandleForVariable( varname, InputInfo.BinaryBlockInputInfo); } - + /** * Spark instructions should call this for all frame inputs except broadcast * variables. - * + * * @param varname variable name * @return JavaPairRDD of Longs-FrameBlocks * @throws DMLRuntimeException if DMLRuntimeException occurs */ @SuppressWarnings("unchecked") - public JavaPairRDD<Long,FrameBlock> getFrameBinaryBlockRDDHandleForVariable( String varname ) - throws DMLRuntimeException + public JavaPairRDD<Long,FrameBlock> getFrameBinaryBlockRDDHandleForVariable( String varname ) + throws DMLRuntimeException { JavaPairRDD<Long,FrameBlock> out = (JavaPairRDD<Long,FrameBlock>) getRDDHandleForVariable( varname, InputInfo.BinaryBlockInputInfo); return out; } - public JavaPairRDD<?,?> getRDDHandleForVariable( String varname, InputInfo inputInfo ) + public JavaPairRDD<?,?> getRDDHandleForVariable( String varname, InputInfo inputInfo ) throws DMLRuntimeException { Data dat = getVariable(varname); if( dat instanceof MatrixObject ) { MatrixObject mo = getMatrixObject(varname); - return getRDDHandleForMatrixObject(mo, inputInfo); + return getRDDHandleForMatrixObject(mo, inputInfo); } else if( dat instanceof FrameObject ) { FrameObject fo = getFrameObject(varname); - return getRDDHandleForFrameObject(fo, inputInfo); + return getRDDHandleForFrameObject(fo, inputInfo); } else { throw new DMLRuntimeException("Failed to obtain RDD for data type other than matrix or frame."); } } - + /** - * This call returns an RDD handle for a given matrix object. This includes - * the creation of RDDs for in-memory or binary-block HDFS data. - * + * This call returns an RDD handle for a given matrix object. This includes + * the creation of RDDs for in-memory or binary-block HDFS data. + * * @param mo matrix object * @param inputInfo input info * @return JavaPairRDD handle for a matrix object * @throws DMLRuntimeException if DMLRuntimeException occurs */ @SuppressWarnings("unchecked") - public JavaPairRDD<?,?> getRDDHandleForMatrixObject( MatrixObject mo, InputInfo inputInfo ) + public JavaPairRDD<?,?> getRDDHandleForMatrixObject( MatrixObject mo, InputInfo inputInfo ) throws DMLRuntimeException - { + { //NOTE: MB this logic should be integrated into MatrixObject - //However, for now we cannot assume that spark libraries are - //always available and hence only store generic references in + //However, for now we cannot assume that spark libraries are + //always available and hence only store generic references in //matrix object while all the logic is in the SparkExecContext - + JavaSparkContext sc = getSparkContext(); JavaPairRDD<?,?> rdd = null; //CASE 1: rdd already existing (reuse if checkpoint or trigger - //pending rdd operations if not yet cached but prevent to re-evaluate + //pending rdd operations if not yet cached but prevent to re-evaluate //rdd operations if already executed and cached - if( mo.getRDDHandle()!=null + if( mo.getRDDHandle()!=null && (mo.getRDDHandle().isCheckpointRDD() || !mo.isCached(false)) ) { //return existing rdd handling (w/o input format change) @@ -359,7 +353,7 @@ public class SparkExecutionContext extends ExecutionContext if( mo.isDirty() || !mo.isHDFSFileExists() ) //write if necessary mo.exportData(); rdd = sc.hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass); - rdd = SparkUtils.copyBinaryBlockMatrix((JavaPairRDD<MatrixIndexes, MatrixBlock>)rdd); //cp is workaround for read bug + rdd = SparkUtils.copyBinaryBlockMatrix((JavaPairRDD<MatrixIndexes, MatrixBlock>)rdd); //cp is workaround for read bug fromFile = true; } else { //default case @@ -368,7 +362,7 @@ public class SparkExecutionContext extends ExecutionContext mo.release(); //unpin matrix _parRDDs.registerRDD(rdd.id(), OptimizerUtils.estimatePartitionedSizeExactSparsity(mc), true); } - + //keep rdd handle for future operations on it RDDObject rddhandle = new RDDObject(rdd, mo.getVarName()); rddhandle.setHDFSFile(fromFile); @@ -396,43 +390,43 @@ public class SparkExecutionContext extends ExecutionContext else { throw new DMLRuntimeException("Incorrect input format in getRDDHandleForVariable"); } - + //keep rdd handle for future operations on it RDDObject rddhandle = new RDDObject(rdd, mo.getVarName()); rddhandle.setHDFSFile(true); mo.setRDDHandle(rddhandle); } - + return rdd; } - + /** * FIXME: currently this implementation assumes matrix representations but frame signature * in order to support the old transform implementation. - * + * * @param fo frame object * @param inputInfo input info * @return JavaPairRDD handle for a frame object * @throws DMLRuntimeException if DMLRuntimeException occurs */ @SuppressWarnings("unchecked") - public JavaPairRDD<?,?> getRDDHandleForFrameObject( FrameObject fo, InputInfo inputInfo ) + public JavaPairRDD<?,?> getRDDHandleForFrameObject( FrameObject fo, InputInfo inputInfo ) throws DMLRuntimeException - { + { //NOTE: MB this logic should be integrated into FrameObject - //However, for now we cannot assume that spark libraries are - //always available and hence only store generic references in + //However, for now we cannot assume that spark libraries are + //always available and hence only store generic references in //matrix object while all the logic is in the SparkExecContext - - InputInfo inputInfo2 = (inputInfo==InputInfo.BinaryBlockInputInfo) ? + + InputInfo inputInfo2 = (inputInfo==InputInfo.BinaryBlockInputInfo) ? InputInfo.BinaryBlockFrameInputInfo : inputInfo; - + JavaSparkContext sc = getSparkContext(); JavaPairRDD<?,?> rdd = null; //CASE 1: rdd already existing (reuse if checkpoint or trigger - //pending rdd operations if not yet cached but prevent to re-evaluate + //pending rdd operations if not yet cached but prevent to re-evaluate //rdd operations if already executed and cached - if( fo.getRDDHandle()!=null + if( fo.getRDDHandle()!=null && (fo.getRDDHandle().isCheckpointRDD() || !fo.isCached(false)) ) { //return existing rdd handling (w/o input format change) @@ -451,7 +445,7 @@ public class SparkExecutionContext extends ExecutionContext fo.exportData(); } rdd = sc.hadoopFile( fo.getFileName(), inputInfo2.inputFormatClass, inputInfo2.inputKeyClass, inputInfo2.inputValueClass); - rdd = ((JavaPairRDD<LongWritable, FrameBlock>)rdd).mapToPair( new CopyFrameBlockPairFunction() ); //cp is workaround for read bug + rdd = ((JavaPairRDD<LongWritable, FrameBlock>)rdd).mapToPair( new CopyFrameBlockPairFunction() ); //cp is workaround for read bug fromFile = true; } else { //default case @@ -460,7 +454,7 @@ public class SparkExecutionContext extends ExecutionContext fo.release(); //unpin frame _parRDDs.registerRDD(rdd.id(), OptimizerUtils.estimatePartitionedSizeExactSparsity(mc), true); } - + //keep rdd handle for future operations on it RDDObject rddhandle = new RDDObject(rdd, fo.getVarName()); rddhandle.setHDFSFile(fromFile); @@ -487,64 +481,64 @@ public class SparkExecutionContext extends ExecutionContext else { throw new DMLRuntimeException("Incorrect input format in getRDDHandleForVariable"); } - + //keep rdd handle for future operations on it RDDObject rddhandle = new RDDObject(rdd, fo.getVarName()); rddhandle.setHDFSFile(true); fo.setRDDHandle(rddhandle); } - + return rdd; } - + /** * TODO So far we only create broadcast variables but never destroy * them. This is a memory leak which might lead to executor out-of-memory. - * However, in order to handle this, we need to keep track when broadcast + * However, in order to handle this, we need to keep track when broadcast * variables are no longer required. - * + * * @param varname variable name * @return wrapper for broadcast variables * @throws DMLRuntimeException if DMLRuntimeException occurs */ @SuppressWarnings("unchecked") - public PartitionedBroadcast<MatrixBlock> getBroadcastForVariable( String varname ) + public PartitionedBroadcast<MatrixBlock> getBroadcastForVariable( String varname ) throws DMLRuntimeException - { + { long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; MatrixObject mo = getMatrixObject(varname); - + PartitionedBroadcast<MatrixBlock> bret = null; - + //reuse existing broadcast handle - if( mo.getBroadcastHandle()!=null - && mo.getBroadcastHandle().isValid() ) + if( mo.getBroadcastHandle()!=null + && mo.getBroadcastHandle().isValid() ) { bret = mo.getBroadcastHandle().getBroadcast(); } - + //create new broadcast handle (never created, evicted) - if( bret == null ) + if( bret == null ) { //account for overwritten invalid broadcast (e.g., evicted) if( mo.getBroadcastHandle()!=null ) CacheableData.addBroadcastSize(-mo.getBroadcastHandle().getSize()); - - //obtain meta data for matrix + + //obtain meta data for matrix int brlen = (int) mo.getNumRowsPerBlock(); int bclen = (int) mo.getNumColumnsPerBlock(); - + //create partitioned matrix block and release memory consumed by input MatrixBlock mb = mo.acquireRead(); PartitionedBlock<MatrixBlock> pmb = new PartitionedBlock<MatrixBlock>(mb, brlen, bclen); mo.release(); - + //determine coarse-grained partitioning int numPerPart = PartitionedBroadcast.computeBlocksPerPartition(mo.getNumRows(), mo.getNumColumns(), brlen, bclen); - int numParts = (int) Math.ceil((double)pmb.getNumRowBlocks()*pmb.getNumColumnBlocks() / numPerPart); + int numParts = (int) Math.ceil((double)pmb.getNumRowBlocks()*pmb.getNumColumnBlocks() / numPerPart); Broadcast<PartitionedBlock<MatrixBlock>>[] ret = new Broadcast[numParts]; - + //create coarse-grained partitioned broadcasts if( numParts > 1 ) { for( int i=0; i<numParts; i++ ) { @@ -557,60 +551,60 @@ public class SparkExecutionContext extends ExecutionContext else { //single partition ret[0] = getSparkContext().broadcast(pmb); } - + bret = new PartitionedBroadcast<MatrixBlock>(ret); - BroadcastObject<MatrixBlock> bchandle = new BroadcastObject<MatrixBlock>(bret, varname, + BroadcastObject<MatrixBlock> bchandle = new BroadcastObject<MatrixBlock>(bret, varname, OptimizerUtils.estimatePartitionedSizeExactSparsity(mo.getMatrixCharacteristics())); mo.setBroadcastHandle(bchandle); CacheableData.addBroadcastSize(bchandle.getSize()); } - + if (DMLScript.STATISTICS) { Statistics.accSparkBroadCastTime(System.nanoTime() - t0); Statistics.incSparkBroadcastCount(1); } - + return bret; } - + @SuppressWarnings("unchecked") - public PartitionedBroadcast<FrameBlock> getBroadcastForFrameVariable( String varname) + public PartitionedBroadcast<FrameBlock> getBroadcastForFrameVariable( String varname) throws DMLRuntimeException - { + { long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; FrameObject fo = getFrameObject(varname); - + PartitionedBroadcast<FrameBlock> bret = null; - + //reuse existing broadcast handle - if( fo.getBroadcastHandle()!=null - && fo.getBroadcastHandle().isValid() ) + if( fo.getBroadcastHandle()!=null + && fo.getBroadcastHandle().isValid() ) { bret = fo.getBroadcastHandle().getBroadcast(); } - + //create new broadcast handle (never created, evicted) - if( bret == null ) + if( bret == null ) { //account for overwritten invalid broadcast (e.g., evicted) if( fo.getBroadcastHandle()!=null ) CacheableData.addBroadcastSize(-fo.getBroadcastHandle().getSize()); - - //obtain meta data for frame + + //obtain meta data for frame int bclen = (int) fo.getNumColumns(); int brlen = OptimizerUtils.getDefaultFrameSize(); - + //create partitioned frame block and release memory consumed by input FrameBlock mb = fo.acquireRead(); PartitionedBlock<FrameBlock> pmb = new PartitionedBlock<FrameBlock>(mb, brlen, bclen); fo.release(); - + //determine coarse-grained partitioning int numPerPart = PartitionedBroadcast.computeBlocksPerPartition(fo.getNumRows(), fo.getNumColumns(), brlen, bclen); - int numParts = (int) Math.ceil((double)pmb.getNumRowBlocks()*pmb.getNumColumnBlocks() / numPerPart); + int numParts = (int) Math.ceil((double)pmb.getNumRowBlocks()*pmb.getNumColumnBlocks() / numPerPart); Broadcast<PartitionedBlock<FrameBlock>>[] ret = new Broadcast[numParts]; - + //create coarse-grained partitioned broadcasts if( numParts > 1 ) { for( int i=0; i<numParts; i++ ) { @@ -623,41 +617,41 @@ public class SparkExecutionContext extends ExecutionContext else { //single partition ret[0] = getSparkContext().broadcast(pmb); } - + bret = new PartitionedBroadcast<FrameBlock>(ret); - BroadcastObject<FrameBlock> bchandle = new BroadcastObject<FrameBlock>(bret, varname, + BroadcastObject<FrameBlock> bchandle = new BroadcastObject<FrameBlock>(bret, varname, OptimizerUtils.estimatePartitionedSizeExactSparsity(fo.getMatrixCharacteristics())); fo.setBroadcastHandle(bchandle); CacheableData.addBroadcastSize(bchandle.getSize()); } - + if (DMLScript.STATISTICS) { Statistics.accSparkBroadCastTime(System.nanoTime() - t0); Statistics.incSparkBroadcastCount(1); } - + return bret; } /** - * Keep the output rdd of spark rdd operations as meta data of matrix/frame + * Keep the output rdd of spark rdd operations as meta data of matrix/frame * objects in the symbol table. - * + * * @param varname variable name * @param rdd JavaPairRDD handle for variable * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public void setRDDHandleForVariable(String varname, JavaPairRDD<?,?> rdd) + public void setRDDHandleForVariable(String varname, JavaPairRDD<?,?> rdd) throws DMLRuntimeException { CacheableData<?> obj = getCacheableData(varname); RDDObject rddhandle = new RDDObject(rdd, varname); obj.setRDDHandle( rddhandle ); } - + /** * Utility method for creating an RDD out of an in-memory matrix block. - * + * * @param sc java spark context * @param src matrix block * @param brlen block row length @@ -665,13 +659,13 @@ public class SparkExecutionContext extends ExecutionContext * @return JavaPairRDD handle to matrix block * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public static JavaPairRDD<MatrixIndexes,MatrixBlock> toMatrixJavaPairRDD(JavaSparkContext sc, MatrixBlock src, int brlen, int bclen) + public static JavaPairRDD<MatrixIndexes,MatrixBlock> toMatrixJavaPairRDD(JavaSparkContext sc, MatrixBlock src, int brlen, int bclen) throws DMLRuntimeException - { + { long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; LinkedList<Tuple2<MatrixIndexes,MatrixBlock>> list = new LinkedList<Tuple2<MatrixIndexes,MatrixBlock>>(); - - if( src.getNumRows() <= brlen + + if( src.getNumRows() <= brlen && src.getNumColumns() <= bclen ) { list.addLast(new Tuple2<MatrixIndexes,MatrixBlock>(new MatrixIndexes(1,1), src)); @@ -679,44 +673,44 @@ public class SparkExecutionContext extends ExecutionContext else { boolean sparse = src.isInSparseFormat(); - + //create and write subblocks of matrix for(int blockRow = 0; blockRow < (int)Math.ceil(src.getNumRows()/(double)brlen); blockRow++) for(int blockCol = 0; blockCol < (int)Math.ceil(src.getNumColumns()/(double)bclen); blockCol++) { int maxRow = (blockRow*brlen + brlen < src.getNumRows()) ? brlen : src.getNumRows() - blockRow*brlen; int maxCol = (blockCol*bclen + bclen < src.getNumColumns()) ? bclen : src.getNumColumns() - blockCol*bclen; - + MatrixBlock block = new MatrixBlock(maxRow, maxCol, sparse); - + int row_offset = blockRow*brlen; int col_offset = blockCol*bclen; - + //copy submatrix to block - src.sliceOperations( row_offset, row_offset+maxRow-1, - col_offset, col_offset+maxCol-1, block ); - + src.sliceOperations( row_offset, row_offset+maxRow-1, + col_offset, col_offset+maxCol-1, block ); + //append block to sequence file MatrixIndexes indexes = new MatrixIndexes(blockRow+1, blockCol+1); list.addLast(new Tuple2<MatrixIndexes,MatrixBlock>(indexes, block)); } } - + JavaPairRDD<MatrixIndexes,MatrixBlock> result = sc.parallelizePairs(list); if (DMLScript.STATISTICS) { Statistics.accSparkParallelizeTime(System.nanoTime() - t0); Statistics.incSparkParallelizeCount(1); } - + return result; } - public static JavaPairRDD<Long,FrameBlock> toFrameJavaPairRDD(JavaSparkContext sc, FrameBlock src) + public static JavaPairRDD<Long,FrameBlock> toFrameJavaPairRDD(JavaSparkContext sc, FrameBlock src) throws DMLRuntimeException - { + { long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; LinkedList<Tuple2<Long,FrameBlock>> list = new LinkedList<Tuple2<Long,FrameBlock>>(); - + //create and write subblocks of matrix int blksize = ConfigurationManager.getBlocksize(); for(int blockRow = 0; blockRow < (int)Math.ceil(src.getNumRows()/(double)blksize); blockRow++) @@ -725,28 +719,28 @@ public class SparkExecutionContext extends ExecutionContext int roffset = blockRow*blksize; FrameBlock block = new FrameBlock(src.getSchema()); - + //copy sub frame to block, incl meta data on first - src.sliceOperations( roffset, roffset+maxRow-1, 0, src.getNumColumns()-1, block ); + src.sliceOperations( roffset, roffset+maxRow-1, 0, src.getNumColumns()-1, block ); if( roffset == 0 ) block.setColumnMetadata(src.getColumnMetadata()); - + //append block to sequence file list.addLast(new Tuple2<Long,FrameBlock>((long)roffset+1, block)); } - + JavaPairRDD<Long,FrameBlock> result = sc.parallelizePairs(list); if (DMLScript.STATISTICS) { Statistics.accSparkParallelizeTime(System.nanoTime() - t0); Statistics.incSparkParallelizeCount(1); } - + return result; } - + /** * This method is a generic abstraction for calls from the buffer pool. - * + * * @param rdd rdd object * @param rlen number of rows * @param clen number of columns @@ -757,21 +751,21 @@ public class SparkExecutionContext extends ExecutionContext * @throws DMLRuntimeException if DMLRuntimeException occurs */ @SuppressWarnings("unchecked") - public static MatrixBlock toMatrixBlock(RDDObject rdd, int rlen, int clen, int brlen, int bclen, long nnz) + public static MatrixBlock toMatrixBlock(RDDObject rdd, int rlen, int clen, int brlen, int bclen, long nnz) throws DMLRuntimeException - { + { return toMatrixBlock( - (JavaPairRDD<MatrixIndexes, MatrixBlock>) rdd.getRDD(), + (JavaPairRDD<MatrixIndexes, MatrixBlock>) rdd.getRDD(), rlen, clen, brlen, bclen, nnz); } - + /** - * Utility method for creating a single matrix block out of a binary block RDD. - * Note that this collect call might trigger execution of any pending transformations. - * + * Utility method for creating a single matrix block out of a binary block RDD. + * Note that this collect call might trigger execution of any pending transformations. + * * NOTE: This is an unguarded utility function, which requires memory for both the output matrix * and its collected, blocked representation. - * + * * @param rdd JavaPairRDD for matrix block * @param rlen number of rows * @param clen number of columns @@ -781,19 +775,19 @@ public class SparkExecutionContext extends ExecutionContext * @return matrix block * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public static MatrixBlock toMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, int brlen, int bclen, long nnz) + public static MatrixBlock toMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, int brlen, int bclen, long nnz) throws DMLRuntimeException { - + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; MatrixBlock out = null; - + if( rlen <= brlen && clen <= bclen ) //SINGLE BLOCK { //special case without copy and nnz maintenance List<Tuple2<MatrixIndexes,MatrixBlock>> list = rdd.collect(); - + if( list.size()>1 ) throw new DMLRuntimeException("Expecting no more than one result block."); else if( list.size()==1 ) @@ -806,70 +800,70 @@ public class SparkExecutionContext extends ExecutionContext //determine target sparse/dense representation long lnnz = (nnz >= 0) ? nnz : (long)rlen * clen; boolean sparse = MatrixBlock.evalSparseFormatInMemory(rlen, clen, lnnz); - + //create output matrix block (w/ lazy allocation) out = new MatrixBlock(rlen, clen, sparse, lnnz); - + List<Tuple2<MatrixIndexes,MatrixBlock>> list = rdd.collect(); - + //copy blocks one-at-a-time into output matrix block - long aNnz = 0; + long aNnz = 0; for( Tuple2<MatrixIndexes,MatrixBlock> keyval : list ) { //unpack index-block pair MatrixIndexes ix = keyval._1(); MatrixBlock block = keyval._2(); - + //compute row/column block offsets int row_offset = (int)(ix.getRowIndex()-1)*brlen; int col_offset = (int)(ix.getColumnIndex()-1)*bclen; int rows = block.getNumRows(); int cols = block.getNumColumns(); - + //append block if( sparse ) { //SPARSE OUTPUT //append block to sparse target in order to avoid shifting, where //we use a shallow row copy in case of MCSR and single column blocks - //note: this append requires, for multiple column blocks, a final sort + //note: this append requires, for multiple column blocks, a final sort out.appendToSparse(block, row_offset, col_offset, clen>bclen); } else { //DENSE OUTPUT - out.copy( row_offset, row_offset+rows-1, - col_offset, col_offset+cols-1, block, false ); + out.copy( row_offset, row_offset+rows-1, + col_offset, col_offset+cols-1, block, false ); } - + //incremental maintenance nnz aNnz += block.getNonZeros(); } - + //post-processing output matrix if( sparse && clen>bclen ) out.sortSparseRows(); out.setNonZeros(aNnz); out.examSparsity(); } - + if (DMLScript.STATISTICS) { Statistics.accSparkCollectTime(System.nanoTime() - t0); Statistics.incSparkCollectCount(1); } - + return out; } - + @SuppressWarnings("unchecked") - public static MatrixBlock toMatrixBlock(RDDObject rdd, int rlen, int clen, long nnz) + public static MatrixBlock toMatrixBlock(RDDObject rdd, int rlen, int clen, long nnz) throws DMLRuntimeException - { + { return toMatrixBlock( - (JavaPairRDD<MatrixIndexes, MatrixCell>) rdd.getRDD(), + (JavaPairRDD<MatrixIndexes, MatrixCell>) rdd.getRDD(), rlen, clen, nnz); } - + /** - * Utility method for creating a single matrix block out of a binary cell RDD. - * Note that this collect call might trigger execution of any pending transformations. - * + * Utility method for creating a single matrix block out of a binary cell RDD. + * Note that this collect call might trigger execution of any pending transformations. + * * @param rdd JavaPairRDD for matrix block * @param rlen number of rows * @param clen number of columns @@ -877,57 +871,57 @@ public class SparkExecutionContext extends ExecutionContext * @return matrix block * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public static MatrixBlock toMatrixBlock(JavaPairRDD<MatrixIndexes, MatrixCell> rdd, int rlen, int clen, long nnz) + public static MatrixBlock toMatrixBlock(JavaPairRDD<MatrixIndexes, MatrixCell> rdd, int rlen, int clen, long nnz) throws DMLRuntimeException - { + { long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; MatrixBlock out = null; - + //determine target sparse/dense representation long lnnz = (nnz >= 0) ? nnz : (long)rlen * clen; boolean sparse = MatrixBlock.evalSparseFormatInMemory(rlen, clen, lnnz); - + //create output matrix block (w/ lazy allocation) out = new MatrixBlock(rlen, clen, sparse); - + List<Tuple2<MatrixIndexes,MatrixCell>> list = rdd.collect(); - + //copy blocks one-at-a-time into output matrix block for( Tuple2<MatrixIndexes,MatrixCell> keyval : list ) { //unpack index-block pair MatrixIndexes ix = keyval._1(); MatrixCell cell = keyval._2(); - + //append cell to dense/sparse target in order to avoid shifting for sparse //note: this append requires a final sort of sparse rows out.appendValue((int)ix.getRowIndex()-1, (int)ix.getColumnIndex()-1, cell.getValue()); } - + //post-processing output matrix if( sparse ) out.sortSparseRows(); out.recomputeNonZeros(); out.examSparsity(); - + if (DMLScript.STATISTICS) { Statistics.accSparkCollectTime(System.nanoTime() - t0); Statistics.incSparkCollectCount(1); } - + return out; } - public static PartitionedBlock<MatrixBlock> toPartitionedMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, int brlen, int bclen, long nnz) + public static PartitionedBlock<MatrixBlock> toPartitionedMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, int brlen, int bclen, long nnz) throws DMLRuntimeException { - + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; PartitionedBlock<MatrixBlock> out = new PartitionedBlock<MatrixBlock>(rlen, clen, brlen, bclen); List<Tuple2<MatrixIndexes,MatrixBlock>> list = rdd.collect(); - + //copy blocks one-at-a-time into output matrix block for( Tuple2<MatrixIndexes,MatrixBlock> keyval : list ) { @@ -936,24 +930,24 @@ public class SparkExecutionContext extends ExecutionContext MatrixBlock block = keyval._2(); out.setBlock((int)ix.getRowIndex(), (int)ix.getColumnIndex(), block); } - + if (DMLScript.STATISTICS) { Statistics.accSparkCollectTime(System.nanoTime() - t0); Statistics.incSparkCollectCount(1); } - + return out; } @SuppressWarnings("unchecked") - public static FrameBlock toFrameBlock(RDDObject rdd, ValueType[] schema, int rlen, int clen) - throws DMLRuntimeException + public static FrameBlock toFrameBlock(RDDObject rdd, ValueType[] schema, int rlen, int clen) + throws DMLRuntimeException { JavaPairRDD<Long,FrameBlock> lrdd = (JavaPairRDD<Long,FrameBlock>) rdd.getRDD(); return toFrameBlock(lrdd, schema, rlen, clen); } - public static FrameBlock toFrameBlock(JavaPairRDD<Long,FrameBlock> rdd, ValueType[] schema, int rlen, int clen) + public static FrameBlock toFrameBlock(JavaPairRDD<Long,FrameBlock> rdd, ValueType[] schema, int rlen, int clen) throws DMLRuntimeException { long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; @@ -964,16 +958,16 @@ public class SparkExecutionContext extends ExecutionContext //create output frame block (w/ lazy allocation) FrameBlock out = new FrameBlock(schema); out.ensureAllocatedColumns(rlen); - + List<Tuple2<Long,FrameBlock>> list = rdd.collect(); - + //copy blocks one-at-a-time into output matrix block for( Tuple2<Long,FrameBlock> keyval : list ) { //unpack index-block pair int ix = (int)(keyval._1() - 1); FrameBlock block = keyval._2(); - + //copy into output frame out.copy( ix, ix+block.getNumRows()-1, 0, block.getNumColumns()-1, block ); if( ix == 0 ) { @@ -981,12 +975,12 @@ public class SparkExecutionContext extends ExecutionContext out.setColumnMetadata(block.getColumnMetadata()); } } - + if (DMLScript.STATISTICS) { Statistics.accSparkCollectTime(System.nanoTime() - t0); Statistics.incSparkCollectCount(1); } - + return out; } @@ -994,17 +988,17 @@ public class SparkExecutionContext extends ExecutionContext public static long writeRDDtoHDFS( RDDObject rdd, String path, OutputInfo oinfo ) { JavaPairRDD<MatrixIndexes,MatrixBlock> lrdd = (JavaPairRDD<MatrixIndexes, MatrixBlock>) rdd.getRDD(); - + //piggyback nnz maintenance on write LongAccumulator aNnz = getSparkContextStatic().sc().longAccumulator("nnz"); lrdd = lrdd.mapValues(new ComputeBinaryBlockNnzFunction(aNnz)); - + //save file is an action which also triggers nnz maintenance - lrdd.saveAsHadoopFile(path, - oinfo.outputKeyClass, - oinfo.outputValueClass, + lrdd.saveAsHadoopFile(path, + oinfo.outputKeyClass, + oinfo.outputValueClass, oinfo.outputFormatClass); - + //return nnz aggregate of all blocks return aNnz.value(); } @@ -1013,58 +1007,58 @@ public class SparkExecutionContext extends ExecutionContext public static void writeFrameRDDtoHDFS( RDDObject rdd, String path, OutputInfo oinfo ) { JavaPairRDD<?, FrameBlock> lrdd = (JavaPairRDD<Long, FrameBlock>) rdd.getRDD(); - + //convert keys to writables if necessary if( oinfo == OutputInfo.BinaryBlockOutputInfo ) { lrdd = ((JavaPairRDD<Long, FrameBlock>)lrdd).mapToPair( new LongFrameToLongWritableFrameFunction()); oinfo = OutputInfo.BinaryBlockFrameOutputInfo; } - + //save file is an action which also triggers nnz maintenance - lrdd.saveAsHadoopFile(path, - oinfo.outputKeyClass, - oinfo.outputValueClass, + lrdd.saveAsHadoopFile(path, + oinfo.outputKeyClass, + oinfo.outputValueClass, oinfo.outputFormatClass); } - + /////////////////////////////////////////// // Cleanup of RDDs and Broadcast variables /////// - + /** * Adds a child rdd object to the lineage of a parent rdd. - * + * * @param varParent parent variable * @param varChild child variable * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public void addLineageRDD(String varParent, String varChild) - throws DMLRuntimeException + public void addLineageRDD(String varParent, String varChild) + throws DMLRuntimeException { RDDObject parent = getCacheableData(varParent).getRDDHandle(); RDDObject child = getCacheableData(varChild).getRDDHandle(); - + parent.addLineageChild( child ); } - + /** * Adds a child broadcast object to the lineage of a parent rdd. - * + * * @param varParent parent variable * @param varChild child variable * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public void addLineageBroadcast(String varParent, String varChild) - throws DMLRuntimeException + public void addLineageBroadcast(String varParent, String varChild) + throws DMLRuntimeException { RDDObject parent = getCacheableData(varParent).getRDDHandle(); BroadcastObject<?> child = getCacheableData(varChild).getBroadcastHandle(); - + parent.addLineageChild( child ); } - public void addLineage(String varParent, String varChild, boolean broadcast) + public void addLineage(String varParent, String varChild, boolean broadcast) throws DMLRuntimeException { if( broadcast ) @@ -1072,25 +1066,25 @@ public class SparkExecutionContext extends ExecutionContext else addLineageRDD(varParent, varChild); } - + @Override - public void cleanupMatrixObject( MatrixObject mo ) + public void cleanupMatrixObject( MatrixObject mo ) throws DMLRuntimeException { //NOTE: this method overwrites the default behavior of cleanupMatrixObject //and hence is transparently used by rmvar instructions and other users. The //core difference is the lineage-based cleanup of RDD and broadcast variables. - + try { - if ( mo.isCleanupEnabled() ) + if ( mo.isCleanupEnabled() ) { //compute ref count only if matrix cleanup actually necessary - if ( !getVariables().hasReferences(mo) ) + if ( !getVariables().hasReferences(mo) ) { - //clean cached data - mo.clearData(); - + //clean cached data + mo.clearData(); + //clean hdfs data if no pending rdd operations on it if( mo.isHDFSFileExists() && mo.getFileName()!=null ) { if( mo.getRDDHandle()==null ) { @@ -1101,12 +1095,12 @@ public class SparkExecutionContext extends ExecutionContext rdd.setHDFSFilename(mo.getFileName()); } } - + //cleanup RDD and broadcast variables (recursive) //note: requires that mo.clearData already removed back references - if( mo.getRDDHandle()!=null ) { + if( mo.getRDDHandle()!=null ) { rCleanupLineageObject(mo.getRDDHandle()); - } + } if( mo.getBroadcastHandle()!=null ) { rCleanupLineageObject(mo.getBroadcastHandle()); } @@ -1120,18 +1114,18 @@ public class SparkExecutionContext extends ExecutionContext } @SuppressWarnings({ "rawtypes", "unchecked" }) - private void rCleanupLineageObject(LineageObject lob) + private void rCleanupLineageObject(LineageObject lob) throws IOException - { + { //abort recursive cleanup if still consumers if( lob.getNumReferences() > 0 ) return; - - //abort if still reachable through matrix object (via back references for + + //abort if still reachable through matrix object (via back references for //robustness in function calls and to prevent repeated scans of the symbol table) if( lob.hasBackReference() ) return; - + //cleanup current lineage object (from driver/executors) //incl deferred hdfs file removal (only if metadata set by cleanup call) if( lob instanceof RDDObject ) { @@ -1151,38 +1145,38 @@ public class SparkExecutionContext extends ExecutionContext cleanupBroadcastVariable(bc); CacheableData.addBroadcastSize(-((BroadcastObject)lob).getSize()); } - + //recursively process lineage children for( LineageObject c : lob.getLineageChilds() ){ c.decrementNumReferences(); rCleanupLineageObject(c); } } - + /** * This call destroys a broadcast variable at all executors and the driver. * Hence, it is intended to be used on rmvar only. Depending on the * ASYNCHRONOUS_VAR_DESTROY configuration, this is asynchronous or not. - * + * * @param bvar broadcast variable */ - public static void cleanupBroadcastVariable(Broadcast<?> bvar) + public static void cleanupBroadcastVariable(Broadcast<?> bvar) { - //In comparison to 'unpersist' (which would only delete the broadcast + //In comparison to 'unpersist' (which would only delete the broadcast //from the executors), this call also deletes related data from the driver. if( bvar.isValid() ) { bvar.destroy( !ASYNCHRONOUS_VAR_DESTROY ); } } - + /** * This call removes an rdd variable from executor memory and disk if required. * Hence, it is intended to be used on rmvar only. Depending on the * ASYNCHRONOUS_VAR_DESTROY configuration, this is asynchronous or not. - * + * * @param rvar rdd variable to remove */ - public static void cleanupRDDVariable(JavaPairRDD<?,?> rvar) + public static void cleanupRDDVariable(JavaPairRDD<?,?> rvar) { if( rvar.getStorageLevel()!=StorageLevel.NONE() ) { rvar.unpersist( !ASYNCHRONOUS_VAR_DESTROY ); @@ -1190,72 +1184,72 @@ public class SparkExecutionContext extends ExecutionContext } @SuppressWarnings("unchecked") - public void repartitionAndCacheMatrixObject( String var ) + public void repartitionAndCacheMatrixObject( String var ) throws DMLRuntimeException { MatrixObject mo = getMatrixObject(var); MatrixCharacteristics mcIn = mo.getMatrixCharacteristics(); - + //double check size to avoid unnecessary spark context creation if( !OptimizerUtils.exceedsCachingThreshold(mo.getNumColumns(), (double) OptimizerUtils.estimateSizeExactSparsity(mcIn)) ) - return; - + return; + //get input rdd and default storage level - JavaPairRDD<MatrixIndexes,MatrixBlock> in = (JavaPairRDD<MatrixIndexes, MatrixBlock>) + JavaPairRDD<MatrixIndexes,MatrixBlock> in = (JavaPairRDD<MatrixIndexes, MatrixBlock>) getRDDHandleForMatrixObject(mo, InputInfo.BinaryBlockInputInfo); - + //avoid unnecessary caching of input in order to reduce memory pressure if( mo.getRDDHandle().allowsShortCircuitRead() && isRDDMarkedForCaching(in.id()) && !isRDDCached(in.id()) ) { in = (JavaPairRDD<MatrixIndexes,MatrixBlock>) ((RDDObject)mo.getRDDHandle().getLineageChilds().get(0)).getRDD(); - + //investigate issue of unnecessarily large number of partitions int numPartitions = SparkUtils.getNumPreferredPartitions(mcIn, in); if( numPartitions < in.getNumPartitions() ) in = in.coalesce( numPartitions ); } - - //repartition rdd (force creation of shuffled rdd via merge), note: without deep copy albeit + + //repartition rdd (force creation of shuffled rdd via merge), note: without deep copy albeit //executed on the original data, because there will be no merge, i.e., no key duplicates JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDAggregateUtils.mergeByKey(in, false); - + //convert mcsr into memory-efficient csr if potentially sparse - if( OptimizerUtils.checkSparseBlockCSRConversion(mcIn) ) { + if( OptimizerUtils.checkSparseBlockCSRConversion(mcIn) ) { out = out.mapValues(new CreateSparseBlockFunction(SparseBlock.Type.CSR)); } - - //persist rdd in default storage level + + //persist rdd in default storage level out.persist( Checkpoint.DEFAULT_STORAGE_LEVEL ) .count(); //trigger caching to prevent contention - + //create new rdd handle, in-place of current matrix object RDDObject inro = mo.getRDDHandle(); //guaranteed to exist (see above) RDDObject outro = new RDDObject(out, var); //create new rdd object outro.setCheckpointRDD(true); //mark as checkpointed outro.addLineageChild(inro); //keep lineage to prevent cycles on cleanup - mo.setRDDHandle(outro); + mo.setRDDHandle(outro); } @SuppressWarnings("unchecked") - public void cacheMatrixObject( String var ) + public void cacheMatrixObject( String var ) throws DMLRuntimeException { //get input rdd and default storage level MatrixObject mo = getMatrixObject(var); - + //double check size to avoid unnecessary spark context creation if( !OptimizerUtils.exceedsCachingThreshold(mo.getNumColumns(), (double) OptimizerUtils.estimateSizeExactSparsity(mo.getMatrixCharacteristics())) ) - return; - - JavaPairRDD<MatrixIndexes,MatrixBlock> in = (JavaPairRDD<MatrixIndexes, MatrixBlock>) + return; + + JavaPairRDD<MatrixIndexes,MatrixBlock> in = (JavaPairRDD<MatrixIndexes, MatrixBlock>) getRDDHandleForMatrixObject(mo, InputInfo.BinaryBlockInputInfo); - + //persist rdd (force rdd caching, if not already cached) if( !isRDDCached(in.id()) ) - in.count(); //trigger caching to prevent contention + in.count(); //trigger caching to prevent contention } public void setThreadLocalSchedulerPool(String poolName) { @@ -1283,7 +1277,7 @@ public class SparkExecutionContext extends ExecutionContext if( !jsc.sc().getPersistentRDDs().contains(rddID) ) { return false; } - + //check that rdd is actually already cached for( RDDInfo info : jsc.sc().getRDDStorageInfo() ) { if( info.id() == rddID ) @@ -1293,35 +1287,35 @@ public class SparkExecutionContext extends ExecutionContext } /////////////////////////////////////////// - // Spark configuration handling + // Spark configuration handling /////// /** - * Obtains the lazily analyzed spark cluster configuration. - * + * Obtains the lazily analyzed spark cluster configuration. + * * @return spark cluster configuration */ public static SparkClusterConfig getSparkClusterConfig() { - //lazy creation of spark cluster config + //lazy creation of spark cluster config if( _sconf == null ) _sconf = new SparkClusterConfig(); return _sconf; } - + /** * Obtains the available memory budget for broadcast variables in bytes. - * + * * @return broadcast memory budget */ public static double getBroadcastMemoryBudget() { return getSparkClusterConfig() .getBroadcastMemoryBudget(); } - + /** * Obtain the available memory budget for data storage in bytes. - * - * @param min flag for minimum data budget + * + * @param min flag for minimum data budget * @param refresh flag for refresh with spark context * @return data memory budget */ @@ -1329,21 +1323,21 @@ public class SparkExecutionContext extends ExecutionContext return getSparkClusterConfig() .getDataMemoryBudget(min, refresh); } - + /** * Obtain the number of executors in the cluster (excluding the driver). - * + * * @return number of executors */ public static int getNumExecutors() { return getSparkClusterConfig() .getNumExecutors(); } - + /** - * Obtain the default degree of parallelism (cores in the cluster). - * - * @param refresh flag for refresh with spark context + * Obtain the default degree of parallelism (cores in the cluster). + * + * @param refresh flag for refresh with spark context * @return default degree of parallelism */ public static int getDefaultParallelism(boolean refresh) { @@ -1360,13 +1354,13 @@ public class SparkExecutionContext extends ExecutionContext int numExecutors = getNumExecutors(); int numCores = getDefaultParallelism(false); boolean multiThreaded = (numCores > numExecutors); - + //check for jdk version less than 8 (and raise warning if multi-threaded) - if( isLtJDK8 && multiThreaded) + if( isLtJDK8 && multiThreaded) { - //get the jre version + //get the jre version String version = System.getProperty("java.version"); - + LOG.warn("########################################################################################"); LOG.warn("### WARNING: Multi-threaded text reblock may lead to thread contention on JRE < 1.8 ####"); LOG.warn("### java.version = " + version); @@ -1377,51 +1371,51 @@ public class SparkExecutionContext extends ExecutionContext LOG.warn("########################################################################################"); } } - + /** - * Captures relevant spark cluster configuration properties, e.g., memory budgets and + * Captures relevant spark cluster configuration properties, e.g., memory budgets and * degree of parallelism. This configuration abstracts legacy (< Spark 1.6) and current - * configurations and provides a unified view. + * configurations and provides a unified view. */ - private static class SparkClusterConfig + private static class SparkClusterConfig { //broadcasts are stored in mem-and-disk in data space, this config //defines the fraction of data space to be used as broadcast budget private static final double BROADCAST_DATA_FRACTION = 0.3; - + //forward private config from Spark's UnifiedMemoryManager.scala (>1.6) private static final long RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024; - + //meta configurations private boolean _legacyVersion = false; //spark version <1.6 private boolean _confOnly = false; //infrastructure info based on config - + //memory management configurations private long _memExecutor = -1; //mem per executor private double _memDataMinFrac = -1; //minimum data fraction private double _memDataMaxFrac = -1; //maximum data fraction private double _memBroadcastFrac = -1; //broadcast fraction - + //degree of parallelism configurations private int _numExecutors = -1; //total executors - private int _defaultPar = -1; //total vcores - - public SparkClusterConfig() + private int _defaultPar = -1; //total vcores + + public SparkClusterConfig() { SparkConf sconf = createSystemMLSparkConf(); _confOnly = true; - + //parse version and config String sparkVersion = getSparkVersionString(); _legacyVersion = (UtilFunctions.compareVersion(sparkVersion, "1.6.0") < 0 || sconf.getBoolean("spark.memory.useLegacyMode", false) ); - + //obtain basic spark configurations if( _legacyVersion ) analyzeSparkConfiguationLegacy(sconf); else analyzeSparkConfiguation(sconf); - + //log debug of created spark cluster config if( LOG.isDebugEnabled() ) LOG.debug( this.toString() ); @@ -1432,30 +1426,30 @@ public class SparkExecutionContext extends ExecutionContext } public long getDataMemoryBudget(boolean min, boolean refresh) { - //always get the current num executors on refresh because this might + //always get the current num executors on refresh because this might //change if not all executors are initially allocated and it is plan-relevant int numExec = _numExecutors; if( refresh && !_confOnly ) { JavaSparkContext jsc = getSparkContextStatic(); numExec = Math.max(jsc.sc().getExecutorMemoryStatus().size() - 1, 1); } - + //compute data memory budget return (long) ( numExec * _memExecutor * - (min ? _memDataMinFrac : _memDataMaxFrac) ); + (min ? _memDataMinFrac : _memDataMaxFrac) ); } public int getNumExecutors() { if( _numExecutors < 0 ) - analyzeSparkParallelismConfiguation(null); + analyzeSparkParallelismConfiguation(null); return _numExecutors; } public int getDefaultParallelism(boolean refresh) { if( _defaultPar < 0 && !refresh ) analyzeSparkParallelismConfiguation(null); - - //always get the current default parallelism on refresh because this might + + //always get the current default parallelism on refresh because this might //change if not all executors are initially allocated and it is plan-relevant return ( refresh && !_confOnly ) ? getSparkContextStatic().defaultParallelism() : _defaultPar; @@ -1464,36 +1458,36 @@ public class SparkExecutionContext extends ExecutionContext public void analyzeSparkConfiguationLegacy(SparkConf conf) { //ensure allocated spark conf SparkConf sconf = (conf == null) ? createSystemMLSparkConf() : conf; - + //parse absolute executor memory _memExecutor = UtilFunctions.parseMemorySize( sconf.get("spark.executor.memory", "1g")); - + //get data and shuffle memory ratios (defaults not specified in job conf) double dataFrac = sconf.getDouble("spark.storage.memoryFraction", 0.6); //default 60% _memDataMinFrac = dataFrac; _memDataMaxFrac = dataFrac; _memBroadcastFrac = dataFrac * BROADCAST_DATA_FRACTION; //default 18% - - //analyze spark degree of parallelism - analyzeSparkParallelismConfiguation(sconf); + + //analyze spark degree of parallelism + analyzeSparkParallelismConfiguation(sconf); } public void analyzeSparkConfiguation(SparkConf conf) { //ensure allocated spark conf SparkConf sconf = (conf == null) ? createSystemMLSparkConf() : conf; - + //parse absolute executor memory, incl fixed cut off _memExecutor = UtilFunctions.parseMemorySize( - sconf.get("spark.executor.memory", "1g")) + sconf.get("spark.executor.memory", "1g")) - RESERVED_SYSTEM_MEMORY_BYTES; - + //get data and shuffle memory ratios (defaults not specified in job conf) _memDataMinFrac = sconf.getDouble("spark.memory.storageFraction", 0.5); //default 50% _memDataMaxFrac = sconf.getDouble("spark.memory.fraction", 0.75); //default 75% _memBroadcastFrac = _memDataMaxFrac * BROADCAST_DATA_FRACTION; //default 22.5% - - //analyze spark degree of parallelism + + //analyze spark degree of parallelism analyzeSparkParallelismConfiguation(sconf); } @@ -1501,7 +1495,7 @@ public class SparkExecutionContext extends ExecutionContext int numExecutors = sconf.getInt("spark.executor.instances", -1); int numCoresPerExec = sconf.getInt("spark.executor.cores", -1); int defaultPar = sconf.getInt("spark.default.parallelism", -1); - + if( numExecutors > 1 && (defaultPar > 1 || numCoresPerExec > 1) ) { _numExecutors = numExecutors; _defaultPar = (defaultPar>1) ? defaultPar : numExecutors * numCoresPerExec; @@ -1512,28 +1506,28 @@ public class SparkExecutionContext extends ExecutionContext //note: spark context provides this information while conf does not //(for num executors we need to correct for driver and local mode) JavaSparkContext jsc = getSparkContextStatic(); - _numExecutors = Math.max(jsc.sc().getExecutorMemoryStatus().size() - 1, 1); + _numExecutors = Math.max(jsc.sc().getExecutorMemoryStatus().size() - 1, 1); _defaultPar = jsc.defaultParallelism(); - _confOnly &= false; //implies env info refresh w/ spark context + _confOnly &= false; //implies env info refresh w/ spark context } } - + /** * Obtains the spark version string. If the spark context has been created, - * we simply get it from the context; otherwise, we use Spark internal - * constants to avoid creating the spark context just for the version. - * + * we simply get it from the context; otherwise, we use Spark internal + * constants to avoid creating the spark context just for the version. + * * @return spark version string */ private String getSparkVersionString() { //check for existing spark context - if( isSparkContextCreated() ) + if( isSparkContextCreated() ) return getSparkContextStatic().version(); - + //use spark internal constant to avoid context creation return org.apache.spark.package$.MODULE$.SPARK_VERSION(); } - + @Override public String toString() { StringBuilder sb = new StringBuilder("SparkClusterConfig: \n"); @@ -1544,17 +1538,17 @@ public class SparkExecutionContext extends ExecutionContext sb.append("-- memDataMaxFrac = " + _memDataMaxFrac + "\n"); sb.append("-- memBroadcastFrac = " + _memBroadcastFrac + "\n"); sb.append("-- numExecutors = " + _numExecutors + "\n"); - sb.append("-- defaultPar = " + _defaultPar + "\n"); + sb.append("-- defaultPar = " + _defaultPar + "\n"); return sb.toString(); } } - - private static class MemoryManagerParRDDs + + private static class MemoryManagerParRDDs { private final long _limit; private long _size; private HashMap<Integer, Long> _rdds; - + public MemoryManagerParRDDs(double fractionMem) { _limit = (long)(fractionMem * InfrastructureAnalyzer.getLocalMaxMemory()); _size = 0; @@ -1566,7 +1560,7 @@ public class SparkExecutionContext extends ExecutionContext _size += ret ? rddSize : 0; return ret; } - + public synchronized void registerRDD(int rddID, long rddSize, boolean reserved) { if( !reserved ) { throw new RuntimeException("Unsupported rdd registration " @@ -1574,7 +1568,7 @@ public class SparkExecutionContext extends ExecutionContext } _rdds.put(rddID, rddSize); } - + public synchronized void deregisterRDD(int rddID) { long rddSize = _rdds.remove(rddID); _size -= rddSize;
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/7ba17c7f/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/GetMLBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/GetMLBlock.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/GetMLBlock.java deleted file mode 100644 index a1173bd..0000000 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/GetMLBlock.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.runtime.instructions.spark.functions; - -import java.io.Serializable; - -import org.apache.spark.api.java.function.Function; -import org.apache.spark.sql.Row; - -import scala.Tuple2; - -import org.apache.sysml.api.MLBlock; -import org.apache.sysml.runtime.matrix.data.MatrixBlock; -import org.apache.sysml.runtime.matrix.data.MatrixIndexes; - -@SuppressWarnings("deprecation") -public class GetMLBlock implements Function<Tuple2<MatrixIndexes,MatrixBlock>, Row>, Serializable { - - private static final long serialVersionUID = 8829736765002126985L; - - @Override - public Row call(Tuple2<MatrixIndexes, MatrixBlock> kv) throws Exception { - return new MLBlock(kv._1, kv._2); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/7ba17c7f/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java index f206fbd..377ca2e 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -63,14 +63,14 @@ import scala.Tuple2; * can be moved to RDDConverterUtils. */ @SuppressWarnings("unused") -public class RDDConverterUtilsExt +public class RDDConverterUtilsExt { public enum RDDConverterTypes { TEXT_TO_MATRIX_CELL, MATRIXENTRY_TO_MATRIXCELL } - - + + /** * Example usage: * <pre><code> @@ -88,7 +88,7 @@ public class RDDConverterUtilsExt * val mc = new MatrixCharacteristics(numRows, numCols, 1000, 1000, nnz) * val binBlocks = RDDConverterUtilsExt.coordinateMatrixToBinaryBlock(new JavaSparkContext(sc), coordinateMatrix, mc, true) * </code></pre> - * + * * @param sc java spark context * @param input coordinate matrix * @param mcIn matrix characteristics @@ -97,26 +97,26 @@ public class RDDConverterUtilsExt * @throws DMLRuntimeException if DMLRuntimeException occurs */ public static JavaPairRDD<MatrixIndexes, MatrixBlock> coordinateMatrixToBinaryBlock(JavaSparkContext sc, - CoordinateMatrix input, MatrixCharacteristics mcIn, boolean outputEmptyBlocks) throws DMLRuntimeException + CoordinateMatrix input, MatrixCharacteristics mcIn, boolean outputEmptyBlocks) throws DMLRuntimeException { //convert matrix entry rdd to binary block rdd (w/ partial blocks) JavaPairRDD<MatrixIndexes, MatrixBlock> out = input.entries().toJavaRDD() .mapPartitionsToPair(new MatrixEntryToBinaryBlockFunction(mcIn)); - - //inject empty blocks (if necessary) + + //inject empty blocks (if necessary) if( outputEmptyBlocks && mcIn.mightHaveEmptyBlocks() ) { - out = out.union( + out = out.union( SparkUtils.getEmptyBlockRDD(sc, mcIn) ); } - + //aggregate partial matrix blocks - out = RDDAggregateUtils.mergeByKey(out, false); - + out = RDDAggregateUtils.mergeByKey(out, false); + return out; } - + public static JavaPairRDD<MatrixIndexes, MatrixBlock> coordinateMatrixToBinaryBlock(SparkContext sc, - CoordinateMatrix input, MatrixCharacteristics mcIn, boolean outputEmptyBlocks) throws DMLRuntimeException + CoordinateMatrix input, MatrixCharacteristics mcIn, boolean outputEmptyBlocks) throws DMLRuntimeException { return coordinateMatrixToBinaryBlock(new JavaSparkContext(sc), input, mcIn, true); } @@ -128,19 +128,19 @@ public class RDDConverterUtilsExt } return df.select(columns.get(0), scala.collection.JavaConversions.asScalaBuffer(columnToSelect).toList()); } - + public static MatrixBlock convertPy4JArrayToMB(byte [] data, long rlen, long clen) throws DMLRuntimeException { return convertPy4JArrayToMB(data, (int)rlen, (int)clen, false); } - + public static MatrixBlock convertPy4JArrayToMB(byte [] data, int rlen, int clen) throws DMLRuntimeException { return convertPy4JArrayToMB(data, rlen, clen, false); } - + public static MatrixBlock convertSciPyCOOToMB(byte [] data, byte [] row, byte [] col, long rlen, long clen, long nnz) throws DMLRuntimeException { return convertSciPyCOOToMB(data, row, col, (int)rlen, (int)clen, (int)nnz); } - + public static MatrixBlock convertSciPyCOOToMB(byte [] data, byte [] row, byte [] col, int rlen, int clen, int nnz) throws DMLRuntimeException { MatrixBlock mb = new MatrixBlock(rlen, clen, true); mb.allocateSparseRowsBlock(false); @@ -154,17 +154,17 @@ public class RDDConverterUtilsExt double val = buf1.getDouble(); int rowIndex = buf2.getInt(); int colIndex = buf3.getInt(); - mb.setValue(rowIndex, colIndex, val); + mb.setValue(rowIndex, colIndex, val); } mb.recomputeNonZeros(); mb.examSparsity(); return mb; } - + public static MatrixBlock convertPy4JArrayToMB(byte [] data, long rlen, long clen, boolean isSparse) throws DMLRuntimeException { return convertPy4JArrayToMB(data, (int) rlen, (int) clen, isSparse); } - + public static MatrixBlock allocateDenseOrSparse(int rlen, int clen, boolean isSparse) { MatrixBlock ret = new MatrixBlock(rlen, clen, isSparse); ret.allocateDenseOrSparseBlock(); @@ -176,7 +176,7 @@ public class RDDConverterUtilsExt } return allocateDenseOrSparse(rlen, clen, isSparse); } - + public static void copyRowBlocks(MatrixBlock mb, int rowIndex, MatrixBlock ret, int numRowsPerBlock, int rlen, int clen) throws DMLRuntimeException { copyRowBlocks(mb, (long)rowIndex, ret, (long)numRowsPerBlock, (long)rlen, (long)clen); } @@ -192,12 +192,12 @@ public class RDDConverterUtilsExt ret.copy((int)(rowIndex*numRowsPerBlock), (int)Math.min((rowIndex+1)*numRowsPerBlock-1, rlen-1), 0, (int)(clen-1), mb, false); // } } - + public static void postProcessAfterCopying(MatrixBlock ret) throws DMLRuntimeException { ret.recomputeNonZeros(); ret.examSparsity(); } - + public static MatrixBlock convertPy4JArrayToMB(byte [] data, int rlen, int clen, boolean isSparse) throws DMLRuntimeException { MatrixBlock mb = new MatrixBlock(rlen, clen, isSparse, -1); if(isSparse) { @@ -219,19 +219,19 @@ public class RDDConverterUtilsExt mb.examSparsity(); return mb; } - + public static byte [] convertMBtoPy4JDenseArr(MatrixBlock mb) throws DMLRuntimeException { byte [] ret = null; if(mb.isInSparseFormat()) { mb.sparseToDense(); } - + long limit = mb.getNumRows()*mb.getNumColumns(); int times = Double.SIZE / Byte.SIZE; if( limit > Integer.MAX_VALUE / times ) throw new DMLRuntimeException("MatrixBlock of size " + limit + " cannot be converted to dense numpy array"); ret = new byte[(int) (limit * times)]; - + double [] denseBlock = mb.getDenseBlock(); if(mb.isEmptyBlock()) { for(int i=0;i < limit;i++){ @@ -246,10 +246,10 @@ public class RDDConverterUtilsExt ByteBuffer.wrap(ret, i*times, times).order(ByteOrder.nativeOrder()).putDouble(denseBlock[i]); } } - + return ret; } - + public static class AddRowID implements Function<Tuple2<Row,Long>, Row> { private static final long serialVersionUID = -3733816995375745659L; @@ -263,12 +263,12 @@ public class RDDConverterUtilsExt fields[oldNumCols] = new Double(arg0._2 + 1); return RowFactory.create(fields); } - + } /** * Add element indices as new column to DataFrame - * + * * @param df input data frame * @param sparkSession the Spark Session * @param nameOfCol name of index column @@ -286,27 +286,10 @@ public class RDDConverterUtilsExt return sparkSession.createDataFrame(newRows, new StructType(newSchema)); } - /** - * Add element indices as new column to DataFrame - * - * @param df input data frame - * @param sqlContext the SQL Context - * @param nameOfCol name of index column - * @return new data frame - * - * @deprecated This will be removed in SystemML 1.0. - */ - @Deprecated - public static Dataset<Row> addIDToDataFrame(Dataset<Row> df, SQLContext sqlContext, String nameOfCol) { - SparkSession sparkSession = sqlContext.sparkSession(); - return addIDToDataFrame(df, sparkSession, nameOfCol); - } - - - private static class MatrixEntryToBinaryBlockFunction implements PairFlatMapFunction<Iterator<MatrixEntry>,MatrixIndexes,MatrixBlock> + private static class MatrixEntryToBinaryBlockFunction implements PairFlatMapFunction<Iterator<MatrixEntry>,MatrixIndexes,MatrixBlock> { private static final long serialVersionUID = 4907483236186747224L; - + private IJVToBinaryBlockFunctionHelper helper = null; public MatrixEntryToBinaryBlockFunction(MatrixCharacteristics mc) throws DMLRuntimeException { helper = new IJVToBinaryBlockFunctionHelper(mc); @@ -318,18 +301,18 @@ public class RDDConverterUtilsExt } } - + private static class IJVToBinaryBlockFunctionHelper implements Serializable { private static final long serialVersionUID = -7952801318564745821L; //internal buffer size (aligned w/ default matrix block size) private static final int BUFFER_SIZE = 4 * 1000 * 1000; //4M elements (32MB) private int _bufflen = -1; - + private long _rlen = -1; private long _clen = -1; private int _brlen = -1; private int _bclen = -1; - + public IJVToBinaryBlockFunctionHelper(MatrixCharacteristics mc) throws DMLRuntimeException { if(!mc.dimsKnown()) { @@ -339,21 +322,21 @@ public class RDDConverterUtilsExt _clen = mc.getCols(); _brlen = mc.getRowsPerBlock(); _bclen = mc.getColsPerBlock(); - + //determine upper bounded buffer len _bufflen = (int) Math.min(_rlen*_clen, BUFFER_SIZE); - + } - + // ---------------------------------------------------- // Can extend this by having type hierarchy public Tuple2<MatrixIndexes, MatrixCell> textToMatrixCell(Text txt) { FastStringTokenizer st = new FastStringTokenizer(' '); //get input string (ignore matrix market comments) String strVal = txt.toString(); - if( strVal.startsWith("%") ) + if( strVal.startsWith("%") ) return null; - + //parse input ijv triple st.reset( strVal ); long row = st.nextLong(); @@ -363,19 +346,19 @@ public class RDDConverterUtilsExt MatrixCell cell = new MatrixCell(val); return new Tuple2<MatrixIndexes, MatrixCell>(indx, cell); } - + public Tuple2<MatrixIndexes, MatrixCell> matrixEntryToMatrixCell(MatrixEntry entry) { MatrixIndexes indx = new MatrixIndexes(entry.i(), entry.j()); MatrixCell cell = new MatrixCell(entry.value()); return new Tuple2<MatrixIndexes, MatrixCell>(indx, cell); } - + // ---------------------------------------------------- - + Iterable<Tuple2<MatrixIndexes, MatrixBlock>> convertToBinaryBlock(Object arg0, RDDConverterTypes converter) throws Exception { ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes,MatrixBlock>>(); ReblockBuffer rbuff = new ReblockBuffer(_bufflen, _rlen, _clen, _brlen, _bclen); - + Iterator<?> iter = (Iterator<?>) arg0; while( iter.hasNext() ) { Tuple2<MatrixIndexes, MatrixCell> cell = null; @@ -383,38 +366,38 @@ public class RDDConverterUtilsExt case MATRIXENTRY_TO_MATRIXCELL: cell = matrixEntryToMatrixCell((MatrixEntry) iter.next()); break; - + case TEXT_TO_MATRIX_CELL: cell = textToMatrixCell((Text) iter.next()); break; - + default: throw new Exception("Invalid converter for IJV data:" + converter.toString()); } - + if(cell == null) { continue; } - + //flush buffer if necessary if( rbuff.getSize() >= rbuff.getCapacity() ) flushBufferToList(rbuff, ret); - + //add value to reblock buffer rbuff.appendCell(cell._1.getRowIndex(), cell._1.getColumnIndex(), cell._2.getValue()); } - + //final flush buffer flushBufferToList(rbuff, ret); - + return ret; } - private void flushBufferToList( ReblockBuffer rbuff, ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret ) + private void flushBufferToList( ReblockBuffer rbuff, ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret ) throws IOException, DMLRuntimeException { //temporary list of indexed matrix values to prevent library dependencies - ArrayList<IndexedMatrixValue> rettmp = new ArrayList<IndexedMatrixValue>(); + ArrayList<IndexedMatrixValue> rettmp = new ArrayList<IndexedMatrixValue>(); rbuff.flushBufferToBinaryBlocks(rettmp); ret.addAll(SparkUtils.fromIndexedMatrixBlock(rettmp)); } @@ -423,50 +406,17 @@ public class RDDConverterUtilsExt /** * Convert a dataframe of comma-separated string rows to a dataframe of * ml.linalg.Vector rows. - * - * <p> - * Example input rows:<br> - * - * <code> - * ((1.2, 4.3, 3.4))<br> - * (1.2, 3.4, 2.2)<br> - * [[1.2, 34.3, 1.2, 1.25]]<br> - * [1.2, 3.4]<br> - * </code> - * - * @param sqlContext - * Spark SQL Context - * @param inputDF - * dataframe of comma-separated row strings to convert to - * dataframe of ml.linalg.Vector rows - * @return dataframe of ml.linalg.Vector rows - * @throws DMLRuntimeException - * if DMLRuntimeException occurs - * - * @deprecated This will be removed in SystemML 1.0. Please migrate to {@code - * RDDConverterUtilsExt.stringDataFrameToVectorDataFrame(SparkSession, Dataset<Row>) } - */ - @Deprecated - public static Dataset<Row> stringDataFrameToVectorDataFrame(SQLContext sqlContext, Dataset<Row> inputDF) - throws DMLRuntimeException { - SparkSession sparkSession = sqlContext.sparkSession(); - return stringDataFrameToVectorDataFrame(sparkSession, inputDF); - } - - /** - * Convert a dataframe of comma-separated string rows to a dataframe of - * ml.linalg.Vector rows. - * + * * <p> * Example input rows:<br> - * + * * <code> * ((1.2, 4.3, 3.4))<br> * (1.2, 3.4, 2.2)<br> * [[1.2, 34.3, 1.2, 1.25]]<br> * [1.2, 3.4]<br> * </code> - * + * * @param sparkSession * Spark Session * @param inputDF
