This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 7d1f081e80 [SYSTEMDS-3695] Fix missing frame nary-append spark
instruction
7d1f081e80 is described below
commit 7d1f081e8089f7700d049976b48d61fa530e1deb
Author: e-strauss <[email protected]>
AuthorDate: Mon Jun 3 09:33:37 2024 +0200
[SYSTEMDS-3695] Fix missing frame nary-append spark instruction
Closes #2026.
---
.../spark/BuiltinNarySPInstruction.java | 135 ++++++++++++++++++---
.../spark/FrameAppendMSPInstruction.java | 21 +++-
.../spark/FrameAppendRSPInstruction.java | 31 ++---
.../functions/builtin/part2/BuiltinWerTest.java | 8 +-
.../test/functions/frame/FrameAppendDistTest.java | 64 +++++++---
src/test/scripts/functions/frame/FrameNAryAppend.R | 33 +++++
.../scripts/functions/frame/FrameNAryAppend.dml | 30 +++++
.../functions/frame/FrameNAryAppendMisalign.R | 30 +++++
.../functions/frame/FrameNAryAppendMisalign.dml | 27 +++++
9 files changed, 319 insertions(+), 60 deletions(-)
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/spark/BuiltinNarySPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/spark/BuiltinNarySPInstruction.java
index 313af16dbc..80d25f290d 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/spark/BuiltinNarySPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/spark/BuiltinNarySPInstruction.java
@@ -20,12 +20,19 @@
package org.apache.sysds.runtime.instructions.spark;
import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
+import org.apache.sysds.hops.BinaryOp;
+import org.apache.sysds.hops.OptimizerUtils;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysds.runtime.frame.data.FrameBlock;
import org.apache.sysds.runtime.functionobjects.Builtin;
import org.apache.sysds.runtime.functionobjects.Plus;
import org.apache.sysds.runtime.instructions.InstructionUtils;
@@ -47,8 +54,16 @@ import org.apache.sysds.runtime.meta.MatrixCharacteristics;
import org.apache.sysds.runtime.util.UtilFunctions;
import scala.Tuple2;
+import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
+import static org.apache.sysds.hops.BinaryOp.AppendMethod.MR_MAPPEND;
+import static org.apache.sysds.hops.BinaryOp.AppendMethod.MR_RAPPEND;
+import static org.apache.sysds.hops.OptimizerUtils.DEFAULT_FRAME_BLOCKSIZE;
+import static
org.apache.sysds.runtime.instructions.spark.FrameAppendMSPInstruction.appendFrameMSP;
+import static
org.apache.sysds.runtime.instructions.spark.FrameAppendRSPInstruction.appendFrameRSP;
+
public class BuiltinNarySPInstruction extends SPInstruction implements
LineageTraceable
{
private CPOperand[] inputs;
@@ -75,32 +90,82 @@ public class BuiltinNarySPInstruction extends SPInstruction
implements LineageTr
public void processInstruction(ExecutionContext ec) {
SparkExecutionContext sec = (SparkExecutionContext)ec;
JavaPairRDD<MatrixIndexes,MatrixBlock> out = null;
- DataCharacteristics mcOut = null;
+ DataCharacteristics dcout = null;
+ boolean inputIsMatrix = inputs[0].isMatrix();
+
if( getOpcode().equals("cbind") || getOpcode().equals("rbind")
) {
//compute output characteristics
boolean cbind = getOpcode().equals("cbind");
- mcOut = computeAppendOutputDataCharacteristics(sec,
inputs, cbind);
-
- //get consolidated input via union over shifted and
padded inputs
- DataCharacteristics off = new MatrixCharacteristics(0,
0, mcOut.getBlocksize(), 0);
- for( CPOperand input : inputs ) {
- DataCharacteristics mcIn =
sec.getDataCharacteristics(input.getName());
- JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec
-
.getBinaryMatrixBlockRDDHandleForVariable( input.getName() )
- .flatMapToPair(new ShiftMatrix(off,
mcIn, cbind))
- .mapToPair(new
PadBlocksFunction(mcOut)); //just padding
- out = (out != null) ? out.union(in) : in;
- updateAppendDataCharacteristics(mcIn, off,
cbind);
+ dcout = computeAppendOutputDataCharacteristics(sec,
inputs, cbind);
+ if(inputIsMatrix){
+ //get consolidated input via union over shifted
and padded inputs
+ DataCharacteristics off = new
MatrixCharacteristics(0, 0, dcout.getBlocksize(), 0);
+ for( CPOperand input : inputs ) {
+ DataCharacteristics mcIn =
sec.getDataCharacteristics(input.getName());
+ JavaPairRDD<MatrixIndexes, MatrixBlock>
in = sec
+
.getBinaryMatrixBlockRDDHandleForVariable(input.getName())
+ .flatMapToPair(new
ShiftMatrix(off, mcIn, cbind))
+ .mapToPair(new
PadBlocksFunction(dcout)); //just padding
+ out = (out != null) ? out.union(in) :
in;
+ updateAppendDataCharacteristics(mcIn,
off, cbind);
+ }
+ //aggregate partially overlapping blocks w/
single shuffle
+ int numPartOut =
SparkUtils.getNumPreferredPartitions(dcout);
+ out = RDDAggregateUtils.mergeByKey(out,
numPartOut, false);
+ }
+ //FRAME
+ else {
+ JavaPairRDD<Long,FrameBlock> outFrame =
+
sec.getFrameBinaryBlockRDDHandleForVariable( inputs[0].getName() );
+ dcout = new
MatrixCharacteristics(sec.getDataCharacteristics(inputs[0].getName()));
+ FrameObject fo = new
FrameObject(sec.getFrameObject(inputs[0].getName()));
+ boolean[] broadcasted = new
boolean[inputs.length];
+ broadcasted[0] = false;
+
+ for(int i = 1; i < inputs.length; i++){
+ DataCharacteristics dcIn =
sec.getDataCharacteristics(inputs[i].getName());
+ final int blk_size =
dcout.getBlocksize() <= 0 ? DEFAULT_FRAME_BLOCKSIZE : dcout.getBlocksize();
+
+ broadcasted[i] =
BinaryOp.FORCED_APPEND_METHOD == MR_MAPPEND
+ ||
BinaryOp.FORCED_APPEND_METHOD == null && cbind && dcIn.getCols() <= blk_size
+ &&
OptimizerUtils.checkSparkBroadcastMemoryBudget(
+
dcout.getCols(), dcIn.getCols(), blk_size, dcIn.getNonZeros());
+
+ //easy case: broadcast & map
+ if(broadcasted[i]){
+ outFrame =
appendFrameMSP(outFrame, sec.getBroadcastForFrameVariable(inputs[i].getName()));
+ }
+ //general case for frames:
+ else{
+
if(BinaryOp.FORCED_APPEND_METHOD != null && BinaryOp.FORCED_APPEND_METHOD !=
MR_RAPPEND)
+ throw new
DMLRuntimeException("Forced append type ["
+
+BinaryOp.FORCED_APPEND_METHOD+"] is not supported for frames");
+
+ JavaPairRDD<Long,FrameBlock>
in2 =
+
sec.getFrameBinaryBlockRDDHandleForVariable(inputs[i].getName() );
+ outFrame =
appendFrameRSP(outFrame, in2, dcout.getRows(), cbind);
+ }
+ updateAppendDataCharacteristics(dcIn,
dcout, cbind);
+ if(cbind)
+
fo.setSchema(fo.mergeSchemas(sec.getFrameObject(inputs[i].getName())));
+ }
+
+ //set output RDD and add lineage
+
sec.getDataCharacteristics(output.getName()).set(dcout);
+ sec.setRDDHandleForVariable(output.getName(),
outFrame);
+
sec.getFrameObject(output.getName()).setSchema(fo.getSchema());
+ for( int i = 0; i < inputs.length; i++)
+ if(broadcasted[i])
+
sec.addLineageBroadcast(output.getName(), inputs[i].getName());
+ else
+
sec.addLineageRDD(output.getName(), inputs[i].getName());
+ return;
}
-
- //aggregate partially overlapping blocks w/ single
shuffle
- int numPartOut =
SparkUtils.getNumPreferredPartitions(mcOut);
- out = RDDAggregateUtils.mergeByKey(out, numPartOut,
false);
}
else if( ArrayUtils.contains(new String[]{"nmin","nmax","n+"},
getOpcode()) ) {
//compute output characteristics
- mcOut = computeMinMaxOutputDataCharacteristics(sec,
inputs);
+ dcout = computeMinMaxOutputDataCharacteristics(sec,
inputs);
//get scalars and consolidated input via join
List<ScalarObject> scalars =
sec.getScalarInputs(inputs);
@@ -118,13 +183,43 @@ public class BuiltinNarySPInstruction extends
SPInstruction implements LineageTr
}
//set output RDD and add lineage
- sec.getDataCharacteristics(output.getName()).set(mcOut);
+ sec.getDataCharacteristics(output.getName()).set(dcout);
sec.setRDDHandleForVariable(output.getName(), out);
for( CPOperand input : inputs )
if( !input.isScalar() )
sec.addLineageRDD(output.getName(),
input.getName());
}
-
+
+ @SuppressWarnings("unused")
+ private static class AlignBlkTask implements
PairFlatMapFunction<Tuple2<Long, FrameBlock>, Long, FrameBlock> {
+ private static final long serialVersionUID =
1333460067852261573L;
+ long max_rows;
+
+ public AlignBlkTask(long rows) {
+ max_rows = rows;
+ }
+
+ @Override
+ public Iterator<Tuple2<Long, FrameBlock>> call(Tuple2<Long,
FrameBlock> longFrameBlockTuple2) throws Exception {
+ Long index = longFrameBlockTuple2._1;
+ FrameBlock fb = longFrameBlockTuple2._2;
+ ArrayList<Tuple2<Long, FrameBlock>> list = new
ArrayList<Tuple2<Long, FrameBlock>>();
+ //single output block
+ if(max_rows <= DEFAULT_FRAME_BLOCKSIZE){
+ FrameBlock fbout = new
FrameBlock(fb.getSchema());
+ fbout.ensureAllocatedColumns((int) max_rows);
+ fbout =
fbout.leftIndexingOperations(fb,index.intValue() - 1, index.intValue() +
fb.getNumRows() - 2,0, fb.getNumColumns()-1, null );
+ list.add(new Tuple2<>(1L, fbout));
+ } else {
+ throw new NotImplementedException("Other
Alignment strategies need to be implemented");
+ //long aligned_index = (index /
DEFAULT_FRAME_BLOCKSIZE)*OptimizerUtils.DEFAULT_FRAME_BLOCKSIZE+1;
+ //list.add(new Tuple2<>(index /
DEFAULT_FRAME_BLOCKSIZE + 1, fb));
+ }
+
+ return list.iterator();
+ }
+ }
+
private static DataCharacteristics
computeAppendOutputDataCharacteristics(SparkExecutionContext sec, CPOperand[]
inputs, boolean cbind) {
DataCharacteristics mcIn1 =
sec.getDataCharacteristics(inputs[0].getName());
DataCharacteristics mcOut = new MatrixCharacteristics(0, 0,
mcIn1.getBlocksize(), 0);
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/spark/FrameAppendMSPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/spark/FrameAppendMSPInstruction.java
index 9656f03d2b..01ab7fc008 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/spark/FrameAppendMSPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/spark/FrameAppendMSPInstruction.java
@@ -53,8 +53,7 @@ public class FrameAppendMSPInstruction extends
AppendMSPInstruction {
//execute map-append operations (partitioning-preserving if
the block keys do not change)
JavaPairRDD<Long,FrameBlock> out = null;
if( preservesPartitioning(_cbind) ) {
- out = in1.mapPartitionsToPair(
- new
MapSideAppendPartitionFunction(in2), true);
+ out = appendFrameMSP(in1, in2);
}
else
throw new DMLRuntimeException("Append type rbind not
supported for frame mappend, instead use rappend");
@@ -74,13 +73,20 @@ public class FrameAppendMSPInstruction extends
AppendMSPInstruction {
sec.getFrameObject(output.getName()).setSchema(sec.getFrameObject(input1.getName()).getSchema());
}
+ public static JavaPairRDD<Long, FrameBlock>
appendFrameMSP(JavaPairRDD<Long, FrameBlock> in1,
PartitionedBroadcast<FrameBlock> in2) {
+ JavaPairRDD<Long, FrameBlock> out;
+ out = in1.mapPartitionsToPair(
+ new MapSideAppendPartitionFunction(in2), true);
+ return out;
+ }
+
private static boolean preservesPartitioning( boolean cbind ) {
//Partitions for input1 will be preserved in case of cbind,
// whereas in case of rbind partitions will not be preserved.
return cbind;
}
- private static class MapSideAppendPartitionFunction implements
PairFlatMapFunction<Iterator<Tuple2<Long,FrameBlock>>, Long, FrameBlock>
+ private static class MapSideAppendPartitionFunction implements
PairFlatMapFunction<Iterator<Tuple2<Long,FrameBlock>>, Long, FrameBlock>
{
private static final long serialVersionUID =
-3997051891171313830L;
@@ -118,8 +124,17 @@ public class FrameAppendMSPInstruction extends
AppendMSPInstruction {
int rowix =
(ix.intValue()-1)/OptimizerUtils.DEFAULT_FRAME_BLOCKSIZE+1;
int colix = 1;
+
FrameBlock in2 = _pm.getBlock(rowix, colix);
+
+ //if misalignment -> slice out fb from RHS
+ if(in1.getNumRows() != in2.getNumRows()){
+ int start = ix.intValue() - 1 -
(rowix-1)*OptimizerUtils.DEFAULT_FRAME_BLOCKSIZE;
+ int end = start + in1.getNumRows() - 1;
+ in2 = in2.slice(start, end);
+ }
+
FrameBlock out = in1.append(in2, true); //cbind
return new Tuple2<>(ix, out);
}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/spark/FrameAppendRSPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/spark/FrameAppendRSPInstruction.java
index b23a1dabb8..af1be2b0c4 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/spark/FrameAppendRSPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/spark/FrameAppendRSPInstruction.java
@@ -45,19 +45,9 @@ public class FrameAppendRSPInstruction extends
AppendRSPInstruction {
JavaPairRDD<Long,FrameBlock> in2 =
sec.getFrameBinaryBlockRDDHandleForVariable( input2.getName() );
JavaPairRDD<Long,FrameBlock> out = null;
long leftRows =
sec.getDataCharacteristics(input1.getName()).getRows();
-
- if(_cbind) {
- JavaPairRDD<Long,FrameBlock> in1Aligned =
in1.mapToPair(new ReduceSideAppendAlignFunction(leftRows));
- in1Aligned =
FrameRDDAggregateUtils.mergeByKey(in1Aligned);
- JavaPairRDD<Long,FrameBlock> in2Aligned =
in2.mapToPair(new ReduceSideAppendAlignFunction(leftRows));
- in2Aligned =
FrameRDDAggregateUtils.mergeByKey(in2Aligned);
-
- out = in1Aligned.join(in2Aligned).mapValues(new
ReduceSideColumnsFunction(_cbind));
- } else { //rbind
- JavaPairRDD<Long,FrameBlock> right = in2.mapToPair( new
ReduceSideAppendRowsFunction(leftRows));
- out = in1.union(right);
- }
-
+
+ out = appendFrameRSP(in1, in2, leftRows, _cbind);
+
//put output RDD handle into symbol table
updateBinaryAppendOutputDataCharacteristics(sec, _cbind);
sec.setRDDHandleForVariable(output.getName(), out);
@@ -73,6 +63,19 @@ public class FrameAppendRSPInstruction extends
AppendRSPInstruction {
sec.getFrameObject(output.getName()).setSchema(sec.getFrameObject(input1.getName()).getSchema());
}
+ public static JavaPairRDD<Long, FrameBlock>
appendFrameRSP(JavaPairRDD<Long, FrameBlock> in1, JavaPairRDD<Long, FrameBlock>
in2, long leftRows, boolean cbind) {
+ if(cbind) {
+ JavaPairRDD<Long,FrameBlock> in1Aligned =
in1.mapToPair(new ReduceSideAppendAlignFunction(leftRows));
+ in1Aligned =
FrameRDDAggregateUtils.mergeByKey(in1Aligned);
+ JavaPairRDD<Long,FrameBlock> in2Aligned =
in2.mapToPair(new ReduceSideAppendAlignFunction(leftRows));
+ in2Aligned =
FrameRDDAggregateUtils.mergeByKey(in2Aligned);
+ return in1Aligned.join(in2Aligned).mapValues(new
ReduceSideColumnsFunction(cbind));
+ } else { //rbind
+ JavaPairRDD<Long,FrameBlock> right = in2.mapToPair( new
ReduceSideAppendRowsFunction(leftRows));
+ return in1.union(right);
+ }
+ }
+
private static class ReduceSideColumnsFunction implements
Function<Tuple2<FrameBlock, FrameBlock>, FrameBlock>
{
private static final long serialVersionUID =
-97824903649667646L;
@@ -109,7 +112,7 @@ public class FrameAppendRSPInstruction extends
AppendRSPInstruction {
}
}
- private static class ReduceSideAppendAlignFunction implements
PairFunction<Tuple2<Long, FrameBlock>, Long, FrameBlock>
+ private static class ReduceSideAppendAlignFunction implements
PairFunction<Tuple2<Long, FrameBlock>, Long, FrameBlock>
{
private static final long serialVersionUID =
5850400295183766409L;
diff --git
a/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinWerTest.java
b/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinWerTest.java
index eb322f6973..7b764e5e6f 100644
---
a/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinWerTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinWerTest.java
@@ -46,10 +46,10 @@ public class BuiltinWerTest extends AutomatedTestBase {
runWerTest(ExecType.CP);
}
-// @Test
-// public void testSpark() {
-// runWerTest(ExecType.SPARK);
-// }
+ @Test
+ public void testSpark() {
+ runWerTest(ExecType.SPARK);
+ }
private void runWerTest(ExecType instType) {
ExecMode platformOld = setExecMode(instType);
diff --git
a/src/test/java/org/apache/sysds/test/functions/frame/FrameAppendDistTest.java
b/src/test/java/org/apache/sysds/test/functions/frame/FrameAppendDistTest.java
index 604a8791c6..c6cde96167 100644
---
a/src/test/java/org/apache/sysds/test/functions/frame/FrameAppendDistTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/frame/FrameAppendDistTest.java
@@ -22,6 +22,7 @@ package org.apache.sysds.test.functions.frame;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types.ExecMode;
@@ -40,6 +41,8 @@ import org.junit.Test;
public class FrameAppendDistTest extends AutomatedTestBase
{
private final static String TEST_NAME = "FrameAppend";
+ private final static String TEST_NAME2 = "FrameNAryAppend";
+ private final static String TEST_NAME3 = "FrameNAryAppendMisalign";
private final static String TEST_DIR = "functions/frame/";
private final static String TEST_CLASS_DIR = TEST_DIR +
FrameAppendDistTest.class.getSimpleName() + "/";
@@ -65,61 +68,83 @@ public class FrameAppendDistTest extends AutomatedTestBase
@Override
public void setUp() {
TestUtils.clearAssertionInformation();
- addTestConfiguration(TEST_NAME, new
TestConfiguration(TEST_CLASS_DIR, TEST_NAME,
- new String[] {"C"}));
+ addTestConfiguration(TEST_NAME, new
TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"C"}));
+ addTestConfiguration(TEST_NAME2, new
TestConfiguration(TEST_CLASS_DIR, TEST_NAME2,new String[] {"C"}));
+ addTestConfiguration(TEST_NAME3, new
TestConfiguration(TEST_CLASS_DIR, TEST_NAME3,new String[] {"C"}));
}
@Test
public void testAppendInBlock1DenseSP() {
- commonAppendTest(ExecMode.SPARK, rows1, rows1, cols1a, cols2a,
false, AppendMethod.MR_RAPPEND, false);
+ commonAppendTest(ExecMode.SPARK, rows1, rows1, cols1a, cols2a,
false, AppendMethod.MR_RAPPEND, false, TEST_NAME);
}
@Test
public void testAppendInBlock1SparseSP() {
- commonAppendTest(ExecMode.SPARK, rows1, rows1, cols1a, cols2a,
true, AppendMethod.MR_RAPPEND, false);
+ commonAppendTest(ExecMode.SPARK, rows1, rows1, cols1a, cols2a,
true, AppendMethod.MR_RAPPEND, false, TEST_NAME);
}
@Test
public void testAppendInBlock1DenseRBindSP() {
- commonAppendTest(ExecMode.SPARK, rows1, rows2, cols1a, cols1a,
false, AppendMethod.MR_RAPPEND, true);
+ commonAppendTest(ExecMode.SPARK, rows1, rows2, cols1a, cols1a,
false, AppendMethod.MR_RAPPEND, true, TEST_NAME);
}
@Test
public void testAppendInBlock1SparseRBindSP() {
- commonAppendTest(ExecMode.SPARK, rows1, rows1, cols1a, cols1a,
true, AppendMethod.MR_RAPPEND, true);
+ commonAppendTest(ExecMode.SPARK, rows1, rows1, cols1a, cols1a,
true, AppendMethod.MR_RAPPEND, true, TEST_NAME);
}
//NOTE: mappend only applied for m2_cols<=blocksize
@Test
public void testMapAppendInBlock2DenseSP() {
- commonAppendTest(ExecMode.SPARK, rows1, rows1, cols1b, cols2a,
false, AppendMethod.MR_MAPPEND, false);
+ commonAppendTest(ExecMode.SPARK, rows1, rows1, cols1b, cols2a,
false, AppendMethod.MR_MAPPEND, false, TEST_NAME);
}
@Test
public void testMapAppendInBlock2SparseSP() {
- commonAppendTest(ExecMode.SPARK, rows1, rows1, cols1b, cols2a,
true, AppendMethod.MR_MAPPEND, false);
+ commonAppendTest(ExecMode.SPARK, rows1, rows1, cols1b, cols2a,
true, AppendMethod.MR_MAPPEND, false, TEST_NAME);
}
@Test
public void testMapAppendOutBlock2DenseSP() {
- commonAppendTest(ExecMode.SPARK, rows1, rows1, cols1d, cols3d,
false, AppendMethod.MR_MAPPEND, false);
+ commonAppendTest(ExecMode.SPARK, rows1, rows1, cols1d, cols3d,
false, AppendMethod.MR_MAPPEND, false, TEST_NAME);
}
@Test
public void testMapAppendOutBlock2SparseSP() {
- commonAppendTest(ExecMode.SPARK, rows1, rows1, cols1d, cols3d,
true, AppendMethod.MR_MAPPEND, false);
+ commonAppendTest(ExecMode.SPARK, rows1, rows1, cols1d, cols3d,
true, AppendMethod.MR_MAPPEND, false, TEST_NAME);
}
+
+ @Test
+ public void testNAryCAppendMSP(){
+ commonAppendTest(ExecMode.SPARK ,100, 100, 5, 10, false, null,
false, TEST_NAME2);;
+ }
+
+ @Test
+ public void testNAryCAppendRSP(){
+ commonAppendTest(ExecMode.SPARK ,30, 30, 5, 1001, false, null,
false, TEST_NAME2);;
+ }
+
+ @Test
+ public void testNAryRAppendSP(){
+ commonAppendTest(ExecMode.SPARK ,100, 100, 5, 5, false, null,
true, TEST_NAME2);;
+ }
+
+ @Test
+ public void testNAryAppendWithMisalignmentMSP(){
+ commonAppendTest(ExecMode.SPARK ,5, 10, 5, 5, false, null,
false, TEST_NAME3);;
+ }
+
public void commonAppendTest(ExecMode platform, int rows1, int rows2,
int cols1, int cols2, boolean sparse,
- AppendMethod forcedAppendMethod, boolean rbind)
+ AppendMethod forcedAppendMethod, boolean rbind, String
test_name)
{
- TestConfiguration config =
getAndLoadTestConfiguration(TEST_NAME);
+ TestConfiguration config =
getAndLoadTestConfiguration(test_name);
ExecMode prevPlfm=rtplatform;
double sparsity = (sparse) ? sparsity2 : sparsity1;
boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
- setOutputBuffering(true);
+ //setOutputBuffering(true);
try
{
if(forcedAppendMethod != null) {
@@ -134,12 +159,12 @@ public class FrameAppendDistTest extends AutomatedTestBase
/* This is for running the junit test the new way,
i.e., construct the arguments directly */
String RI_HOME = SCRIPT_DIR + TEST_DIR;
- fullDMLScriptName = RI_HOME + TEST_NAME + ".dml";
- programArgs = new String[]{"-explain","-args",
input("A"),
+ fullDMLScriptName = RI_HOME + test_name + ".dml";
+ programArgs = new String[]{"-explain","-args",
input("A"),
Long.toString(rows1), Long.toString(cols1),
input("B"),
Long.toString(rows2), Long.toString(cols2),
output("C"),
(rbind? "rbind": "cbind")};
- fullRScriptName = RI_HOME + TEST_NAME + ".R";
+ fullRScriptName = RI_HOME + test_name + ".R";
rCmd = "Rscript" + " " + fullRScriptName + " " +
inputDir() + " " + expectedDir() + " " +
(rbind? "rbind": "cbind");
@@ -157,14 +182,15 @@ public class FrameAppendDistTest extends AutomatedTestBase
runTest(true, exceptionExpected, null,
expectedNumberOfJobs);
runRScript(true);
- ValueType[] lschemaAB = rbind ? lschemaA :
UtilFunctions.copyOf(lschemaA, lschemaB);
-
+ ValueType[] lschemaOut = rbind ? lschemaA :
UtilFunctions.copyOf(lschemaA, lschemaB);
+ if(!Objects.equals(test_name, TEST_NAME) && !rbind)
+ lschemaOut = UtilFunctions.copyOf(lschemaOut,
lschemaB);
for(String file: config.getOutputFiles())
{
FrameBlock frameBlock =
readDMLFrameFromHDFS(file, FileFormat.BINARY);
FrameBlock frameRBlock =
readRFrameFromHDFS(file + ".csv", FileFormat.CSV, frameBlock.getNumRows(),
frameBlock.getNumColumns());
- verifyFrameData(frameBlock, frameRBlock,
lschemaAB);
+ verifyFrameData(frameBlock, frameRBlock,
lschemaOut);
}
}
catch(Exception ex) {
diff --git a/src/test/scripts/functions/frame/FrameNAryAppend.R
b/src/test/scripts/functions/frame/FrameNAryAppend.R
new file mode 100644
index 0000000000..47a128beca
--- /dev/null
+++ b/src/test/scripts/functions/frame/FrameNAryAppend.R
@@ -0,0 +1,33 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+args <- commandArgs(TRUE)
+options(digits=22)
+library("Matrix")
+
+A=read.csv(paste(args[1], "A.csv", sep=""), header = FALSE,
stringsAsFactors=FALSE)
+B=read.csv(paste(args[1], "B.csv", sep=""), header = FALSE,
stringsAsFactors=FALSE)
+if(args[3] == "rbind") {
+ C=rbind(A, B, B)
+} else {
+ C=cbind(A, B, B)
+}
+write.csv(C, paste(args[2], "C.csv", sep=""), row.names = FALSE, quote = FALSE)
diff --git a/src/test/scripts/functions/frame/FrameNAryAppend.dml
b/src/test/scripts/functions/frame/FrameNAryAppend.dml
new file mode 100644
index 0000000000..e403d93066
--- /dev/null
+++ b/src/test/scripts/functions/frame/FrameNAryAppend.dml
@@ -0,0 +1,30 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+A=read($1, data_type="frame", rows=$2, cols=$3, format="binary")
+B=read($4, data_type="frame", rows=$5, cols=$6, format="binary")
+
+if ($8 == "rbind") {
+ C=rbind(A, B, B)
+} else {
+ C=cbind(A, B, B)
+}
+write(C, $7, format="binary")
diff --git a/src/test/scripts/functions/frame/FrameNAryAppendMisalign.R
b/src/test/scripts/functions/frame/FrameNAryAppendMisalign.R
new file mode 100644
index 0000000000..b4ad63d32e
--- /dev/null
+++ b/src/test/scripts/functions/frame/FrameNAryAppendMisalign.R
@@ -0,0 +1,30 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+args <- commandArgs(TRUE)
+options(digits=22)
+library("Matrix")
+
+A=read.csv(paste(args[1], "A.csv", sep=""), header = FALSE,
stringsAsFactors=FALSE)
+B=read.csv(paste(args[1], "B.csv", sep=""), header = FALSE,
stringsAsFactors=FALSE)
+t1=rbind(A, B, B)
+C=cbind(t1, t1, t1)
+write.csv(C, paste(args[2], "C.csv", sep=""), row.names = FALSE, quote = FALSE)
diff --git a/src/test/scripts/functions/frame/FrameNAryAppendMisalign.dml
b/src/test/scripts/functions/frame/FrameNAryAppendMisalign.dml
new file mode 100644
index 0000000000..22a492127d
--- /dev/null
+++ b/src/test/scripts/functions/frame/FrameNAryAppendMisalign.dml
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+A=read($1, data_type="frame", rows=$2, cols=$3, format="binary")
+B=read($4, data_type="frame", rows=$5, cols=$6, format="binary")
+t=rbind(A, B, B)
+C=cbind(t, t, t)
+#C=cbind(C, t)
+write(C, $7, format="binary")