This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 7d1f081e80 [SYSTEMDS-3695] Fix missing frame nary-append spark instruction
7d1f081e80 is described below

commit 7d1f081e8089f7700d049976b48d61fa530e1deb
Author: e-strauss <[email protected]>
AuthorDate: Mon Jun 3 09:33:37 2024 +0200

    [SYSTEMDS-3695] Fix missing frame nary-append spark instruction
    
    Closes #2026.
---
 .../spark/BuiltinNarySPInstruction.java            | 135 ++++++++++++++++++---
 .../spark/FrameAppendMSPInstruction.java           |  21 +++-
 .../spark/FrameAppendRSPInstruction.java           |  31 ++---
 .../functions/builtin/part2/BuiltinWerTest.java    |   8 +-
 .../test/functions/frame/FrameAppendDistTest.java  |  64 +++++++---
 src/test/scripts/functions/frame/FrameNAryAppend.R |  33 +++++
 .../scripts/functions/frame/FrameNAryAppend.dml    |  30 +++++
 .../functions/frame/FrameNAryAppendMisalign.R      |  30 +++++
 .../functions/frame/FrameNAryAppendMisalign.dml    |  27 +++++
 9 files changed, 319 insertions(+), 60 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/BuiltinNarySPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/BuiltinNarySPInstruction.java
index 313af16dbc..80d25f290d 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/BuiltinNarySPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/BuiltinNarySPInstruction.java
@@ -20,12 +20,19 @@
 package org.apache.sysds.runtime.instructions.spark;
 
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.NotImplementedException;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
+import org.apache.sysds.hops.BinaryOp;
+import org.apache.sysds.hops.OptimizerUtils;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysds.runtime.frame.data.FrameBlock;
 import org.apache.sysds.runtime.functionobjects.Builtin;
 import org.apache.sysds.runtime.functionobjects.Plus;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
@@ -47,8 +54,16 @@ import org.apache.sysds.runtime.meta.MatrixCharacteristics;
 import org.apache.sysds.runtime.util.UtilFunctions;
 import scala.Tuple2;
 
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
+import static org.apache.sysds.hops.BinaryOp.AppendMethod.MR_MAPPEND;
+import static org.apache.sysds.hops.BinaryOp.AppendMethod.MR_RAPPEND;
+import static org.apache.sysds.hops.OptimizerUtils.DEFAULT_FRAME_BLOCKSIZE;
+import static 
org.apache.sysds.runtime.instructions.spark.FrameAppendMSPInstruction.appendFrameMSP;
+import static 
org.apache.sysds.runtime.instructions.spark.FrameAppendRSPInstruction.appendFrameRSP;
+
 public class BuiltinNarySPInstruction extends SPInstruction implements 
