This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new 4dedddc  [SYSTEMDS-2733] Fix federated append, right-indexing, 
as.frame, tests
4dedddc is described below

commit 4dedddcbf626799d91507a84987ee7f9c9e6a2bb
Author: Matthias Boehm <[email protected]>
AuthorDate: Sun Nov 15 19:12:40 2020 +0100

    [SYSTEMDS-2733] Fix federated append, right-indexing, as.frame, tests
    
    This patch fixes various issues in federated instructions and metadata
    handling in order to allow full federated ML pipelines.
    
    * Append: fix for missing federated append (retain row-partitioning),
    correct federated ranges, and broadcast only the required slices
    * Right Indexing: fix for correct federated ranges, and index only the
    federated ranges that overlap the requested range
    * Transformencode: fix for consistent recode maps, and ranges
    * Matrix-Frame casts: fix for consistent dimension metadata
    * Tests: more reasonable fed worker startup time, configurable constant,
    fixed output buffering for tests that process the buffer
---
 .../sysds/runtime/controlprogram/ProgramBlock.java |  30 ++-
 .../controlprogram/federated/FederationMap.java    |  30 ++-
 .../controlprogram/federated/FederationUtils.java  |   2 +-
 .../fed/AggregateBinaryFEDInstruction.java         |   1 -
 .../instructions/fed/AppendFEDInstruction.java     |  72 ++++---
 .../instructions/fed/FEDInstructionUtils.java      |   3 +-
 .../fed/MatrixIndexingFEDInstruction.java          |  99 ++++------
 ...tiReturnParameterizedBuiltinFEDInstruction.java |  10 +-
 .../instructions/fed/VariableFEDInstruction.java   |   7 +-
 .../instructions/spark/BinarySPInstruction.java    |  14 +-
 .../apache/sysds/runtime/meta/MetaDataUtils.java   |  39 ++++
 .../runtime/transform/encode/EncoderRecode.java    |  18 ++
 .../org/apache/sysds/test/AutomatedTestBase.java   |   2 +
 .../federated/algorithms/FederatedBivarTest.java   |   6 +-
 .../federated/algorithms/FederatedCorTest.java     |   6 +-
 .../federated/algorithms/FederatedGLMTest.java     |   2 +-
 .../federated/algorithms/FederatedKmeansTest.java  |   2 +-
 .../federated/algorithms/FederatedL2SVMTest.java   |   2 +-
 .../federated/algorithms/FederatedLmPipeline.java  | 116 ++++++++++++
 .../federated/algorithms/FederatedLogRegTest.java  |   2 +-
 .../federated/algorithms/FederatedPCATest.java     |   6 +-
 .../federated/algorithms/FederatedUnivarTest.java  |   6 +-
 .../federated/algorithms/FederatedVarTest.java     |   6 +-
 .../federated/algorithms/FederatedYL2SVMTest.java  |   2 +-
 .../federated/io/FederatedReaderTest.java          |   2 +-
 .../functions/federated/io/FederatedSSLTest.java   |   2 +-
 .../federated/io/FederatedWriterTest.java          |   2 +-
 .../paramserv/FederatedParamservTest.java          |   2 +-
 .../primitives/FederatedBinaryMatrixTest.java      |   2 +-
 .../primitives/FederatedBinaryVectorTest.java      |   2 +-
 .../primitives/FederatedCastToFrameTest.java       |   5 +-
 .../primitives/FederatedCastToMatrixTest.java      |   5 +-
 .../primitives/FederatedCentralMomentTest.java     | 206 ++++++++++-----------
 .../primitives/FederatedColAggregateTest.java      |   6 +-
 .../primitives/FederatedFullAggregateTest.java     |   6 +-
 .../primitives/FederatedMultiplyTest.java          |   2 +-
 .../federated/primitives/FederatedRCBindTest.java  |   4 +-
 .../primitives/FederatedRemoveEmptyTest.java       |   6 +-
 .../primitives/FederatedRightIndexTest.java        |   6 +-
 .../primitives/FederatedRowAggregateTest.java      |   6 +-
 .../federated/primitives/FederatedSplitTest.java   | 187 ++++++++++---------
 .../primitives/FederatedStatisticsTest.java        |   2 +-
 .../TransformFederatedEncodeApplyTest.java         |   6 +-
 .../TransformFederatedEncodeDecodeTest.java        |   6 +-
 .../functions/federated/FederatedLmPipeline.dml    |  65 +++++++
 .../federated/FederatedLmPipelineReference.dml     |  64 +++++++
 46 files changed, 712 insertions(+), 365 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/ProgramBlock.java 
b/src/main/java/org/apache/sysds/runtime/controlprogram/ProgramBlock.java
index bb34bf6..8bfdcc7 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/ProgramBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/ProgramBlock.java
@@ -32,6 +32,7 @@ import org.apache.sysds.parser.ParseInfo;
 import org.apache.sysds.parser.StatementBlock;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.DMLScriptException;
+import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject.UpdateType;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
@@ -56,7 +57,7 @@ public abstract class ProgramBlock implements ParseInfo
        public static final String PRED_VAR = "__pred";
        
        protected static final Log LOG = 
LogFactory.getLog(ProgramBlock.class.getName());
-       private static final boolean CHECK_MATRIX_SPARSITY = false;
+       private static final boolean CHECK_MATRIX_PROPERTIES = false;
 
        protected Program _prog; // pointer to Program this ProgramBlock is 
part of
        
@@ -273,8 +274,9 @@ public abstract class ProgramBlock implements ParseInfo
 
                        // optional check for correct nnz and sparse/dense 
representation of all
                        // variables in symbol table (for tracking source of 
