This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit 9641173dd54ba9d43e4869483006bfc4fc66897c
Author: Matthias Boehm <[email protected]>
AuthorDate: Sat Nov 28 23:47:53 2020 +0100

    [SYSTEMDS-2550] Fix in-memory reblock for federated matrices/frames
    
    This patch fixes the Spark reblock instructions (always compiled in
    hybrid mode), which incorrectly consolidated federated matrices/frames
    into the driver. We now simply extend the implementation to respect
    existing federated data objects.
---
 .../apache/sysds/hops/recompile/Recompiler.java    | 46 +++++++++-------------
 .../spark/CSVReblockSPInstruction.java             |  6 +--
 .../instructions/spark/ReblockSPInstruction.java   |  6 +--
 3 files changed, 23 insertions(+), 35 deletions(-)

diff --git a/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java 
b/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
index c785cfc..6e960e7 100644
--- a/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
+++ b/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
@@ -68,6 +68,7 @@ import 
org.apache.sysds.runtime.controlprogram.LocalVariableMap;
 import org.apache.sysds.runtime.controlprogram.ParForProgramBlock;
 import org.apache.sysds.runtime.controlprogram.ProgramBlock;
 import org.apache.sysds.runtime.controlprogram.WhileProgramBlock;
+import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
 import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
@@ -80,7 +81,6 @@ import 
org.apache.sysds.runtime.instructions.cp.FunctionCallCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.IntObject;
 import org.apache.sysds.runtime.instructions.cp.ScalarObject;
 import org.apache.sysds.runtime.io.IOUtilFunctions;
-import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.meta.DataCharacteristics;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
@@ -1568,33 +1568,25 @@ public class Recompiler
                        && 
!OptimizerUtils.exceedsCachingThreshold(dc.getCols(), 
OptimizerUtils.estimateSize(dc));
        }
        
-       public static void executeInMemoryMatrixReblock(ExecutionContext ec, 
String varin, String varout) {
-               MatrixObject in = ec.getMatrixObject(varin);
-               MatrixObject out = ec.getMatrixObject(varout);
+       @SuppressWarnings("unchecked")
+       public static void executeInMemoryReblock(ExecutionContext ec, String 
varin, String varout) {
+               CacheableData<CacheBlock> in = (CacheableData<CacheBlock>) 
ec.getCacheableData(varin);
+               CacheableData<CacheBlock> out = (CacheableData<CacheBlock>) 
ec.getCacheableData(varout);
 
-               //read text input matrix (through buffer pool, matrix object 
carries all relevant
-               //information including additional arguments for csv reblock)
-               MatrixBlock mb = in.acquireRead(); 
-               
-               //set output (incl update matrix characteristics)
-               out.acquireModify( mb );
-               out.release();
-               in.release();
-       }
-       
-       public static void executeInMemoryFrameReblock(ExecutionContext ec, 
String varin, String varout) 
-       {
-               FrameObject in = ec.getFrameObject(varin);
-               FrameObject out = ec.getFrameObject(varout);
-
-               //read text input frame (through buffer pool, frame object 
carries all relevant
-               //information including additional arguments for csv reblock)
-               FrameBlock fb = in.acquireRead(); 
-               
-               //set output (incl update matrix characteristics)
-               out.acquireModify( fb );
-               out.release();
-               in.release();
+               if( in.isFederated() ) {
+                       out.setMetaData(in.getMetaData());
+                       out.setFedMapping(in.getFedMapping());
+               }
+               else {
+                       //read text input matrix (through buffer pool, matrix 
object carries all relevant
+                       //information including additional arguments for csv 
reblock)
+                       CacheBlock mb = in.acquireRead();
+                       
+                       //set output (incl update matrix characteristics)
+                       out.acquireModify(mb);
+                       out.release();
+                       in.release();
+               }
        }
        
        private static void tryReadMetaDataFileDataCharacteristics( DataOp dop )
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/CSVReblockSPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/CSVReblockSPInstruction.java
index d073a3c..be4adc3 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/CSVReblockSPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/CSVReblockSPInstruction.java
@@ -114,10 +114,8 @@ public class CSVReblockSPInstruction extends 
UnarySPInstruction {
 
                //check for in-memory reblock (w/ lazy spark context, potential 
for latency reduction)
                if( Recompiler.checkCPReblock(sec, input1.getName()) ) {
-                       if( input1.getDataType() == DataType.MATRIX )
-                               Recompiler.executeInMemoryMatrixReblock(sec, 
input1.getName(), output.getName());
-                       else if( input1.getDataType() == DataType.FRAME )
-                               Recompiler.executeInMemoryFrameReblock(sec, 
input1.getName(), output.getName());
+                       if( input1.getDataType().isMatrix() || 
input1.getDataType().isFrame() )
+                               Recompiler.executeInMemoryReblock(sec, 
input1.getName(), output.getName());
                        Statistics.decrementNoOfExecutedSPInst();
                        return;
                }
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java
index 46ab52e..a27a760 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java
@@ -97,10 +97,8 @@ public class ReblockSPInstruction extends UnarySPInstruction 
{
 
                //check for in-memory reblock (w/ lazy spark context, potential 
for latency reduction)
                if( Recompiler.checkCPReblock(sec, input1.getName()) ) {
-                       if( input1.getDataType() == DataType.MATRIX )
-                               Recompiler.executeInMemoryMatrixReblock(sec, 
input1.getName(), output.getName());
-                       else if( input1.getDataType() == DataType.FRAME )
-                               Recompiler.executeInMemoryFrameReblock(sec, 
input1.getName(), output.getName());
+                       if( input1.getDataType().isMatrix() || 
input1.getDataType().isFrame() )
+                               Recompiler.executeInMemoryReblock(sec, 
input1.getName(), output.getName());
                        Statistics.decrementNoOfExecutedSPInst();
                        return;
                }

Reply via email to