LineageTraceable
 {
        private CPOperand[] inputs;
@@ -75,32 +90,82 @@ public class BuiltinNarySPInstruction extends SPInstruction 
implements LineageTr
        public void processInstruction(ExecutionContext ec) {
                SparkExecutionContext sec = (SparkExecutionContext)ec;
                JavaPairRDD<MatrixIndexes,MatrixBlock> out = null;
-               DataCharacteristics mcOut = null;
+               DataCharacteristics dcout = null;
+               boolean inputIsMatrix = inputs[0].isMatrix();
+
                
                if( getOpcode().equals("cbind") || getOpcode().equals("rbind") 
) {
                        //compute output characteristics
                        boolean cbind = getOpcode().equals("cbind");
-                       mcOut = computeAppendOutputDataCharacteristics(sec, 
inputs, cbind);
-                       
-                       //get consolidated input via union over shifted and 
padded inputs
-                       DataCharacteristics off = new MatrixCharacteristics(0, 
0, mcOut.getBlocksize(), 0);
-                       for( CPOperand input : inputs ) {
-                               DataCharacteristics mcIn = 
sec.getDataCharacteristics(input.getName());
-                               JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec
-                                       
.getBinaryMatrixBlockRDDHandleForVariable( input.getName() )
-                                       .flatMapToPair(new ShiftMatrix(off, 
mcIn, cbind))
-                                       .mapToPair(new 
PadBlocksFunction(mcOut)); //just padding
-                               out = (out != null) ? out.union(in) : in;
-                               updateAppendDataCharacteristics(mcIn, off, 
cbind);
+                       dcout = computeAppendOutputDataCharacteristics(sec, 
inputs, cbind);
+                       if(inputIsMatrix){
+                               //get consolidated input via union over shifted 
and padded inputs
+                               DataCharacteristics off = new 
MatrixCharacteristics(0, 0, dcout.getBlocksize(), 0);
+                               for( CPOperand input : inputs ) {
+                                       DataCharacteristics mcIn = 
sec.getDataCharacteristics(input.getName());
+                                       JavaPairRDD<MatrixIndexes, MatrixBlock> 
in = sec
+                                                       
.getBinaryMatrixBlockRDDHandleForVariable(input.getName())
+                                                       .flatMapToPair(new 
ShiftMatrix(off, mcIn, cbind))
+                                                       .mapToPair(new 
PadBlocksFunction(dcout)); //just padding
+                                       out = (out != null) ? out.union(in) : 
in;
+                                       updateAppendDataCharacteristics(mcIn, 
off, cbind);
+                               }
+                               //aggregate partially overlapping blocks w/ 
single shuffle
+                               int numPartOut = 
SparkUtils.getNumPreferredPartitions(dcout);
+                               out = RDDAggregateUtils.mergeByKey(out, 
numPartOut, false);
+                       }
+                       //FRAME
+                       else {
+                               JavaPairRDD<Long,FrameBlock> outFrame = 
+                                       
sec.getFrameBinaryBlockRDDHandleForVariable( inputs[0].getName() );
+                               dcout = new 
MatrixCharacteristics(sec.getDataCharacteristics(inputs[0].getName()));
+                               FrameObject fo = new 
FrameObject(sec.getFrameObject(inputs[0].getName()));
+                               boolean[] broadcasted = new 
boolean[inputs.length];
+                               broadcasted[0] = false;
+
+                               for(int i = 1; i < inputs.length; i++){
+                                       DataCharacteristics dcIn = 
sec.getDataCharacteristics(inputs[i].getName());
+                                       final int blk_size = 
dcout.getBlocksize() <= 0 ? DEFAULT_FRAME_BLOCKSIZE : dcout.getBlocksize();
+
+                                       broadcasted[i] = 
BinaryOp.FORCED_APPEND_METHOD == MR_MAPPEND
+                                               || 
BinaryOp.FORCED_APPEND_METHOD == null && cbind && dcIn.getCols() <= blk_size 
+                                                       && 
OptimizerUtils.checkSparkBroadcastMemoryBudget(
+                                                               
dcout.getCols(), dcIn.getCols(), blk_size, dcIn.getNonZeros());
+
+                                       //easy case: broadcast & map
+                                       if(broadcasted[i]){
+                                               outFrame = 
appendFrameMSP(outFrame, sec.getBroadcastForFrameVariable(inputs[i].getName()));
+                                       }
+                                       //general case for frames:
+                                       else{
+                                               
if(BinaryOp.FORCED_APPEND_METHOD != null && BinaryOp.FORCED_APPEND_METHOD != 
MR_RAPPEND)
+                                                       throw new 
DMLRuntimeException("Forced append type ["
+                                                               
+BinaryOp.FORCED_APPEND_METHOD+"] is not supported for frames");
+
+                                               JavaPairRDD<Long,FrameBlock> 
in2 = 
+                                                       
sec.getFrameBinaryBlockRDDHandleForVariable(inputs[i].getName() );
+                                               outFrame = 
appendFrameRSP(outFrame, in2, dcout.getRows(), cbind);
+                                       }
+                                       updateAppendDataCharacteristics(dcIn, 
dcout, cbind);
+                                       if(cbind)
+                                               
fo.setSchema(fo.mergeSchemas(sec.getFrameObject(inputs[i].getName())));
+                               }
+
+                               //set output RDD and add lineage
+                               
sec.getDataCharacteristics(output.getName()).set(dcout);
+                               sec.setRDDHandleForVariable(output.getName(), 
outFrame);
+                               
sec.getFrameObject(output.getName()).setSchema(fo.getSchema());
+                               for( int i = 0; i < inputs.length; i++)
+                                       if(broadcasted[i])
+                                               
sec.addLineageBroadcast(output.getName(), inputs[i].getName());
+                                       else
+                                               
sec.addLineageRDD(output.getName(), inputs[i].getName());
+                               return;
                        }
-                       
-                       //aggregate partially overlapping blocks w/ single 
shuffle
-                       int numPartOut = 
SparkUtils.getNumPreferredPartitions(mcOut);
-                       out = RDDAggregateUtils.mergeByKey(out, numPartOut, 
false);
                }
                else if( ArrayUtils.contains(new String[]{"nmin","nmax","n+"}, 
getOpcode()) ) {
                        //compute output characteristics
-                       mcOut = computeMinMaxOutputDataCharacteristics(sec, 
inputs);
+                       dcout = computeMinMaxOutputDataCharacteristics(sec, 
inputs);
                        
                        //get scalars and consolidated input via join
                        List<ScalarObject> scalars = 
sec.getScalarInputs(inputs);
@@ -118,13 +183,43 @@ public class BuiltinNarySPInstruction extends 
SPInstruction implements LineageTr
                }
                
                //set output RDD and add lineage
-               sec.getDataCharacteristics(output.getName()).set(mcOut);
+               sec.getDataCharacteristics(output.getName()).set(dcout);
                sec.setRDDHandleForVariable(output.getName(), out);
                for( CPOperand input : inputs )
                        if( !input.isScalar() )
                                sec.addLineageRDD(output.getName(), 
input.getName());
        }
-       
+
+       @SuppressWarnings("unused")
+       private static class AlignBlkTask implements 
PairFlatMapFunction<Tuple2<Long, FrameBlock>, Long, FrameBlock> {
+               private static final long serialVersionUID = 
1333460067852261573L;
+               long max_rows;
+
+               public AlignBlkTask(long rows) {
+                       max_rows = rows;
+               }
+
+               @Override
+               public Iterator<Tuple2<Long, FrameBlock>> call(Tuple2<Long, 
FrameBlock> longFrameBlockTuple2) throws Exception {
+                       Long index = longFrameBlockTuple2._1;
+                       FrameBlock fb = longFrameBlockTuple2._2;
+                       ArrayList<Tuple2<Long, FrameBlock>> list = new 
ArrayList<Tuple2<Long, FrameBlock>>();
+                       //single output block
+                       if(max_rows <= DEFAULT_FRAME_BLOCKSIZE){
+                               FrameBlock fbout = new 
FrameBlock(fb.getSchema());
+                               fbout.ensureAllocatedColumns((int) max_rows);
+                               fbout = 
fbout.leftIndexingOperations(fb,index.intValue() - 1, index.intValue() + 
fb.getNumRows() - 2,0, fb.getNumColumns()-1, null );
+                               list.add(new Tuple2<>(1L, fbout));
+                       } else {
+                               throw new NotImplementedException("Other 
Alignment strategies need to be implemented");
+                               //long aligned_index = (index / 
DEFAULT_FRAME_BLOCKSIZE)*OptimizerUtils.DEFAULT_FRAME_BLOCKSIZE+1;
+                               //list.add(new Tuple2<>(index / 
DEFAULT_FRAME_BLOCKSIZE + 1, fb));
+                       }
+
+                       return list.iterator();
+               }
+       }
+
        private static DataCharacteristics 
