This is an automated email from the ASF dual-hosted git repository. mboehm7 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/systemds.git
commit 9641173dd54ba9d43e4869483006bfc4fc66897c Author: Matthias Boehm <[email protected]> AuthorDate: Sat Nov 28 23:47:53 2020 +0100 [SYSTEMDS-2550] Fix in-memory reblock for federated matrices/frames This patch fixes the spark reblock instructions (always compiled in hybrid mode), which incorrectly consolidate federated matrices/frames into the driver. We now simply extend the implementation to respect existing federated data objects. --- .../apache/sysds/hops/recompile/Recompiler.java | 46 +++++++++------------- .../spark/CSVReblockSPInstruction.java | 6 +-- .../instructions/spark/ReblockSPInstruction.java | 6 +-- 3 files changed, 23 insertions(+), 35 deletions(-) diff --git a/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java b/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java index c785cfc..6e960e7 100644 --- a/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java +++ b/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java @@ -68,6 +68,7 @@ import org.apache.sysds.runtime.controlprogram.LocalVariableMap; import org.apache.sysds.runtime.controlprogram.ParForProgramBlock; import org.apache.sysds.runtime.controlprogram.ProgramBlock; import org.apache.sysds.runtime.controlprogram.WhileProgramBlock; +import org.apache.sysds.runtime.controlprogram.caching.CacheBlock; import org.apache.sysds.runtime.controlprogram.caching.CacheableData; import org.apache.sysds.runtime.controlprogram.caching.FrameObject; import org.apache.sysds.runtime.controlprogram.caching.MatrixObject; @@ -80,7 +81,6 @@ import org.apache.sysds.runtime.instructions.cp.FunctionCallCPInstruction; import org.apache.sysds.runtime.instructions.cp.IntObject; import org.apache.sysds.runtime.instructions.cp.ScalarObject; import org.apache.sysds.runtime.io.IOUtilFunctions; -import org.apache.sysds.runtime.matrix.data.FrameBlock; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.meta.DataCharacteristics; import 
org.apache.sysds.runtime.meta.MatrixCharacteristics; @@ -1568,33 +1568,25 @@ public class Recompiler && !OptimizerUtils.exceedsCachingThreshold(dc.getCols(), OptimizerUtils.estimateSize(dc)); } - public static void executeInMemoryMatrixReblock(ExecutionContext ec, String varin, String varout) { - MatrixObject in = ec.getMatrixObject(varin); - MatrixObject out = ec.getMatrixObject(varout); + @SuppressWarnings("unchecked") + public static void executeInMemoryReblock(ExecutionContext ec, String varin, String varout) { + CacheableData<CacheBlock> in = (CacheableData<CacheBlock>) ec.getCacheableData(varin); + CacheableData<CacheBlock> out = (CacheableData<CacheBlock>) ec.getCacheableData(varout); - //read text input matrix (through buffer pool, matrix object carries all relevant - //information including additional arguments for csv reblock) - MatrixBlock mb = in.acquireRead(); - - //set output (incl update matrix characteristics) - out.acquireModify( mb ); - out.release(); - in.release(); - } - - public static void executeInMemoryFrameReblock(ExecutionContext ec, String varin, String varout) - { - FrameObject in = ec.getFrameObject(varin); - FrameObject out = ec.getFrameObject(varout); - - //read text input frame (through buffer pool, frame object carries all relevant - //information including additional arguments for csv reblock) - FrameBlock fb = in.acquireRead(); - - //set output (incl update matrix characteristics) - out.acquireModify( fb ); - out.release(); - in.release(); + if( in.isFederated() ) { + out.setMetaData(in.getMetaData()); + out.setFedMapping(in.getFedMapping()); + } + else { + //read text input matrix (through buffer pool, matrix object carries all relevant + //information including additional arguments for csv reblock) + CacheBlock mb = in.acquireRead(); + + //set output (incl update matrix characteristics) + out.acquireModify(mb); + out.release(); + in.release(); + } } private static void tryReadMetaDataFileDataCharacteristics( DataOp dop ) diff 
--git a/src/main/java/org/apache/sysds/runtime/instructions/spark/CSVReblockSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/CSVReblockSPInstruction.java index d073a3c..be4adc3 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/spark/CSVReblockSPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/CSVReblockSPInstruction.java @@ -114,10 +114,8 @@ public class CSVReblockSPInstruction extends UnarySPInstruction { //check for in-memory reblock (w/ lazy spark context, potential for latency reduction) if( Recompiler.checkCPReblock(sec, input1.getName()) ) { - if( input1.getDataType() == DataType.MATRIX ) - Recompiler.executeInMemoryMatrixReblock(sec, input1.getName(), output.getName()); - else if( input1.getDataType() == DataType.FRAME ) - Recompiler.executeInMemoryFrameReblock(sec, input1.getName(), output.getName()); + if( input1.getDataType().isMatrix() || input1.getDataType().isFrame() ) + Recompiler.executeInMemoryReblock(sec, input1.getName(), output.getName()); Statistics.decrementNoOfExecutedSPInst(); return; } diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java index 46ab52e..a27a760 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java @@ -97,10 +97,8 @@ public class ReblockSPInstruction extends UnarySPInstruction { //check for in-memory reblock (w/ lazy spark context, potential for latency reduction) if( Recompiler.checkCPReblock(sec, input1.getName()) ) { - if( input1.getDataType() == DataType.MATRIX ) - Recompiler.executeInMemoryMatrixReblock(sec, input1.getName(), output.getName()); - else if( input1.getDataType() == DataType.FRAME ) - Recompiler.executeInMemoryFrameReblock(sec, input1.getName(), output.getName()); + if( 
input1.getDataType().isMatrix() || input1.getDataType().isFrame() ) + Recompiler.executeInMemoryReblock(sec, input1.getName(), output.getName()); Statistics.decrementNoOfExecutedSPInst(); return; }