wrong representation)
-                       if( CHECK_MATRIX_SPARSITY ) {
+                       if( CHECK_MATRIX_PROPERTIES ) {
                                checkSparsity( tmp, ec.getVariables() );
+                               checkFederated( tmp, ec.getVariables() );
                        }
                }
                catch (DMLScriptException e){
@@ -332,14 +334,11 @@ public abstract class ProgramBlock implements ParseInfo
        
        private static void checkSparsity( Instruction lastInst, 
LocalVariableMap vars )
        {
-               for( String varname : vars.keySet() )
-               {
+               for( String varname : vars.keySet() ) {
                        Data dat = vars.get(varname);
-                       if( dat instanceof MatrixObject )
-                       {
+                       if( dat instanceof MatrixObject ) {
                                MatrixObject mo = (MatrixObject)dat;
-                               if( mo.isDirty() && !mo.isPartitioned() )
-                               {
+                               if( mo.isDirty() && !mo.isPartitioned() ) {
                                        MatrixBlock mb = mo.acquireRead();
                                        boolean sparse1 = mb.isInSparseFormat();
                                        long nnz1 = mb.getNonZeros();
@@ -368,6 +367,21 @@ public abstract class ProgramBlock implements ParseInfo
                }
        }
 
+       private static void checkFederated( Instruction lastInst, 
LocalVariableMap vars )
+       {
+               for( String varname : vars.keySet() ) {
+                       Data dat = vars.get(varname);
+                       if( !(dat instanceof CacheableData) )
+                               continue;
+                       
+                       CacheableData<?> mo = (CacheableData<?>)dat;
+                       if( mo.isFederated() ) {
+                               if( 
mo.getFedMapping().getFedMapping().isEmpty() )
+                                       throw new DMLRuntimeException("Invalid 
empty FederationMap for: "+mo);
+                       }
+               }
+       }
+       
        
///////////////////////////////////////////////////////////////////////////
        // store position information for program blocks
        
///////////////////////////////////////////////////////////////////////////
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
index f670c17..36e82a3 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
@@ -21,6 +21,7 @@ package org.apache.sysds.runtime.controlprogram.federated;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -40,6 +41,7 @@ import org.apache.sysds.runtime.instructions.cp.ScalarObject;
 import org.apache.sysds.runtime.instructions.cp.VariableCPInstruction;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.util.CommonThreadPool;
+import org.apache.sysds.runtime.util.IndexRange;
 
 public class FederationMap
 {
@@ -83,6 +85,10 @@ public class FederationMap
                _type = type;
        }
        
+       public int getSize() {
+               return _fedMap.size();
+       }
+       
        public FederatedRange[] getFederatedRanges() {
                return _fedMap.keySet().toArray(new FederatedRange[0]);
        }
@@ -254,7 +260,7 @@ public class FederationMap
                //TODO handling of file path, but no danger as never written
                for( Entry<FederatedRange, FederatedData> e : 
_fedMap.entrySet() )
                        map.put(new FederatedRange(e.getKey(), clen), 
e.getValue().copyWithNewID(id));
-               return new FederationMap(id, map);
+               return new FederationMap(id, map, _type);
        }
 
        public FederationMap bind(long rOffset, long cOffset, FederationMap 
that) {
@@ -324,6 +330,23 @@ public class FederationMap
                return fedMapCopy;
        }
        
+       public FederationMap filter(IndexRange ixrange) {
+               FederationMap ret = this.clone(); //same ID
+               
+               Iterator<Entry<FederatedRange, FederatedData>> iter = 
ret._fedMap.entrySet().iterator();
+               while( iter.hasNext() ) {
+                       Entry<FederatedRange, FederatedData> e = iter.next();
+                       FederatedRange range = e.getKey();
+                       long rs = range.getBeginDims()[0], re = 
range.getEndDims()[0],
+                               cs = range.getBeginDims()[1], ce = 
range.getEndDims()[1];
+                       boolean overlap = ((ixrange.colStart <= ce) && 
(ixrange.colEnd >= cs)
+                               && (ixrange.rowStart <= re) && (ixrange.rowEnd 
>= rs));
+                       if( !overlap )
+                               iter.remove();
+               }
+               return ret;
+       }
+       
        private static void setThreadID(long tid, FederatedRequest[]... frsets) 
{
                for( FederatedRequest[] frset : frsets )
                        if( frset != null )
@@ -360,4 +383,9 @@ public class FederationMap
                sb.append("\n"+ _fedMap);
                return sb.toString();
        }
+       
+       @Override
+       public FederationMap clone() {
+               return copyWithNewID(getID());
+       }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
index 22f4e69..093ff30 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
@@ -347,7 +347,7 @@ public class FederationUtils {
                        throw new DMLRuntimeException("Unsupported aggregation 
operator: "
                                + 
aop.aggOp.increOp.fn.getClass().getSimpleName());
        }
-
+       
        public static FederationMap federateLocalData(CacheableData<?> data) {
                long id = FederationUtils.getNextFedDataID();
                FederatedLocalData federatedLocalData = new 
FederatedLocalData(id, data);
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateBinaryFEDInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateBinaryFEDInstruction.java
index bdb9784..9107a86 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateBinaryFEDInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateBinaryFEDInstruction.java
@@ -94,7 +94,6 @@ public class AggregateBinaryFEDInstruction extends 
BinaryFEDInstruction {
                                MatrixObject out = ec.getMatrixObject(output);
                                
out.getDataCharacteristics().set(mo1.getNumRows(), mo2.getNumColumns(), 
(int)mo1.getBlocksize());
                                
out.setFedMapping(mo1.getFedMapping().copyWithNewID(fr2.getID(), 
mo2.getNumColumns()));
-                               out.getFedMapping().setType(FType.ROW);
                        }
                }
                //#2 vector - federated matrix multiplication
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/fed/AppendFEDInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/AppendFEDInstruction.java
index ee0d8aa..67425f1 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/fed/AppendFEDInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/AppendFEDInstruction.java
@@ -22,7 +22,9 @@ package org.apache.sysds.runtime.instructions.fed;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
 import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
+import org.apache.sysds.runtime.controlprogram.federated.FederationMap.FType;
 import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
 import org.apache.sysds.runtime.functionobjects.OffsetColumnIndex;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
@@ -30,6 +32,7 @@ import org.apache.sysds.runtime.instructions.cp.CPOperand;
 import org.apache.sysds.runtime.matrix.operators.Operator;
 import org.apache.sysds.runtime.matrix.operators.ReorgOperator;
 import org.apache.sysds.runtime.meta.DataCharacteristics;
+import org.apache.sysds.runtime.meta.MetaDataUtils;
 
 public class AppendFEDInstruction extends BinaryFEDInstruction {
        protected boolean _cbind; // otherwise rbind
@@ -60,7 +63,7 @@ public class AppendFEDInstruction extends 
BinaryFEDInstruction {
                MatrixObject mo1 = ec.getMatrixObject(input1.getName());
                MatrixObject mo2 = ec.getMatrixObject(input2.getName());
                DataCharacteristics dc1 = mo1.getDataCharacteristics();
-               DataCharacteristics dc2 = mo1.getDataCharacteristics();
+               DataCharacteristics dc2 = mo2.getDataCharacteristics();
 
                // check input dimensions
                if(_cbind && mo1.getNumRows() != mo2.getNumRows()) {
@@ -80,32 +83,53 @@ public class AppendFEDInstruction extends 
BinaryFEDInstruction {
                        throw new DMLRuntimeException(sb.toString());
                }
 
-               FederationMap fm1;
-               if(mo1.isFederated())
-                       fm1 = mo1.getFedMapping();
-               else
-                       fm1 = FederationUtils.federateLocalData(mo1);
-               FederationMap fm2;
-               if(mo2.isFederated())
-                       fm2 = mo2.getFedMapping();
-               else
-                       fm2 = FederationUtils.federateLocalData(mo2);
-
+               //prepare output
                MatrixObject out = ec.getMatrixObject(output);
-               long id = FederationUtils.getNextFedDataID();
-               if(_cbind) {
-                       out.getDataCharacteristics().set(dc1.getRows(),
-                               dc1.getCols() + dc2.getCols(),
-                               dc1.getBlocksize(),
-                               dc1.getNonZeros() + dc2.getNonZeros());
-                       out.setFedMapping(fm1.identCopy(getTID(), id).bind(0, 
dc1.getCols(), fm2.identCopy(getTID(), id)));
+               MetaDataUtils.updateAppendDataCharacteristics(dc1, dc2, 
out.getDataCharacteristics(), _cbind);
+               
+               // federated/federated
+               if( mo1.isFederated() && mo2.isFederated() 
+                       && 
mo1.getFedMapping().getType()==mo2.getFedMapping().getType() ) 
+               {
+                       long id = FederationUtils.getNextFedDataID();
+                       long roff = _cbind ? 0 : dc1.getRows();
+                       long coff = _cbind ? dc1.getCols() : 0;
+                       
out.setFedMapping(mo1.getFedMapping().identCopy(getTID(), id)
+                               .bind(roff, coff, 
mo2.getFedMapping().identCopy(getTID(), id)));
+               }
+               // federated/local, local/federated cbind
+               else if( (mo1.isFederated(FType.ROW) || 
mo2.isFederated(FType.ROW)) && _cbind ) {
+                       MatrixObject moFed = mo1.isFederated(FType.ROW) ? mo1 : 
mo2;
+                       MatrixObject moLoc = mo1.isFederated(FType.ROW) ? mo2 : 
mo1;
+                       
+                       //construct commands: broadcast lhs, fed append, clean 
broadcast
+                       FederatedRequest[] fr1 = 
moFed.getFedMapping().broadcastSliced(moLoc, false);
+                       FederatedRequest fr2 = 
FederationUtils.callInstruction(instString, output,
+                               new CPOperand[]{input1, input2}, 
mo1.isFederated(FType.ROW) ?
+                               new long[]{ moFed.getFedMapping().getID(), 
fr1[0].getID()} :
+                               new long[]{ fr1[0].getID(), 
moFed.getFedMapping().getID()});
+                       FederatedRequest fr3 = 
moFed.getFedMapping().cleanup(getTID(), fr1[0].getID());
+                       
+                       //execute federated operations and set output
+                       moFed.getFedMapping().execute(getTID(), true, fr1, fr2, 
fr3);
+                       
out.setFedMapping(moFed.getFedMapping().copyWithNewID(fr2.getID(), 
out.getNumColumns()));
+               }
+               // federated/local, local/federated rbind
+               else if( (mo1.isFederated(FType.ROW) || 
mo2.isFederated(FType.ROW)) && !_cbind) {
+                       long id = FederationUtils.getNextFedDataID();
+                       long roff = _cbind ? 0 : dc1.getRows();
+                       long coff = _cbind ? dc1.getCols() : 0;
+                       FederationMap fed1 = mo1.isFederated(FType.ROW) ?
+                               mo1.getFedMapping() : 
FederationUtils.federateLocalData(mo1);
+                       FederationMap fed2 = mo2.isFederated(FType.ROW) ?
+                               mo2.getFedMapping() : 
FederationUtils.federateLocalData(mo2);
+                       out.setFedMapping(fed1.identCopy(getTID(), id)
+                               .bind(roff, coff, fed2.identCopy(getTID(), 
id)));
                }
                else {
-                       out.getDataCharacteristics().set(dc1.getRows() + 
dc2.getRows(),
-                               dc1.getCols(),
-                               dc1.getBlocksize(),
-                               dc1.getNonZeros() + dc2.getNonZeros());
-                       out.setFedMapping(fm1.identCopy(getTID(), 
id).bind(dc1.getRows(), 0, fm2.identCopy(getTID(), id)));
+                       throw new DMLRuntimeException("Unsupported federated 
append: "
+                               + (mo1.isFederated() ? 
mo1.getFedMapping().getType().name():"LOCAL") + " "
+                               + (mo2.isFederated() ? 
mo2.getFedMapping().getType().name():"LOCAL") + " " + _cbind);
                }
        }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
index d8af245..c0481ab 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
@@ -119,7 +119,7 @@ public class FEDInstructionUtils {
                        BinaryCPInstruction instruction = (BinaryCPInstruction) 
inst;
                        if( (instruction.input1.isMatrix() && 
ec.getMatrixObject(instruction.input1).isFederated())
                                || (instruction.input2.isMatrix() && 
ec.getMatrixObject(instruction.input2).isFederated()) ) {
-                               if(instruction.getOpcode().equals("append"))
+                               if(instruction.getOpcode().equals("append") )
                                        fedinst = 
AppendFEDInstruction.parseInstruction(inst.getInstructionString());
                                else if(instruction.getOpcode().equals("qpick"))
                                        fedinst = 
QuantilePickFEDInstruction.parseInstruction(inst.getInstructionString());
@@ -149,7 +149,6 @@ public class FEDInstructionUtils {
                }
                else if(inst instanceof MatrixIndexingCPInstruction) {
                        // matrix indexing
-                       LOG.info("Federated Indexing");
                        MatrixIndexingCPInstruction minst = 
(MatrixIndexingCPInstruction) inst;
                        if(inst.getOpcode().equalsIgnoreCase("rightIndex")
                                && minst.input1.isMatrix() && 
ec.getCacheableData(minst.input1).isFederated()) {
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/fed/MatrixIndexingFEDInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/MatrixIndexingFEDInstruction.java
index 5c0a821..477379c 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/fed/MatrixIndexingFEDInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/MatrixIndexingFEDInstruction.java
@@ -50,80 +50,65 @@ public final class MatrixIndexingFEDInstruction extends 
IndexingFEDInstruction {
                rightIndexing(ec);
        }
 
-       private void rightIndexing(ExecutionContext ec) {
+       private void rightIndexing(ExecutionContext ec)
+       {
+               //get input and requested index range
                MatrixObject in = ec.getMatrixObject(input1);
-               FederationMap fedMapping = in.getFedMapping();
                IndexRange ixrange = getIndexRange(ec);
-               // FederationMap.FType fedType;
+               
+               //prepare output federation map (copy-on-write)
+               FederationMap fedMap = in.getFedMapping().filter(ixrange);
+               
+               //modify federated ranges in place
                Map<FederatedRange, IndexRange> ixs = new HashMap<>();
-
-               for(int i = 0; i < fedMapping.getFederatedRanges().length; i++) 
{
-                       FederatedRange curFedRange = 
fedMapping.getFederatedRanges()[i];
-                       long rs = curFedRange.getBeginDims()[0], re = 
curFedRange.getEndDims()[0],
-                               cs = curFedRange.getBeginDims()[1], ce = 
curFedRange.getEndDims()[1];
-
-                       if((ixrange.colStart <= ce) && (ixrange.colEnd >= cs) 
&& (ixrange.rowStart <= re) && (ixrange.rowEnd >= rs)) {
-                               // If the indexing range contains values that 
are within the specific federated range.
-                               // change the range.
-                               long rsn = (ixrange.rowStart >= rs) ? 
(ixrange.rowStart - rs) : 0;
-                               long ren = (ixrange.rowEnd >= rs && 
ixrange.rowEnd < re) ? (ixrange.rowEnd - rs) : (re - rs - 1);
-                               long csn = (ixrange.colStart >= cs) ? 
(ixrange.colStart - cs) : 0;
-                               long cen = (ixrange.colEnd >= cs && 
ixrange.colEnd < ce) ? (ixrange.colEnd - cs) : (ce - cs - 1);
-                               if(LOG.isDebugEnabled()) {
-                                       LOG.debug("Ranges for fed location: " + 
rsn + " " + ren + " " + csn + " " + cen);
-                                       LOG.debug("ixRange                : " + 
ixrange);
-                                       LOG.debug("Fed Mapping            : " + 
curFedRange);
-                               }
-                               curFedRange.setBeginDim(0, Math.max(rs - 
ixrange.rowStart, 0));
-                               curFedRange.setBeginDim(1, Math.max(cs - 
ixrange.colStart, 0));
-                               curFedRange.setEndDim(0,
-                                       (ixrange.rowEnd >= re ? re - 
ixrange.rowStart : ixrange.rowEnd - ixrange.rowStart + 1));
-                               curFedRange.setEndDim(1,
-                                       (ixrange.colEnd >= ce ? ce - 
ixrange.colStart : ixrange.colEnd - ixrange.colStart + 1));
-                               if(LOG.isDebugEnabled()) {
-                                       LOG.debug("Fed Mapping After      : " + 
curFedRange);
-                               }
-                               ixs.put(curFedRange, new IndexRange(rsn, ren, 
csn, cen));
-                       }
-                       else {
-                               // If not within the range, change the range to 
become an 0 times 0 big range.
-                               // by setting the end dimensions to the same as 
the beginning dimensions.
-                               curFedRange.setBeginDim(0, 0);
-                               curFedRange.setBeginDim(1, 0);
-                               curFedRange.setEndDim(0, 0);
-                               curFedRange.setEndDim(1, 0);
+               for(FederatedRange range : fedMap.getFedMapping().keySet()) {
+                       long rs = range.getBeginDims()[0], re = 
range.getEndDims()[0],
+                               cs = range.getBeginDims()[1], ce = 
range.getEndDims()[1];
+                       long rsn = (ixrange.rowStart >= rs) ? (ixrange.rowStart 
- rs) : 0;
+                       long ren = (ixrange.rowEnd >= rs && ixrange.rowEnd < 
re) ? (ixrange.rowEnd - rs) : (re - rs - 1);
+                       long csn = (ixrange.colStart >= cs) ? (ixrange.colStart 
- cs) : 0;
+                       long cen = (ixrange.colEnd >= cs && ixrange.colEnd < 
ce) ? (ixrange.colEnd - cs) : (ce - cs - 1);
+                       if(LOG.isDebugEnabled()) {
+                               LOG.debug("Ranges for fed location: " + rsn + " 
" + ren + " " + csn + " " + cen);
+                               LOG.debug("ixRange                : " + 
ixrange);
+                               LOG.debug("Fed Mapping            : " + range);
                        }
-
+                       range.setBeginDim(0, Math.max(rs - ixrange.rowStart, 
0));
+                       range.setBeginDim(1, Math.max(cs - ixrange.colStart, 
0));
+                       range.setEndDim(0, (ixrange.rowEnd >= re ? 
re-ixrange.rowStart : ixrange.rowEnd-ixrange.rowStart + 1));
+                       range.setEndDim(1, (ixrange.colEnd >= ce ? 
ce-ixrange.colStart : ixrange.colEnd-ixrange.colStart + 1));
+                       if(LOG.isDebugEnabled())
+                               LOG.debug("Fed Mapping After      : " + range);
+                       ixs.put(range, new IndexRange(rsn, ren, csn, cen));
                }
 
+               // execute slicing of valid range 
                long varID = FederationUtils.getNextFedDataID();
-               FederationMap slicedMapping = fedMapping.mapParallel(varID, 
(range, data) -> {
+               FederationMap slicedFedMap = fedMap.mapParallel(varID, (range, 
data) -> {
                        try {
                                FederatedResponse response = 
data.executeFederatedOperation(new FederatedRequest(
                                        FederatedRequest.RequestType.EXEC_UDF, 
-1,
-                                       new SliceMatrix(data.getVarID(), varID, 
ixs.getOrDefault(range, new IndexRange(-1, -1, -1, -1)))))
-                                       .get();
+                                       new SliceMatrix(data.getVarID(), varID, 
ixs.get(range)))).get();
                                if(!response.isSuccessful())
                                        response.throwExceptionFromResponse();
+                               return null;
                        }
                        catch(Exception e) {
                                throw new DMLRuntimeException(e);
                        }
-                       return null;
                });
 
+               //update output mapping and data characteristics
                MatrixObject sliced = ec.getMatrixObject(output);
                sliced.getDataCharacteristics()
-                       .set(fedMapping.getMaxIndexInRange(0), 
fedMapping.getMaxIndexInRange(1), (int) in.getBlocksize());
-               if(ixrange.rowEnd - ixrange.rowStart == 0) {
-                       slicedMapping.setType(FederationMap.FType.COL);
-               }
-               else if(ixrange.colEnd - ixrange.colStart == 0) {
-                       slicedMapping.setType(FederationMap.FType.ROW);
-               }
-               sliced.setFedMapping(slicedMapping);
-               LOG.debug(slicedMapping);
-               LOG.debug(sliced);
+                       .set(slicedFedMap.getMaxIndexInRange(0), 
slicedFedMap.getMaxIndexInRange(1), (int) in.getBlocksize());
+               sliced.setFedMapping(slicedFedMap);
+               
+               //TODO is this really necessary
+               if(ixrange.rowEnd - ixrange.rowStart == 0)
+                       slicedFedMap.setType(FederationMap.FType.COL);
+               else if(ixrange.colEnd - ixrange.colStart == 0)
+                       slicedFedMap.setType(FederationMap.FType.ROW);
        }
 
        private static class SliceMatrix extends FederatedUDF {
@@ -141,11 +126,7 @@ public final class MatrixIndexingFEDInstruction extends 
IndexingFEDInstruction {
                @Override
                public FederatedResponse execute(ExecutionContext ec, Data... 
data) {
                        MatrixBlock mb = ((MatrixObject) 
data[0]).acquireReadAndRelease();
-                       MatrixBlock res;
-                       if(_ixrange.rowStart != -1)
-                               res = mb.slice(_ixrange, new MatrixBlock());
-                       else
-                               res = new MatrixBlock();
+                       MatrixBlock res = mb.slice(_ixrange, new MatrixBlock());
                        MatrixObject mout = 
ExecutionContext.createMatrixObject(res);
                        ec.setVariable(String.valueOf(_outputID), mout);
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java
index 05c1901..d02d0f5 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java
@@ -128,10 +128,18 @@ public class 
MultiReturnParameterizedBuiltinFEDInstruction extends ComputationFE
                                System.arraycopy(subRangeColNames, 0, colNames, 
(int) range.getBeginDims()[1], subRangeColNames.length);
                        }
                        catch(Exception e) {
-                               throw new DMLRuntimeException("Federated 
encoder creation failed: " + e.getMessage());
+                               throw new DMLRuntimeException("Federated 
encoder creation failed: ", e);
                        }
                        return null;
                });
+               
+               //sort for consistent encoding in local and federated
+               if( EncoderRecode.SORT_RECODE_MAP ) {
+                       for(Encoder encoder : globalEncoder.getEncoders())
+                               if( encoder instanceof EncoderRecode )
+                                       
((EncoderRecode)encoder).sortCPRecodeMaps();
+               }
+               
                FrameBlock meta = new FrameBlock((int) fin.getNumColumns(), 
Types.ValueType.STRING);
                meta.setColumnNames(colNames);
                globalEncoder.getMetaData(meta);
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/fed/VariableFEDInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/VariableFEDInstruction.java
index 134a2e3..b45654d 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/fed/VariableFEDInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/VariableFEDInstruction.java
@@ -126,14 +126,12 @@ public class VariableFEDInstruction extends 
FEDInstruction implements LineageTra
 
                // execute function at federated site.
                FederatedRequest fr1 = 
FederationUtils.callInstruction(_in.getInstructionString(),
-                       _in.getOutput(),
-                       new CPOperand[] {_in.getInput1()},
-                       new long[] {mo1.getFedMapping().getID()});
+                       _in.getOutput(), new CPOperand[] {_in.getInput1()}, new 
long[] {mo1.getFedMapping().getID()});
                mo1.getFedMapping().execute(getTID(), true, fr1);
 
                // Construct output local.
                FrameObject out = ec.getFrameObject(_in.getOutput());
-               out.getDataCharacteristics().set(mo1.getNumColumns(), 
mo1.getNumRows(), (int) mo1.getBlocksize(), mo1.getNnz());
+               out.getDataCharacteristics().set(mo1.getNumRows(), 
mo1.getNumColumns(), (int) mo1.getBlocksize(), mo1.getNnz());
                FederationMap outMap = 
mo1.getFedMapping().copyWithNewID(fr1.getID());
                Map<FederatedRange, FederatedData> newMap = new HashMap<>();
                for(Map.Entry<FederatedRange, FederatedData> pair : 
outMap.getFedMapping().entrySet()) {
@@ -152,5 +150,4 @@ public class VariableFEDInstruction extends FEDInstruction 
implements LineageTra
        public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) {
                return _in.getLineageItem(ec);
        }
-
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinarySPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinarySPInstruction.java
index f4f98dc..82a95a8 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinarySPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinarySPInstruction.java
@@ -49,6 +49,7 @@ import 
org.apache.sysds.runtime.matrix.operators.BinaryOperator;
 import org.apache.sysds.runtime.matrix.operators.Operator;
 import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
 import org.apache.sysds.runtime.meta.DataCharacteristics;
+import org.apache.sysds.runtime.meta.MetaDataUtils;
 
 public abstract class BinarySPInstruction extends ComputationSPInstruction {
 
@@ -344,22 +345,13 @@ public abstract class BinarySPInstruction extends 
ComputationSPInstruction {
                return mcOut;
        }
 
-       protected void 
updateBinaryAppendOutputDataCharacteristics(SparkExecutionContext sec, boolean 
cbind)
-       {
+       protected void 
updateBinaryAppendOutputDataCharacteristics(SparkExecutionContext sec, boolean 
cbind) {
                DataCharacteristics mc1 = 
sec.getDataCharacteristics(input1.getName());
                DataCharacteristics mc2 = 
sec.getDataCharacteristics(input2.getName());
                DataCharacteristics mcOut = 
sec.getDataCharacteristics(output.getName());
                
                //infer initially unknown dimensions from inputs
-               if(!mcOut.dimsKnown()) { 
-                       if( !mc1.dimsKnown() || !mc2.dimsKnown() )
-                               throw new DMLRuntimeException("The output 
dimensions are not specified and cannot be inferred from inputs.");
-                       
-                       if( cbind )
-                               mcOut.set(mc1.getRows(), 
mc1.getCols()+mc2.getCols(), mc1.getBlocksize(), mc1.getBlocksize());
-                       else //rbind
-                               mcOut.set(mc1.getRows()+mc2.getRows(), 
mc1.getCols(), mc1.getBlocksize(), mc1.getBlocksize());
-               }       
+               MetaDataUtils.updateAppendDataCharacteristics(mc1, mc2, mcOut, 
cbind);
                
                //infer initially unknown nnz from inputs
                if( !mcOut.nnzKnown() && mc1.nnzKnown() && mc2.nnzKnown() ) {
diff --git a/src/main/java/org/apache/sysds/runtime/meta/MetaDataUtils.java 
b/src/main/java/org/apache/sysds/runtime/meta/MetaDataUtils.java
new file mode 100644
index 0000000..f372130
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/meta/MetaDataUtils.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.meta;
+
+import org.apache.sysds.runtime.DMLRuntimeException;
+
+public class MetaDataUtils {
+
+       public static void updateAppendDataCharacteristics(DataCharacteristics 
mc1,
+               DataCharacteristics mc2, DataCharacteristics mcOut, boolean 
cbind)
+       {
+               if(!mcOut.dimsKnown()) { 
+                       if( !mc1.dimsKnown() || !mc2.dimsKnown() )
+                               throw new DMLRuntimeException("The output 
dimensions are not specified and cannot be inferred from inputs.");
+                       
+                       if( cbind )
+                               mcOut.set(mc1.getRows(), 
mc1.getCols()+mc2.getCols(), mc1.getBlocksize());
+                       else //rbind
+                               mcOut.set(mc1.getRows()+mc2.getRows(), 
mc1.getCols(), mc1.getBlocksize());
+               }
+       }
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderRecode.java 
b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderRecode.java
index 6a1ea0b..1dc7bf2 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderRecode.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderRecode.java
@@ -20,6 +20,7 @@
 package org.apache.sysds.runtime.transform.encode;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -40,6 +41,9 @@ public class EncoderRecode extends Encoder
 {
        private static final long serialVersionUID = 8213163881283341874L;
        
+       //test property to ensure consistent encoding for local and federated
+       public static boolean SORT_RECODE_MAP = false;
+       
        //recode maps and custom map for partial recode maps 
        private HashMap<Integer, HashMap<String, Long>> _rcdMaps  = new 
HashMap<>();
        private HashMap<Integer, HashSet<Object>> _rcdMapsPart = null;
@@ -72,6 +76,16 @@ public class EncoderRecode extends Encoder
                return _rcdMapsPart; 
        }
        
+       public void sortCPRecodeMaps() {
+               for( HashMap<String,Long> map : _rcdMaps.values() ) {
+                       String[] keys= map.keySet().toArray(new String[0]);
+                       Arrays.sort(keys);
+                       map.clear();
+                       for(String key : keys)
+                               putCode(map, key);
+               }
+       }
+       
        private long lookupRCDMap(int colID, String key) {
                if( !_rcdMaps.containsKey(colID) )
                        return -1; //empty recode map
@@ -111,6 +125,10 @@ public class EncoderRecode extends Encoder
                                        putCode(map, key);
                        }
                }
+               
+               if( SORT_RECODE_MAP ) {
+                       sortCPRecodeMaps();
+               }
        }
 
        /**
diff --git a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java 
b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
index 3c3471e..1e62975 100644
--- a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
+++ b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
@@ -106,6 +106,8 @@ public abstract class AutomatedTestBase {
        public static final double GPU_TOLERANCE = 1e-9;
 
        public static final int FED_WORKER_WAIT = 1000; // in ms
+       public static final int FED_WORKER_WAIT_S = 30; // in ms
+       
 
        // With OpenJDK 8u242 on Windows, the new changes in JDK are not 
allowing
        // to set the native library paths internally thus breaking the code.
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedBivarTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedBivarTest.java
index ff811e0..e15c7d1 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedBivarTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedBivarTest.java
@@ -114,9 +114,9 @@ public class FederatedBivarTest extends AutomatedTestBase {
                int port2 = getRandomAvailablePort();
                int port3 = getRandomAvailablePort();
                int port4 = getRandomAvailablePort();
-               Thread t1 = startLocalFedWorkerThread(port1, 10);
-               Thread t2 = startLocalFedWorkerThread(port2, 10);
-               Thread t3 = startLocalFedWorkerThread(port3, 10);
+               Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
+               Thread t2 = startLocalFedWorkerThread(port2, FED_WORKER_WAIT_S);
+               Thread t3 = startLocalFedWorkerThread(port3, FED_WORKER_WAIT_S);
                Thread t4 = startLocalFedWorkerThread(port4);
 
                TestConfiguration config = 
availableTestConfigurations.get(TEST_NAME);
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedCorTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedCorTest.java
index 82437b1..20b0147 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedCorTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedCorTest.java
@@ -102,9 +102,9 @@ public class FederatedCorTest extends AutomatedTestBase {
                int port2 = getRandomAvailablePort();
                int port3 = getRandomAvailablePort();
                int port4 = getRandomAvailablePort();
-               Thread t1 = startLocalFedWorkerThread(port1, 10);
-               Thread t2 = startLocalFedWorkerThread(port2, 10);
-               Thread t3 = startLocalFedWorkerThread(port3, 10);
+               Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
+               Thread t2 = startLocalFedWorkerThread(port2, FED_WORKER_WAIT_S);
+               Thread t3 = startLocalFedWorkerThread(port3, FED_WORKER_WAIT_S);
                Thread t4 = startLocalFedWorkerThread(port4);
 
                rtplatform = execMode;
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedGLMTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedGLMTest.java
index eb8aee8..37a7787 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedGLMTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedGLMTest.java
@@ -95,7 +95,7 @@ public class FederatedGLMTest extends AutomatedTestBase {
                fullDMLScriptName = "";
                int port1 = getRandomAvailablePort();
                int port2 = getRandomAvailablePort();
-               Thread t1 = startLocalFedWorkerThread(port1, 10);
+               Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
                Thread t2 = startLocalFedWorkerThread(port2);
 
                TestConfiguration config = 
availableTestConfigurations.get(TEST_NAME);
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedKmeansTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedKmeansTest.java
index f296b3a..e352b5a 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedKmeansTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedKmeansTest.java
@@ -104,7 +104,7 @@ public class FederatedKmeansTest extends AutomatedTestBase {
                fullDMLScriptName = "";
                int port1 = getRandomAvailablePort();
                int port2 = getRandomAvailablePort();
-               Thread t1 = startLocalFedWorkerThread(port1, 10);
+               Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
                Thread t2 = startLocalFedWorkerThread(port2);
 
                TestConfiguration config = 
availableTestConfigurations.get(TEST_NAME);
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedL2SVMTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedL2SVMTest.java
index f17754e..95e5ba4 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedL2SVMTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedL2SVMTest.java
@@ -99,7 +99,7 @@ public class FederatedL2SVMTest extends AutomatedTestBase {
                fullDMLScriptName = "";
                int port1 = getRandomAvailablePort();
                int port2 = getRandomAvailablePort();
-               Thread t1 = startLocalFedWorkerThread(port1, 10);
+               Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
                Thread t2 = startLocalFedWorkerThread(port2);
 
                TestConfiguration config = 
availableTestConfigurations.get(TEST_NAME);
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLmPipeline.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLmPipeline.java
new file mode 100644
index 0000000..a5a3d02
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLmPipeline.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated.algorithms;
+
+import org.junit.Test;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.common.Types.ExecMode;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.matrix.data.LibMatrixMult;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.transform.encode.EncoderRecode;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+
+
+@net.jcip.annotations.NotThreadSafe
+public class FederatedLmPipeline extends AutomatedTestBase {
+
+       private final static String TEST_DIR = "functions/federated/";
+       private final static String TEST_NAME = "FederatedLmPipeline";
+       private final static String TEST_CLASS_DIR = TEST_DIR + 
FederatedLmPipeline.class.getSimpleName() + "/";
+
+       public int rows = 10000;
+       public int cols = 1000;
+
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               addTestConfiguration(TEST_NAME, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"Z"}));
+       }
+
+       @Test
+       public void federatedLmPipelineContinguous() {
+               federatedLmPipeline(Types.ExecMode.SINGLE_NODE, true);
+       }
+       
+       @Test
+       public void federatedLmPipelineSampled() {
+               federatedLmPipeline(Types.ExecMode.SINGLE_NODE, false);
+       }
+
+       public void federatedLmPipeline(ExecMode execMode, boolean contSplits) {
+               ExecMode oldExec = setExecMode(execMode);
+               boolean oldSort = EncoderRecode.SORT_RECODE_MAP;
+               EncoderRecode.SORT_RECODE_MAP = true;
+               
+               getAndLoadTestConfiguration(TEST_NAME);
+               String HOME = SCRIPT_DIR + TEST_DIR;
+
+               try {
+                       // generated lm data
+                       MatrixBlock X = MatrixBlock.randOperations(rows, cols, 
1.0, 0, 1, "uniform", 7);
+                       MatrixBlock w = MatrixBlock.randOperations(cols, 1, 
1.0, 0, 1, "uniform", 3);
+                       MatrixBlock y = new MatrixBlock(rows, 1, 
false).allocateBlock();
+                       LibMatrixMult.matrixMult(X, w, y);
+                       MatrixBlock c = MatrixBlock.randOperations(rows, 1, 
1.0, 1, 50, "uniform", 23);
+                       MatrixBlock rc = 
c.unaryOperations(InstructionUtils.parseUnaryOperator("round"), new 
MatrixBlock());
+                       X = rc.append(X, new MatrixBlock(), true);
+                       
+                       // We have two matrices handled by a single federated 
worker
+                       int halfRows = rows / 2;
+                       writeInputMatrixWithMTD("X1", X.slice(0, halfRows-1), 
false);
+                       writeInputMatrixWithMTD("X2", X.slice(halfRows, 
rows-1), false);
+                       writeInputMatrixWithMTD("Y", y, false);
+                       
+                       // empty script name because we don't execute any 
script, just start the worker
+                       fullDMLScriptName = "";
+                       int port1 = getRandomAvailablePort();
+                       int port2 = getRandomAvailablePort();
+                       Thread t1 = startLocalFedWorkerThread(port1, 
FED_WORKER_WAIT_S);
+                       Thread t2 = startLocalFedWorkerThread(port2);
+       
+                       TestConfiguration config = 
availableTestConfigurations.get(TEST_NAME);
+                       loadTestConfiguration(config);
+       
+                       // Run reference dml script with normal matrix
+                       fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
+                       programArgs = new String[] {"-args", input("X1"), 
input("X2"), input("Y"),
+                               String.valueOf(contSplits).toUpperCase(), 
expected("Z")};
+                       runTest(true, false, null, -1);
+       
+                       // Run actual dml script with federated matrix
+                       fullDMLScriptName = HOME + TEST_NAME + ".dml";
+                       programArgs = new String[] {"-nvargs", "in_X1=" + 
TestUtils.federatedAddress(port1, input("X1")),
+                               "in_X2=" + TestUtils.federatedAddress(port2, 
input("X2")), "rows=" + rows, "cols=" + (cols+1),
+                               "in_Y=" + input("Y"), "cont=" + 
String.valueOf(contSplits).toUpperCase(), "out=" + output("Z")};
+                       runTest(true, false, null, -1);
+       
+                       // compare via files
+                       compareResults(1e-2);
+                       TestUtils.shutdownThreads(t1, t2);
+               }
+               finally {
+                       resetExecMode(oldExec);
+                       EncoderRecode.SORT_RECODE_MAP = oldSort;
+               }
+       }
+}
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLogRegTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLogRegTest.java
index e7f1f80..fe67bc2 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLogRegTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLogRegTest.java
@@ -95,7 +95,7 @@ public class FederatedLogRegTest extends AutomatedTestBase {
                fullDMLScriptName = "";
                int port1 = getRandomAvailablePort();
                int port2 = getRandomAvailablePort();
-               Thread t1 = startLocalFedWorkerThread(port1, 10);
+               Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
                Thread t2 = startLocalFedWorkerThread(port2);
 
                TestConfiguration config = 
availableTestConfigurations.get(TEST_NAME);
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedPCATest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedPCATest.java
index 8438bb6..ae2f2fa 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedPCATest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedPCATest.java
@@ -102,9 +102,9 @@ public class FederatedPCATest extends AutomatedTestBase {
                int port2 = getRandomAvailablePort();
                int port3 = getRandomAvailablePort();
                int port4 = getRandomAvailablePort();
-               Thread t1 = startLocalFedWorkerThread(port1, 10);
-               Thread t2 = startLocalFedWorkerThread(port2, 10);
-               Thread t3 = startLocalFedWorkerThread(port3, 10);
+               Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
+               Thread t2 = startLocalFedWorkerThread(port2, FED_WORKER_WAIT_S);
+               Thread t3 = startLocalFedWorkerThread(port3, FED_WORKER_WAIT_S);
                Thread t4 = startLocalFedWorkerThread(port4);
 
                TestConfiguration config = 
availableTestConfigurations.get(TEST_NAME);
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedUnivarTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedUnivarTest.java
index 7333533..a4a8236 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedUnivarTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedUnivarTest.java
@@ -100,9 +100,9 @@ public class FederatedUnivarTest extends AutomatedTestBase {
                int port2 = getRandomAvailablePort();
                int port3 = getRandomAvailablePort();
                int port4 = getRandomAvailablePort();
-               Thread t1 = startLocalFedWorkerThread(port1, 10);
-               Thread t2 = startLocalFedWorkerThread(port2, 10);
-               Thread t3 = startLocalFedWorkerThread(port3, 10);
+               Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
+               Thread t2 = startLocalFedWorkerThread(port2, FED_WORKER_WAIT_S);
+               Thread t3 = startLocalFedWorkerThread(port3, FED_WORKER_WAIT_S);
                Thread t4 = startLocalFedWorkerThread(port4);
 
                TestConfiguration config = 
availableTestConfigurations.get(TEST_NAME);
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedVarTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedVarTest.java
index 46af1c9..348f157 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedVarTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedVarTest.java
@@ -109,9 +109,9 @@ public class FederatedVarTest extends AutomatedTestBase {
                int port2 = getRandomAvailablePort();
                int port3 = getRandomAvailablePort();
                int port4 = getRandomAvailablePort();
-               Thread t1 = startLocalFedWorkerThread(port1, 10);
-               Thread t2 = startLocalFedWorkerThread(port2, 10);
-               Thread t3 = startLocalFedWorkerThread(port3, 10);
+               Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
+               Thread t2 = startLocalFedWorkerThread(port2, FED_WORKER_WAIT_S);
+               Thread t3 = startLocalFedWorkerThread(port3, FED_WORKER_WAIT_S);
                Thread t4 = startLocalFedWorkerThread(port4);
 
                rtplatform = execMode;
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedYL2SVMTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedYL2SVMTest.java
index d0eaf87..1d58574 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedYL2SVMTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedYL2SVMTest.java
@@ -104,7 +104,7 @@ public class FederatedYL2SVMTest extends AutomatedTestBase {
                fullDMLScriptName = "";
                int port1 = getRandomAvailablePort();
                int port2 = getRandomAvailablePort();
-               Thread t1 = startLocalFedWorkerThread(port1, 10);
+               Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
                Thread t2 = startLocalFedWorkerThread(port2);
 
                TestConfiguration config = 
availableTestConfigurations.get(TEST_NAME);
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
index a5630e0..810b882 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
@@ -87,7 +87,7 @@ public class FederatedReaderTest extends AutomatedTestBase {
                fullDMLScriptName = "";
                int port1 = getRandomAvailablePort();
                int port2 = getRandomAvailablePort();
-               Thread t1 = startLocalFedWorkerThread(port1, 10);
+               Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
                Thread t2 = startLocalFedWorkerThread(port2);
                String host = "localhost";
 
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedSSLTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedSSLTest.java
index 261daf6..fc2c1dd 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedSSLTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedSSLTest.java
@@ -93,7 +93,7 @@ public class FederatedSSLTest extends AutomatedTestBase {
                fullDMLScriptName = "";
                int port1 = getRandomAvailablePort();
                int port2 = getRandomAvailablePort();
-               Thread t1 = startLocalFedWorkerThread(port1, 10);
+               Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
                Thread t2 = startLocalFedWorkerThread(port2);
                String host = "localhost";
 
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedWriterTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedWriterTest.java
index a83fad3..d8bb743 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedWriterTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedWriterTest.java
@@ -83,7 +83,7 @@ public class FederatedWriterTest extends AutomatedTestBase {
                fullDMLScriptName = "";
                int port1 = getRandomAvailablePort();
                int port2 = getRandomAvailablePort();
-               Thread t1 = startLocalFedWorkerThread(port1, 10);
+               Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
                Thread t2 = startLocalFedWorkerThread(port2);
 
                try {
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/paramserv/FederatedParamservTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/paramserv/FederatedParamservTest.java
index 3015aaa..cc0af07 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/paramserv/FederatedParamservTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/paramserv/FederatedParamservTest.java
@@ -155,7 +155,7 @@ public class FederatedParamservTest extends 
AutomatedTestBase {
 
                                // start worker
                                ports.add(getRandomAvailablePort());
-                               
threads.add(startLocalFedWorkerThread(ports.get(i), 10));
+                               
threads.add(startLocalFedWorkerThread(ports.get(i), FED_WORKER_WAIT_S));
 
                                // add worker to program args
                                programArgsList.add("X" + i + "=" + 
TestUtils.federatedAddress(ports.get(i), input("X" + i)));
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryMatrixTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryMatrixTest.java
index 958c09b..279f524 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryMatrixTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryMatrixTest.java
@@ -95,7 +95,7 @@ public class FederatedBinaryMatrixTest extends 
AutomatedTestBase {
 
                int port1 = getRandomAvailablePort();
                int port2 = getRandomAvailablePort();
-               Thread t1 = startLocalFedWorkerThread(port1, 10);
+               Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
                Thread t2 = startLocalFedWorkerThread(port2);
 
                TestConfiguration config = 
availableTestConfigurations.get(TEST_NAME);
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryVectorTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryVectorTest.java
index e8dd6f7..d3cea77 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryVectorTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryVectorTest.java
@@ -96,7 +96,7 @@ public class FederatedBinaryVectorTest extends 
AutomatedTestBase {
 
                int port1 = getRandomAvailablePort();
                int port2 = getRandomAvailablePort();
-               Thread t1 = startLocalFedWorkerThread(port1, 10);
+               Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
                Thread t2 = startLocalFedWorkerThread(port2);
 
                TestConfiguration config = 
availableTestConfigurations.get(TEST_NAME);
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCastToFrameTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCastToFrameTest.java
index fe03906..3b05391 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCastToFrameTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCastToFrameTest.java
@@ -97,12 +97,13 @@ public class FederatedCastToFrameTest extends 
AutomatedTestBase {
 
                int port1 = getRandomAvailablePort();
                int port2 = getRandomAvailablePort();
-               Thread t1 = startLocalFedWorkerThread(port1, 10);
+               Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
                Thread t2 = startLocalFedWorkerThread(port2);
 
                TestConfiguration config = 
availableTestConfigurations.get(TEST_NAME);
                loadTestConfiguration(config);
-
+               setOutputBuffering(true); //otherwise NPE
+               
                // Run reference dml script with normal matrix
                fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
                programArgs = new String[] {"-nvargs", "X1=" + input("X1"), 
"X2=" + input("X2")};
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCastToMatrixTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCastToMatrixTest.java
index fa51d89..4fb95c6 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCastToMatrixTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCastToMatrixTest.java
@@ -126,12 +126,13 @@ public class FederatedCastToMatrixTest extends 
AutomatedTestBase {
 
                        int port1 = getRandomAvailablePort();
                        int port2 = getRandomAvailablePort();
-                       Thread t1 = startLocalFedWorkerThread(port1, 10);
+                       Thread t1 = startLocalFedWorkerThread(port1, 
FED_WORKER_WAIT_S);
                        Thread t2 = startLocalFedWorkerThread(port2);
 
                        TestConfiguration config = 
availableTestConfigurations.get(TEST_NAME);
                        loadTestConfiguration(config);
-
+                       setOutputBuffering(true); //otherwise NPE
+                       
                        // Run reference dml script with normal matrix
                        fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
                        programArgs = new String[] {"-nvargs", "X1=" + 
input("X1"), "X2=" + input("X2")};
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCentralMomentTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCentralMomentTest.java
index 828718e..98b72a9 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCentralMomentTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCentralMomentTest.java
@@ -39,109 +39,109 @@ import org.junit.runners.Parameterized;
 @net.jcip.annotations.NotThreadSafe
 public class FederatedCentralMomentTest extends AutomatedTestBase {
 
-    private final static String TEST_DIR = "functions/federated/";
-    private final static String TEST_NAME = "FederatedCentralMomentTest";
-    private final static String TEST_CLASS_DIR = TEST_DIR + 
FederatedCentralMomentTest.class.getSimpleName() + "/";
-
-    private final static int blocksize = 1024;
-    @Parameterized.Parameter()
-    public int rows;
-
-    @Parameterized.Parameter(1)
-    public int k;
-
-    @Parameterized.Parameters
-    public static Collection<Object[]> data() {
-        return Arrays.asList(new Object[][] {
-                {1000, 2},
-                {1000, 3}, 
-                {1000, 4}
-        });
-    }
-
-    @Override
-    public void setUp() {
-        TestUtils.clearAssertionInformation();
-        addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, 
TEST_NAME, new String[] {"S.scalar"}));
-    }
-
-    @Test
-    public void federatedCentralMomentCP() { 
federatedCentralMoment(Types.ExecMode.SINGLE_NODE); }
-
-    @Test
-    @Ignore
-    public void federatedCentralMomentSP() { 
federatedCentralMoment(Types.ExecMode.SPARK); }
-
-    public void federatedCentralMoment(Types.ExecMode execMode) {
-        boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
-        Types.ExecMode platformOld = rtplatform;
-
-        getAndLoadTestConfiguration(TEST_NAME);
-        String HOME = SCRIPT_DIR + TEST_DIR;
-
-        int r = rows / 4;
-
-        double[][] X1 = getRandomMatrix(r, 1, 1, 5, 1, 3);
-        double[][] X2 = getRandomMatrix(r, 1, 1, 5, 1, 7);
-        double[][] X3 = getRandomMatrix(r, 1, 1, 5, 1, 8);
-        double[][] X4 = getRandomMatrix(r, 1, 1, 5, 1, 9);
-
-        MatrixCharacteristics mc = new MatrixCharacteristics(r, 1, blocksize, 
r);
-        writeInputMatrixWithMTD("X1", X1, false, mc);
-        writeInputMatrixWithMTD("X2", X2, false, mc);
-        writeInputMatrixWithMTD("X3", X3, false, mc);
-        writeInputMatrixWithMTD("X4", X4, false, mc);
-
-        // empty script name because we don't execute any script, just start 
the worker
-        fullDMLScriptName = "";
-        int port1 = getRandomAvailablePort();
-        int port2 = getRandomAvailablePort();
-        int port3 = getRandomAvailablePort();
-        int port4 = getRandomAvailablePort();
-               Thread t1 = startLocalFedWorkerThread(port1, 10);
-               Thread t2 = startLocalFedWorkerThread(port2, 10);
-               Thread t3 = startLocalFedWorkerThread(port3, 10);
+       private final static String TEST_DIR = "functions/federated/";
+       private final static String TEST_NAME = "FederatedCentralMomentTest";
+       private final static String TEST_CLASS_DIR = TEST_DIR + 
FederatedCentralMomentTest.class.getSimpleName() + "/";
+
+       private final static int blocksize = 1024;
+       @Parameterized.Parameter()
+       public int rows;
+
+       @Parameterized.Parameter(1)
+       public int k;
+
+       @Parameterized.Parameters
+       public static Collection<Object[]> data() {
+               return Arrays.asList(new Object[][] {
+                               {1000, 2},
+                               {1000, 3}, 
+                               {1000, 4}
+               });
+       }
+
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               addTestConfiguration(TEST_NAME, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"S.scalar"}));
+       }
+
+       @Test
+       public void federatedCentralMomentCP() { 
federatedCentralMoment(Types.ExecMode.SINGLE_NODE); }
+
+       @Test
+       @Ignore
+       public void federatedCentralMomentSP() { 
federatedCentralMoment(Types.ExecMode.SPARK); }
+
+       public void federatedCentralMoment(Types.ExecMode execMode) {
+               boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+               Types.ExecMode platformOld = rtplatform;
+
+               getAndLoadTestConfiguration(TEST_NAME);
+               String HOME = SCRIPT_DIR + TEST_DIR;
+
+               int r = rows / 4;
+
+               double[][] X1 = getRandomMatrix(r, 1, 1, 5, 1, 3);
+               double[][] X2 = getRandomMatrix(r, 1, 1, 5, 1, 7);
+               double[][] X3 = getRandomMatrix(r, 1, 1, 5, 1, 8);
+               double[][] X4 = getRandomMatrix(r, 1, 1, 5, 1, 9);
+
+               MatrixCharacteristics mc = new MatrixCharacteristics(r, 1, 
blocksize, r);
+               writeInputMatrixWithMTD("X1", X1, false, mc);
+               writeInputMatrixWithMTD("X2", X2, false, mc);
+               writeInputMatrixWithMTD("X3", X3, false, mc);
+               writeInputMatrixWithMTD("X4", X4, false, mc);
+
+               // empty script name because we don't execute any script, just 
start the worker
+               fullDMLScriptName = "";
+               int port1 = getRandomAvailablePort();
+               int port2 = getRandomAvailablePort();
+               int port3 = getRandomAvailablePort();
+               int port4 = getRandomAvailablePort();
+               Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
+               Thread t2 = startLocalFedWorkerThread(port2, FED_WORKER_WAIT_S);
+               Thread t3 = startLocalFedWorkerThread(port3, FED_WORKER_WAIT_S);
                Thread t4 = startLocalFedWorkerThread(port4);
 
-        // reference file should not be written to hdfs, so we set platform 
here
-        rtplatform = execMode;
-        if(rtplatform == Types.ExecMode.SPARK) {
-            DMLScript.USE_LOCAL_SPARK_CONFIG = true;
-        }
-        // Run reference dml script with normal matrix for Row/Col
-        fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
-        programArgs = new String[] {"-stats", "100", "-args", 
-            input("X1"), input("X2"), input("X3"), input("X4"), expected("S"), 
String.valueOf(k)};
-        runTest(null);
-
-        TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
-        loadTestConfiguration(config);
-
-        fullDMLScriptName = HOME + TEST_NAME + ".dml";
-        programArgs = new String[] {"-stats", "100", "-nvargs",
-            "in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
-            "in_X2=" + TestUtils.federatedAddress(port2, input("X2")),
-            "in_X3=" + TestUtils.federatedAddress(port3, input("X3")),
-            "in_X4=" + TestUtils.federatedAddress(port4, input("X4")),
-            "rows=" + rows,
-            "cols=" + 1,
-            "out_S=" + output("S"),
-            "k=" + k};
-        runTest(null);
-
-        // compare all sums via files
-        compareResults(0.01);
-
-        Assert.assertTrue(heavyHittersContainsString("fed_cm"));
-
-        // check that federated input files are still existing
-        Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
-        Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X2")));
-        Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X3")));
-        Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X4")));
-
-        TestUtils.shutdownThreads(t1, t2, t3, t4);
-        rtplatform = platformOld;
-        DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
-    }
+               // reference file should not be written to hdfs, so we set 
platform here
+               rtplatform = execMode;
+               if(rtplatform == Types.ExecMode.SPARK) {
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+               }
+               // Run reference dml script with normal matrix for Row/Col
+               fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
+               programArgs = new String[] {"-stats", "100", "-args", 
+                       input("X1"), input("X2"), input("X3"), input("X4"), 
expected("S"), String.valueOf(k)};
+               runTest(null);
+
+               TestConfiguration config = 
availableTestConfigurations.get(TEST_NAME);
+               loadTestConfiguration(config);
+
+               fullDMLScriptName = HOME + TEST_NAME + ".dml";
+               programArgs = new String[] {"-stats", "100", "-nvargs",
+                       "in_X1=" + TestUtils.federatedAddress(port1, 
input("X1")),
+                       "in_X2=" + TestUtils.federatedAddress(port2, 
input("X2")),
+                       "in_X3=" + TestUtils.federatedAddress(port3, 
input("X3")),
+                       "in_X4=" + TestUtils.federatedAddress(port4, 
input("X4")),
+                       "rows=" + rows,
+                       "cols=" + 1,
+                       "out_S=" + output("S"),
+                       "k=" + k};
+               runTest(null);
+
+               // compare all sums via files
+               compareResults(0.01);
+
+               Assert.assertTrue(heavyHittersContainsString("fed_cm"));
+
+               // check that federated input files are still existing
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X2")));
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X3")));
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X4")));
+
+               TestUtils.shutdownThreads(t1, t2, t3, t4);
+               rtplatform = platformOld;
+               DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+       }
 }
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedColAggregateTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedColAggregateTest.java
index a8480e9..1bdcb5b 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedColAggregateTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedColAggregateTest.java
@@ -157,9 +157,9 @@ public class FederatedColAggregateTest extends 
AutomatedTestBase {
                int port2 = getRandomAvailablePort();
                int port3 = getRandomAvailablePort();
                int port4 = getRandomAvailablePort();
-               Thread t1 = startLocalFedWorkerThread(port1, 10);
-               Thread t2 = startLocalFedWorkerThread(port2, 10);
-               Thread t3 = startLocalFedWorkerThread(port3, 10);
+               Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
+               Thread t2 = startLocalFedWorkerThread(port2, FED_WORKER_WAIT_S);
+               Thread t3 = startLocalFedWorkerThread(port3, FED_WORKER_WAIT_S);
                Thread t4 = startLocalFedWorkerThread(port4);
 
                rtplatform = execMode;
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedFullAggregateTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedFullAggregateTest.java
index d388913..9213620 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedFullAggregateTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedFullAggregateTest.java
@@ -198,9 +198,9 @@ public class FederatedFullAggregateTest extends 
AutomatedTestBase {
                int port2 = getRandomAvailablePort();
                int port3 = getRandomAvailablePort();
                int port4 = getRandomAvailablePort();
-               Thread t1 = startLocalFedWorkerThread(port1, 10);
-               Thread t2 = startLocalFedWorkerThread(port2, 10);
-               Thread t3 = startLocalFedWorkerThread(port3, 10);
+               Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
+               Thread t2 = startLocalFedWorkerThread(port2, FED_WORKER_WAIT_S);
+               Thread t3 = startLocalFedWorkerThread(port3, FED_WORKER_WAIT_S);
                Thread t4 = startLocalFedWorkerThread(port4);
 
                TestConfiguration config = 
availableTestConfigurations.get(TEST_NAME);
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMultiplyTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMultiplyTest.java
index 3bc2649..8836203 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMultiplyTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMultiplyTest.java
@@ -103,7 +103,7 @@ public class FederatedMultiplyTest extends 
AutomatedTestBase {
 
                int port1 = getRandomAvailablePort();
                int port2 = getRandomAvailablePort();
-               Thread t1 = startLocalFedWorkerThread(port1, 10);
+               Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
                Thread t2 = startLocalFedWorkerThread(port2);
 
                TestConfiguration config = 
availableTestConfigurations.get(TEST_NAME);
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRCBindTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRCBindTest.java
index 540b188..efde5b7 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRCBindTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRCBindTest.java
@@ -93,8 +93,8 @@ public class FederatedRCBindTest extends AutomatedTestBase {
                writeInputMatrixWithMTD("B", B, false, new 
MatrixCharacteristics(rows, cols, blocksize, rows * cols));
 
                int port1 = getRandomAvailablePort();
-               int port2 = getRandomAvailablePort();           
-               Thread t1 = startLocalFedWorkerThread(port1, 10);
+               int port2 = getRandomAvailablePort();
+               Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
                Thread t2 = startLocalFedWorkerThread(port2);
 
                // we need the reference file to not be written to hdfs, so we 
get the correct format
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRemoveEmptyTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRemoveEmptyTest.java
index de1e6d5..a629270 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRemoveEmptyTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRemoveEmptyTest.java
@@ -113,9 +113,9 @@ public class FederatedRemoveEmptyTest extends 
AutomatedTestBase {
                int port2 = getRandomAvailablePort();
                int port3 = getRandomAvailablePort();
                int port4 = getRandomAvailablePort();
-               Thread t1 = startLocalFedWorkerThread(port1, 10);
-               Thread t2 = startLocalFedWorkerThread(port2, 10);
-               Thread t3 = startLocalFedWorkerThread(port3, 10);
+               Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
+               Thread t2 = startLocalFedWorkerThread(port2, FED_WORKER_WAIT_S);
+               Thread t3 = startLocalFedWorkerThread(port3, FED_WORKER_WAIT_S);
                Thread t4 = startLocalFedWorkerThread(port4);
 
                rtplatform = execMode;
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRightIndexTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRightIndexTest.java
index b9e7f62..1401792 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRightIndexTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRightIndexTest.java
@@ -149,9 +149,9 @@ public class FederatedRightIndexTest extends 
AutomatedTestBase {
                int port2 = getRandomAvailablePort();
                int port3 = getRandomAvailablePort();
                int port4 = getRandomAvailablePort();
-               Thread t1 = startLocalFedWorkerThread(port1, 10);
-               Thread t2 = startLocalFedWorkerThread(port2, 10);
-               Thread t3 = startLocalFedWorkerThread(port3, 10);
+               Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
+               Thread t2 = startLocalFedWorkerThread(port2, FED_WORKER_WAIT_S);
+               Thread t3 = startLocalFedWorkerThread(port3, FED_WORKER_WAIT_S);
                Thread t4 = startLocalFedWorkerThread(port4);
 
                rtplatform = execMode;
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRowAggregateTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRowAggregateTest.java
index 49e692e..e0a3632 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRowAggregateTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRowAggregateTest.java
@@ -156,9 +156,9 @@ public class FederatedRowAggregateTest extends 
AutomatedTestBase {
                int port2 = getRandomAvailablePort();
                int port3 = getRandomAvailablePort();
                int port4 = getRandomAvailablePort();
-               Thread t1 = startLocalFedWorkerThread(port1, 10);
-               Thread t2 = startLocalFedWorkerThread(port2, 10);
-               Thread t3 = startLocalFedWorkerThread(port3, 10);
+               Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
+               Thread t2 = startLocalFedWorkerThread(port2, FED_WORKER_WAIT_S);
+               Thread t3 = startLocalFedWorkerThread(port3, FED_WORKER_WAIT_S);
                Thread t4 = startLocalFedWorkerThread(port4);
 
                rtplatform = execMode;
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedSplitTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedSplitTest.java
index 9d37aff..3e640c0 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedSplitTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedSplitTest.java
@@ -39,100 +39,99 @@ import org.junit.runners.Parameterized;
 @net.jcip.annotations.NotThreadSafe
 public class FederatedSplitTest extends AutomatedTestBase {
 
-    private static final Log LOG = 
LogFactory.getLog(FederatedSplitTest.class.getName());
-    private final static String TEST_DIR = "functions/federated/";
-    private final static String TEST_NAME = "FederatedSplitTest";
-    private final static String TEST_CLASS_DIR = TEST_DIR + 
FederatedSplitTest.class.getSimpleName() + "/";
-
-    private final static int blocksize = 1024;
-    @Parameterized.Parameter()
-    public int rows;
-    @Parameterized.Parameter(1)
-    public int cols;
-    @Parameterized.Parameter(2)
-    public String cont;
-
-    @Parameterized.Parameters
-    public static Collection<Object[]> data() {
-        return Arrays.asList(new Object[][] {{152, 12, "TRUE"}, {132, 11, 
"FALSE"}});
-    }
-
-    @Override
-    public void setUp() {
-        TestUtils.clearAssertionInformation();
-        addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, 
TEST_NAME, new String[] {"Z"}));
-    }
-
-    @Test
-    public void federatedSplitCP() {
-        federatedSplit(Types.ExecMode.SINGLE_NODE);
-    }
-
-    public void federatedSplit(Types.ExecMode execMode) {
-        boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
-        Types.ExecMode platformOld = rtplatform;
-        rtplatform = execMode;
-        if(rtplatform == Types.ExecMode.SPARK) {
-            DMLScript.USE_LOCAL_SPARK_CONFIG = true;
-        }
-
-        getAndLoadTestConfiguration(TEST_NAME);
-        String HOME = SCRIPT_DIR + TEST_DIR;
-
-        // write input matrices
-        int halfRows = rows / 2;
-        // We have two matrices handled by a single federated worker
-        double[][] X1 = getRandomMatrix(halfRows, cols, 0, 1, 1, 42);
-        double[][] X2 = getRandomMatrix(halfRows, cols, 0, 1, 1, 1340);
-        // And another two matrices handled by a single federated worker
-        double[][] Y1 = getRandomMatrix(halfRows, cols, 0, 1, 1, 44);
-        double[][] Y2 = getRandomMatrix(halfRows, cols, 0, 1, 1, 21);
-
-        writeInputMatrixWithMTD("X1", X1, false, new 
MatrixCharacteristics(halfRows, cols, blocksize, halfRows * cols));
-        writeInputMatrixWithMTD("X2", X2, false, new 
MatrixCharacteristics(halfRows, cols, blocksize, halfRows * cols));
-        writeInputMatrixWithMTD("Y1", Y1, false, new 
MatrixCharacteristics(halfRows, cols, blocksize, halfRows * cols));
-        writeInputMatrixWithMTD("Y2", Y2, false, new 
MatrixCharacteristics(halfRows, cols, blocksize, halfRows * cols));
-
-        TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
-        loadTestConfiguration(config);
-
-        int port1 = getRandomAvailablePort();
-        int port2 = getRandomAvailablePort();
-               Thread t1 = startLocalFedWorkerThread(port1, 10);
+       private static final Log LOG = 
LogFactory.getLog(FederatedSplitTest.class.getName());
+       private final static String TEST_DIR = "functions/federated/";
+       private final static String TEST_NAME = "FederatedSplitTest";
+       private final static String TEST_CLASS_DIR = TEST_DIR + 
FederatedSplitTest.class.getSimpleName() + "/";
+
+       private final static int blocksize = 1024;
+       @Parameterized.Parameter()
+       public int rows;
+       @Parameterized.Parameter(1)
+       public int cols;
+       @Parameterized.Parameter(2)
+       public String cont;
+
+       @Parameterized.Parameters
+       public static Collection<Object[]> data() {
+               return Arrays.asList(new Object[][] {{152, 12, "TRUE"}, {132, 
11, "FALSE"}});
+       }
+
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               addTestConfiguration(TEST_NAME, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"Z"}));
+       }
+
+       @Test
+       public void federatedSplitCP() {
+               federatedSplit(Types.ExecMode.SINGLE_NODE);
+       }
+
+       public void federatedSplit(Types.ExecMode execMode) {
+               boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+               Types.ExecMode platformOld = rtplatform;
+               rtplatform = execMode;
+               if(rtplatform == Types.ExecMode.SPARK) {
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+               }
+
+               getAndLoadTestConfiguration(TEST_NAME);
+               String HOME = SCRIPT_DIR + TEST_DIR;
+
+               // write input matrices
+               int halfRows = rows / 2;
+               // We have two matrices handled by a single federated worker
+               double[][] X1 = getRandomMatrix(halfRows, cols, 0, 1, 1, 42);
+               double[][] X2 = getRandomMatrix(halfRows, cols, 0, 1, 1, 1340);
+               // And another two matrices handled by a single federated worker
+               double[][] Y1 = getRandomMatrix(halfRows, cols, 0, 1, 1, 44);
+               double[][] Y2 = getRandomMatrix(halfRows, cols, 0, 1, 1, 21);
+
+               writeInputMatrixWithMTD("X1", X1, false, new 
MatrixCharacteristics(halfRows, cols, blocksize, halfRows * cols));
+               writeInputMatrixWithMTD("X2", X2, false, new 
MatrixCharacteristics(halfRows, cols, blocksize, halfRows * cols));
+               writeInputMatrixWithMTD("Y1", Y1, false, new 
MatrixCharacteristics(halfRows, cols, blocksize, halfRows * cols));
+               writeInputMatrixWithMTD("Y2", Y2, false, new 
MatrixCharacteristics(halfRows, cols, blocksize, halfRows * cols));
+
+               TestConfiguration config = 
availableTestConfigurations.get(TEST_NAME);
+               loadTestConfiguration(config);
+               setOutputBuffering(true); //otherwise NPE
+               
+               int port1 = getRandomAvailablePort();
+               int port2 = getRandomAvailablePort();
+               Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
                Thread t2 = startLocalFedWorkerThread(port2);
 
-        // Run reference dml script with normal matrix
-        fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
-        programArgs = new String[] {"-nvargs", "X1=" + input("X1"), "X2=" + 
input("X2"), "Y1=" + input("Y1"),
-            "Y2=" + input("Y2"), "Z=" + expected("Z"), "Cont=" + cont};
-        String out = runTest(null).toString();
-
-        // Run actual dml script with federated matrix
-        fullDMLScriptName = HOME + TEST_NAME + ".dml";
-        programArgs = new String[] {"-stats", "100", "-nvargs", "X1=" + 
TestUtils.federatedAddress(port1, input("X1")),
-            "X2=" + TestUtils.federatedAddress(port2, input("X2")),
-            "Y1=" + TestUtils.federatedAddress(port1, input("Y1")),
-            "Y2=" + TestUtils.federatedAddress(port2, input("Y2")), "r=" + 
rows, "c=" + cols, "Z=" + output("Z"),
-            "Cont=" + cont};
-        String fedOut = runTest(null).toString();
-
-        LOG.debug(out);
-        LOG.debug(fedOut);
-        // compare via files
-        compareResults(1e-9);
-
-        if(cont.equals("TRUE"))
-            Assert.assertTrue(heavyHittersContainsString("fed_rightIndex"));
-        else{
-
-            Assert.assertTrue(heavyHittersContainsString("fed_ba+*"));
-            // TODO add federated diag operator.
-            // Assert.assertTrue(heavyHittersContainsString("fed_rdiag"));
-
-        }
-        
-        TestUtils.shutdownThreads(t1, t2);
-        rtplatform = platformOld;
-        DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
-    }
+               // Run reference dml script with normal matrix
+               fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
+               programArgs = new String[] {"-nvargs", "X1=" + input("X1"), 
"X2=" + input("X2"), "Y1=" + input("Y1"),
+                       "Y2=" + input("Y2"), "Z=" + expected("Z"), "Cont=" + 
cont};
+               String out = runTest(null).toString();
+
+               // Run actual dml script with federated matrix
+               fullDMLScriptName = HOME + TEST_NAME + ".dml";
+               programArgs = new String[] {"-stats", "100", "-nvargs", "X1=" + 
TestUtils.federatedAddress(port1, input("X1")),
+                       "X2=" + TestUtils.federatedAddress(port2, input("X2")),
+                       "Y1=" + TestUtils.federatedAddress(port1, input("Y1")),
+                       "Y2=" + TestUtils.federatedAddress(port2, input("Y2")), 
"r=" + rows, "c=" + cols, "Z=" + output("Z"),
+                       "Cont=" + cont};
+               String fedOut = runTest(null).toString();
+
+               LOG.debug(out);
+               LOG.debug(fedOut);
+               // compare via files
+               compareResults(1e-9);
+
+               if(cont.equals("TRUE"))
+                       
Assert.assertTrue(heavyHittersContainsString("fed_rightIndex"));
+               else {
+                       
Assert.assertTrue(heavyHittersContainsString("fed_ba+*"));
+                       // TODO add federated diag operator.
+                       // 
Assert.assertTrue(heavyHittersContainsString("fed_rdiag"));
+               }
+               
+               TestUtils.shutdownThreads(t1, t2);
+               rtplatform = platformOld;
+               DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+       }
 }
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedStatisticsTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedStatisticsTest.java
index 865582d..09ca19e 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedStatisticsTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedStatisticsTest.java
@@ -99,7 +99,7 @@ public class FederatedStatisticsTest extends AutomatedTestBase {
                fullDMLScriptName = "";
                int port1 = getRandomAvailablePort();
                int port2 = getRandomAvailablePort();
-               Thread t1 = startLocalFedWorkerThread(port1, 10);
+               Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
                Thread t2 = startLocalFedWorkerThread(port2);
 
                TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/transform/TransformFederatedEncodeApplyTest.java b/src/test/java/org/apache/sysds/test/functions/federated/transform/TransformFederatedEncodeApplyTest.java
index b7036d0..6c9b034 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/transform/TransformFederatedEncodeApplyTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/transform/TransformFederatedEncodeApplyTest.java
@@ -199,9 +199,9 @@ public class TransformFederatedEncodeApplyTest extends AutomatedTestBase {
                        int port2 = getRandomAvailablePort();
                        int port3 = getRandomAvailablePort();
                        int port4 = getRandomAvailablePort();
-                       t1 = startLocalFedWorkerThread(port1, 10);
-                       t2 = startLocalFedWorkerThread(port2, 10);
-                       t3 = startLocalFedWorkerThread(port3, 10);
+                       t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
+                       t2 = startLocalFedWorkerThread(port2, FED_WORKER_WAIT_S);
+                       t3 = startLocalFedWorkerThread(port3, FED_WORKER_WAIT_S);
                        t4 = startLocalFedWorkerThread(port4);
 
                        FileFormatPropertiesCSV ffpCSV = new FileFormatPropertiesCSV(true, DataExpression.DEFAULT_DELIM_DELIMITER,
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/transform/TransformFederatedEncodeDecodeTest.java b/src/test/java/org/apache/sysds/test/functions/federated/transform/TransformFederatedEncodeDecodeTest.java
index 458dbc1..71be21b 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/transform/TransformFederatedEncodeDecodeTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/transform/TransformFederatedEncodeDecodeTest.java
@@ -134,9 +134,9 @@ public class TransformFederatedEncodeDecodeTest extends AutomatedTestBase {
                        int port2 = getRandomAvailablePort();
                        int port3 = getRandomAvailablePort();
                        int port4 = getRandomAvailablePort();
-                       t1 = startLocalFedWorkerThread(port1, 10);
-                       t2 = startLocalFedWorkerThread(port2, 10);
-                       t3 = startLocalFedWorkerThread(port3, 10);
+                       t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
+                       t2 = startLocalFedWorkerThread(port2, FED_WORKER_WAIT_S);
+                       t3 = startLocalFedWorkerThread(port3, FED_WORKER_WAIT_S);
                        t4 = startLocalFedWorkerThread(port4);
 
                        // schema
diff --git a/src/test/scripts/functions/federated/FederatedLmPipeline.dml b/src/test/scripts/functions/federated/FederatedLmPipeline.dml
new file mode 100644
index 0000000..323333d
--- /dev/null
+++ b/src/test/scripts/functions/federated/FederatedLmPipeline.dml
@@ -0,0 +1,65 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+Fin = federated(addresses=list($in_X1, $in_X2),
+    ranges=list(list(0, 0), list($rows / 2, $cols), list($rows / 2, 0), list($rows, $cols)))
+y = read($in_Y)
+
+# one hot encoding categorical, other passthrough
+Fall = as.frame(Fin)
+jspec = "{ ids:true, dummycode:[1] }"
+[X,M] = transformencode(target=Fall, spec=jspec)
+print("ncol(X) = "+ncol(X))
+
+# clipping out of value ranges
+colSD = colSds(X)
+colMean = (colMeans(X))
+upperBound = colMean + 1.5 * colSD
+lowerBound = colMean - 1.5 * colSD
+outFilter = (X < lowerBound) | (X > upperBound)
+X = X - outFilter*X + outFilter*colMeans(X); 
+
+# normalization
+X = scale(X=X, center=TRUE, scale=TRUE);
+
+# split training and testing
+[Xtrain , Xtest, ytrain, ytest] = split(X=X, Y=y, cont=$cont, seed=7)
+
+# train regression model 
+B = lm(X=Xtrain, y=ytrain, icpt=1, reg=1e-3, tol=1e-9, verbose=TRUE)
+
+# model evaluation on test split
+yhat = lmpredict(X=Xtest, w=B, icpt=1);
+y_residual = ytest - yhat;
+
+avg_res = sum(y_residual) / nrow(ytest);
+ss_res = sum(y_residual^2);
+ss_avg_res = ss_res - nrow(ytest) * avg_res^2;
+R2 = 1 - ss_res / (sum(y^2) - nrow(ytest) * (sum(y)/nrow(ytest))^2);
+print("\nAccuracy:" +
+      "\n--sum(ytest) = " + sum(ytest) + 
+      "\n--sum(yhat) = " + sum(yhat) +
+      "\n--AVG_RES_Y: " + avg_res + 
+      "\n--SS_AVG_RES_Y: " + ss_avg_res + 
+      "\n--R2: " + R2 );  
+ 
+# write trained model and meta data
+write(B, $out)
diff --git a/src/test/scripts/functions/federated/FederatedLmPipelineReference.dml b/src/test/scripts/functions/federated/FederatedLmPipelineReference.dml
new file mode 100644
index 0000000..72ca292
--- /dev/null
+++ b/src/test/scripts/functions/federated/FederatedLmPipelineReference.dml
@@ -0,0 +1,64 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+Fin = rbind(read($1), read($2))
+y = read($3)
+
+# one hot encoding categorical, other passthrough
+Fall = as.frame(Fin)
+jspec = "{ ids:true, dummycode:[1] }"
+[X,M] = transformencode(target=Fall, spec=jspec)
+print("ncol(X) = "+ncol(X))
+
+# clipping out of value ranges
+colSD = colSds(X)
+colMean = (colMeans(X))
+upperBound = colMean + 1.5 * colSD
+lowerBound = colMean - 1.5 * colSD
+outFilter = (X < lowerBound) | (X > upperBound)
+X = X - outFilter*X + outFilter*colMeans(X); 
+
+# normalization
+X = scale(X=X, center=TRUE, scale=TRUE);
+
+# split training and testing
+[Xtrain , Xtest, ytrain, ytest] = split(X=X, Y=y, cont=$4, seed=7)
+
+# train regression model 
+B = lm(X=Xtrain, y=ytrain, icpt=1, reg=1e-3, tol=1e-9, verbose=TRUE)
+
+# model evaluation on test split
+yhat = lmpredict(X=Xtest, w=B, icpt=1);
+y_residual = ytest - yhat;
+
+avg_res = sum(y_residual) / nrow(ytest);
+ss_res = sum(y_residual^2);
+ss_avg_res = ss_res - nrow(ytest) * avg_res^2;
+R2 = 1 - ss_res / (sum(y^2) - nrow(ytest) * (sum(y)/nrow(ytest))^2);
+print("\nAccuracy:" +
+      "\n--sum(ytest) = " + sum(ytest) + 
+      "\n--sum(yhat) = " + sum(yhat) +
+      "\n--AVG_RES_Y: " + avg_res + 
+      "\n--SS_AVG_RES_Y: " + ss_avg_res + 
+      "\n--R2: " + R2 );  
+ 
+# write trained model and meta data
+write(B, $5)

Reply via email to