computeAppendOutputDataCharacteristics(SparkExecutionContext sec, CPOperand[] 
inputs, boolean cbind) {
                DataCharacteristics mcIn1 = 
sec.getDataCharacteristics(inputs[0].getName());
                DataCharacteristics mcOut = new MatrixCharacteristics(0, 0, 
mcIn1.getBlocksize(), 0);
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/FrameAppendMSPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/FrameAppendMSPInstruction.java
index 9656f03d2b..01ab7fc008 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/FrameAppendMSPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/FrameAppendMSPInstruction.java
@@ -53,8 +53,7 @@ public class FrameAppendMSPInstruction extends 
AppendMSPInstruction {
                //execute map-append operations (partitioning preserving if 
keys for blocks not changing)
                JavaPairRDD<Long,FrameBlock> out = null;
                if( preservesPartitioning(_cbind) ) {
-                       out = in1.mapPartitionsToPair(
-                                       new 
MapSideAppendPartitionFunction(in2), true);
+                       out = appendFrameMSP(in1, in2);
                }
                else 
                        throw new DMLRuntimeException("Append type rbind not 
supported for frame mappend, instead use rappend");
@@ -74,13 +73,20 @@ public class FrameAppendMSPInstruction extends 
AppendMSPInstruction {
                        
sec.getFrameObject(output.getName()).setSchema(sec.getFrameObject(input1.getName()).getSchema());
        }
 
+       public static JavaPairRDD<Long, FrameBlock> 
appendFrameMSP(JavaPairRDD<Long, FrameBlock> in1, 
PartitionedBroadcast<FrameBlock> in2) {
+               JavaPairRDD<Long, FrameBlock> out;
+               out = in1.mapPartitionsToPair(
+                               new MapSideAppendPartitionFunction(in2), true);
+               return out;
+       }
+
        private static boolean preservesPartitioning( boolean cbind ) {
                //Partitions for input1 will be preserved in case of cbind, 
                // where as in case of rbind partitions will not be preserved.
                return cbind;
        }
 
-       private static class MapSideAppendPartitionFunction implements  
PairFlatMapFunction<Iterator<Tuple2<Long,FrameBlock>>, Long, FrameBlock> 
+       private static class MapSideAppendPartitionFunction implements  
PairFlatMapFunction<Iterator<Tuple2<Long,FrameBlock>>, Long, FrameBlock>
        {
                private static final long serialVersionUID = 
-3997051891171313830L;
 
@@ -118,8 +124,17 @@ public class FrameAppendMSPInstruction extends 
AppendMSPInstruction {
                        
                                int rowix = 
(ix.intValue()-1)/OptimizerUtils.DEFAULT_FRAME_BLOCKSIZE+1;
                                int colix = 1;
+
                                
                                FrameBlock in2 = _pm.getBlock(rowix, colix);
+
+                               //if misalignment -> slice out fb from RHS
+                               if(in1.getNumRows() != in2.getNumRows()){
+                                       int start = ix.intValue() - 1 - 
(rowix-1)*OptimizerUtils.DEFAULT_FRAME_BLOCKSIZE;
+                                       int end = start + in1.getNumRows() - 1;
+                                       in2 = in2.slice(start, end);
+                               }
+
                                FrameBlock out = in1.append(in2,  true); //cbind
                                return new Tuple2<>(ix, out);
                        }                       
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/FrameAppendRSPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/FrameAppendRSPInstruction.java
index b23a1dabb8..af1be2b0c4 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/FrameAppendRSPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/FrameAppendRSPInstruction.java
@@ -45,19 +45,9 @@ public class FrameAppendRSPInstruction extends 
AppendRSPInstruction {
                JavaPairRDD<Long,FrameBlock> in2 = 
sec.getFrameBinaryBlockRDDHandleForVariable( input2.getName() );
                JavaPairRDD<Long,FrameBlock> out = null;
                long leftRows = 
sec.getDataCharacteristics(input1.getName()).getRows();
-               
-               if(_cbind) {
-                       JavaPairRDD<Long,FrameBlock> in1Aligned = 
in1.mapToPair(new ReduceSideAppendAlignFunction(leftRows));
-                       in1Aligned = 
FrameRDDAggregateUtils.mergeByKey(in1Aligned);
-                       JavaPairRDD<Long,FrameBlock> in2Aligned = 
in2.mapToPair(new ReduceSideAppendAlignFunction(leftRows));
-                       in2Aligned = 
FrameRDDAggregateUtils.mergeByKey(in2Aligned);
-                       
-                       out = in1Aligned.join(in2Aligned).mapValues(new 
ReduceSideColumnsFunction(_cbind));
-               } else {        //rbind
-                       JavaPairRDD<Long,FrameBlock> right = in2.mapToPair( new 
ReduceSideAppendRowsFunction(leftRows));
-                       out = in1.union(right);
-               }
-               
+
+               out = appendFrameRSP(in1, in2, leftRows, _cbind);
+
                //put output RDD handle into symbol table
                updateBinaryAppendOutputDataCharacteristics(sec, _cbind);
                sec.setRDDHandleForVariable(output.getName(), out);
@@ -73,6 +63,19 @@ public class FrameAppendRSPInstruction extends 
AppendRSPInstruction {
                        
sec.getFrameObject(output.getName()).setSchema(sec.getFrameObject(input1.getName()).getSchema());
        }
 
+       public static JavaPairRDD<Long, FrameBlock> 
appendFrameRSP(JavaPairRDD<Long, FrameBlock> in1, JavaPairRDD<Long, FrameBlock> 
in2, long leftRows, boolean cbind) {
+               if(cbind) {
+                       JavaPairRDD<Long,FrameBlock> in1Aligned = 
in1.mapToPair(new ReduceSideAppendAlignFunction(leftRows));
+                       in1Aligned = 
FrameRDDAggregateUtils.mergeByKey(in1Aligned);
+                       JavaPairRDD<Long,FrameBlock> in2Aligned = 
in2.mapToPair(new ReduceSideAppendAlignFunction(leftRows));
+                       in2Aligned = 
FrameRDDAggregateUtils.mergeByKey(in2Aligned);
+                       return in1Aligned.join(in2Aligned).mapValues(new 
ReduceSideColumnsFunction(cbind));
+               } else {        //rbind
+                       JavaPairRDD<Long,FrameBlock> right = in2.mapToPair( new 
ReduceSideAppendRowsFunction(leftRows));
+                       return in1.union(right);
+               }
+       }
+
        private static class ReduceSideColumnsFunction implements 
Function<Tuple2<FrameBlock, FrameBlock>, FrameBlock> 
        {
                private static final long serialVersionUID = 
-97824903649667646L;
@@ -109,7 +112,7 @@ public class FrameAppendRSPInstruction extends 
AppendRSPInstruction {
                }
        }
 
-       private static class ReduceSideAppendAlignFunction implements 
PairFunction<Tuple2<Long, FrameBlock>, Long, FrameBlock> 
+       private static class ReduceSideAppendAlignFunction implements 
PairFunction<Tuple2<Long, FrameBlock>, Long, FrameBlock>
        {
                private static final long serialVersionUID = 
5850400295183766409L;
 
diff --git 
a/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinWerTest.java
 
b/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinWerTest.java
index eb322f6973..7b764e5e6f 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinWerTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinWerTest.java
@@ -46,10 +46,10 @@ public class BuiltinWerTest extends AutomatedTestBase {
                runWerTest(ExecType.CP);
        }
 
-//     @Test
-//     public void testSpark() {
-//             runWerTest(ExecType.SPARK);
-//     }
+       @Test
+       public void testSpark() {
+               runWerTest(ExecType.SPARK);
+       }
 
        private void runWerTest(ExecType instType) {
                ExecMode platformOld = setExecMode(instType);
diff --git 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameAppendDistTest.java 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameAppendDistTest.java
index 604a8791c6..c6cde96167 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameAppendDistTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameAppendDistTest.java
@@ -22,6 +22,7 @@ package org.apache.sysds.test.functions.frame;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 
 import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types.ExecMode;
@@ -40,6 +41,8 @@ import org.junit.Test;
 public class FrameAppendDistTest extends AutomatedTestBase
 {
        private final static String TEST_NAME = "FrameAppend";
+       private final static String TEST_NAME2 = "FrameNAryAppend";
+       private final static String TEST_NAME3 = "FrameNAryAppendMisalign";
        private final static String TEST_DIR = "functions/frame/";
        private final static String TEST_CLASS_DIR = TEST_DIR + 
FrameAppendDistTest.class.getSimpleName() + "/";
 
@@ -65,61 +68,83 @@ public class FrameAppendDistTest extends AutomatedTestBase
        @Override
        public void setUp() {
                TestUtils.clearAssertionInformation();
-               addTestConfiguration(TEST_NAME, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME, 
-                               new String[] {"C"}));
+               addTestConfiguration(TEST_NAME, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"C"}));
+               addTestConfiguration(TEST_NAME2, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME2,new String[] {"C"}));
+               addTestConfiguration(TEST_NAME3, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME3,new String[] {"C"}));
        }
 
        @Test
        public void testAppendInBlock1DenseSP() {
-               commonAppendTest(ExecMode.SPARK, rows1, rows1, cols1a, cols2a, 
false, AppendMethod.MR_RAPPEND, false);
+               commonAppendTest(ExecMode.SPARK, rows1, rows1, cols1a, cols2a, 
false, AppendMethod.MR_RAPPEND, false, TEST_NAME);
        }   
        
        @Test
        public void testAppendInBlock1SparseSP() {
-               commonAppendTest(ExecMode.SPARK, rows1, rows1, cols1a, cols2a, 
true, AppendMethod.MR_RAPPEND, false);
+               commonAppendTest(ExecMode.SPARK, rows1, rows1, cols1a, cols2a, 
true, AppendMethod.MR_RAPPEND, false, TEST_NAME);
        }   
        
        @Test
        public void testAppendInBlock1DenseRBindSP() {
-               commonAppendTest(ExecMode.SPARK, rows1, rows2, cols1a, cols1a, 
false, AppendMethod.MR_RAPPEND, true);
+               commonAppendTest(ExecMode.SPARK, rows1, rows2, cols1a, cols1a, 
false, AppendMethod.MR_RAPPEND, true, TEST_NAME);
        }
        
        @Test
        public void testAppendInBlock1SparseRBindSP() {
-               commonAppendTest(ExecMode.SPARK, rows1, rows1, cols1a, cols1a, 
true, AppendMethod.MR_RAPPEND, true);
+               commonAppendTest(ExecMode.SPARK, rows1, rows1, cols1a, cols1a, 
true, AppendMethod.MR_RAPPEND, true, TEST_NAME);
        }
        
        //NOTE: mappend only applied for m2_cols<=blocksize
        @Test
        public void testMapAppendInBlock2DenseSP() {
-               commonAppendTest(ExecMode.SPARK, rows1, rows1, cols1b, cols2a, 
false, AppendMethod.MR_MAPPEND, false);
+               commonAppendTest(ExecMode.SPARK, rows1, rows1, cols1b, cols2a, 
false, AppendMethod.MR_MAPPEND, false, TEST_NAME);
        }
        
        @Test
        public void testMapAppendInBlock2SparseSP() {
-               commonAppendTest(ExecMode.SPARK, rows1, rows1, cols1b, cols2a, 
true, AppendMethod.MR_MAPPEND, false);
+               commonAppendTest(ExecMode.SPARK, rows1, rows1, cols1b, cols2a, 
true, AppendMethod.MR_MAPPEND, false, TEST_NAME);
        }
        
        @Test
        public void testMapAppendOutBlock2DenseSP() {
-               commonAppendTest(ExecMode.SPARK, rows1, rows1, cols1d, cols3d, 
false, AppendMethod.MR_MAPPEND, false);
+               commonAppendTest(ExecMode.SPARK, rows1, rows1, cols1d, cols3d, 
false, AppendMethod.MR_MAPPEND, false, TEST_NAME);
        }
        
        @Test
        public void testMapAppendOutBlock2SparseSP() {
-               commonAppendTest(ExecMode.SPARK, rows1, rows1, cols1d, cols3d, 
true, AppendMethod.MR_MAPPEND, false);
+               commonAppendTest(ExecMode.SPARK, rows1, rows1, cols1d, cols3d, 
true, AppendMethod.MR_MAPPEND, false, TEST_NAME);
        }
+
+       @Test
+       public void testNAryCAppendMSP(){
+               commonAppendTest(ExecMode.SPARK ,100, 100, 5, 10, false, null, 
false, TEST_NAME2);;
+       }
+
+       @Test
+       public void testNAryCAppendRSP(){
+               commonAppendTest(ExecMode.SPARK ,30, 30, 5, 1001, false, null, 
false, TEST_NAME2);;
+       }
+
+       @Test
+       public void testNAryRAppendSP(){
+               commonAppendTest(ExecMode.SPARK ,100, 100, 5, 5, false, null, 
true, TEST_NAME2);;
+       }
+
+       @Test
+       public void testNAryAppendWithMisalignmentMSP(){
+               commonAppendTest(ExecMode.SPARK ,5, 10, 5, 5, false, null, 
false, TEST_NAME3);;
+       }
+
        
        public void commonAppendTest(ExecMode platform, int rows1, int rows2, 
int cols1, int cols2, boolean sparse,
-               AppendMethod forcedAppendMethod, boolean rbind)
+               AppendMethod forcedAppendMethod, boolean rbind, String 
test_name)
        {
-               TestConfiguration config = 
getAndLoadTestConfiguration(TEST_NAME);
+               TestConfiguration config = 
getAndLoadTestConfiguration(test_name);
                
                ExecMode prevPlfm=rtplatform;
                
                double sparsity = (sparse) ? sparsity2 : sparsity1; 
                boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
-               setOutputBuffering(true);
+               //setOutputBuffering(true);
                try
                {
                        if(forcedAppendMethod != null) {
@@ -134,12 +159,12 @@ public class FrameAppendDistTest extends AutomatedTestBase
                        
                        /* This is for running the junit test the new way, 
i.e., construct the arguments directly */
                        String RI_HOME = SCRIPT_DIR + TEST_DIR;
-                       fullDMLScriptName = RI_HOME + TEST_NAME + ".dml";
-                       programArgs = new String[]{"-explain","-args",  
input("A"), 
+                       fullDMLScriptName = RI_HOME + test_name + ".dml";
+                       programArgs = new String[]{"-explain","-args",  
input("A"),
                                Long.toString(rows1), Long.toString(cols1), 
input("B"),
                                Long.toString(rows2), Long.toString(cols2), 
output("C"),
                                (rbind? "rbind": "cbind")};
-                       fullRScriptName = RI_HOME + TEST_NAME + ".R";
+                       fullRScriptName = RI_HOME + test_name + ".R";
                        rCmd = "Rscript" + " " + fullRScriptName + " " + 
                                inputDir() + " " + expectedDir() + " " + 
(rbind? "rbind": "cbind");
        
@@ -157,14 +182,15 @@ public class FrameAppendDistTest extends AutomatedTestBase
                        runTest(true, exceptionExpected, null, 
expectedNumberOfJobs);
                        runRScript(true);
 
-                       ValueType[] lschemaAB = rbind ? lschemaA : 
UtilFunctions.copyOf(lschemaA, lschemaB);
-                       
+                       ValueType[] lschemaOut = rbind ? lschemaA : 
UtilFunctions.copyOf(lschemaA, lschemaB);
+                       if(!Objects.equals(test_name, TEST_NAME) && !rbind)
+                               lschemaOut =  UtilFunctions.copyOf(lschemaOut, 
lschemaB);
                        for(String file: config.getOutputFiles())
                        {
                                FrameBlock frameBlock = 
readDMLFrameFromHDFS(file, FileFormat.BINARY);
                                FrameBlock frameRBlock = 
readRFrameFromHDFS(file + ".csv", FileFormat.CSV, frameBlock.getNumRows(),
                                        frameBlock.getNumColumns());
-                               verifyFrameData(frameBlock, frameRBlock, 
lschemaAB);
+                               verifyFrameData(frameBlock, frameRBlock, 
lschemaOut);
                        }
                }
                catch(Exception ex) {
diff --git a/src/test/scripts/functions/frame/FrameNAryAppend.R 
b/src/test/scripts/functions/frame/FrameNAryAppend.R
new file mode 100644
index 0000000000..47a128beca
--- /dev/null
+++ b/src/test/scripts/functions/frame/FrameNAryAppend.R
@@ -0,0 +1,33 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+args <- commandArgs(TRUE)
+options(digits=22)
+library("Matrix")
+
+A=read.csv(paste(args[1], "A.csv", sep=""), header = FALSE, 
stringsAsFactors=FALSE)
+B=read.csv(paste(args[1], "B.csv", sep=""), header = FALSE, 
stringsAsFactors=FALSE)
+if(args[3] == "rbind") {
+       C=rbind(A, B, B)
+} else {
+       C=cbind(A, B, B)
+}
+write.csv(C, paste(args[2], "C.csv", sep=""), row.names = FALSE, quote = FALSE)
diff --git a/src/test/scripts/functions/frame/FrameNAryAppend.dml 
b/src/test/scripts/functions/frame/FrameNAryAppend.dml
new file mode 100644
index 0000000000..e403d93066
--- /dev/null
+++ b/src/test/scripts/functions/frame/FrameNAryAppend.dml
@@ -0,0 +1,30 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+A=read($1, data_type="frame", rows=$2, cols=$3, format="binary")
+B=read($4, data_type="frame", rows=$5, cols=$6, format="binary")
+
+if ($8 == "rbind") {
+       C=rbind(A, B, B)
+} else {
+       C=cbind(A, B, B)
+}
+write(C, $7, format="binary")
diff --git a/src/test/scripts/functions/frame/FrameNAryAppendMisalign.R b/src/test/scripts/functions/frame/FrameNAryAppendMisalign.R
new file mode 100644
index 0000000000..b4ad63d32e
--- /dev/null
+++ b/src/test/scripts/functions/frame/FrameNAryAppendMisalign.R
@@ -0,0 +1,30 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+args <- commandArgs(TRUE)
+options(digits=22)
+library("Matrix")
+
+A=read.csv(paste(args[1], "A.csv", sep=""), header = FALSE, stringsAsFactors=FALSE)
+B=read.csv(paste(args[1], "B.csv", sep=""), header = FALSE, stringsAsFactors=FALSE)
+t1=rbind(A, B, B)
+C=cbind(t1, t1, t1)
+write.csv(C, paste(args[2], "C.csv", sep=""), row.names = FALSE, quote = FALSE)
diff --git a/src/test/scripts/functions/frame/FrameNAryAppendMisalign.dml b/src/test/scripts/functions/frame/FrameNAryAppendMisalign.dml
new file mode 100644
index 0000000000..22a492127d
--- /dev/null
+++ b/src/test/scripts/functions/frame/FrameNAryAppendMisalign.dml
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+A=read($1, data_type="frame", rows=$2, cols=$3, format="binary")
+B=read($4, data_type="frame", rows=$5, cols=$6, format="binary")
+t=rbind(A, B, B)
+C=cbind(t, t, t)
+#C=cbind(C, t)
+write(C, $7, format="binary")

Reply via email to