Author: xuefu
Date: Thu Feb 18 14:01:14 2016
New Revision: 1731076
URL: http://svn.apache.org/viewvc?rev=1731076&view=rev
Log:
Revert PIG-4601: Implement Merge CoGroup for Spark engine (Liyun via Xuefu)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
pig/branches/spark/test/org/apache/pig/test/TestMapSideCogroup.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1731076&r1=1731075&r2=1731076&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Thu Feb 18 14:01:14 2016
@@ -55,7 +55,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
@@ -77,7 +76,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.LoadConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.LocalRearrangeConverter;
-import org.apache.pig.backend.hadoop.executionengine.spark.converter.MergeCogroupConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.MergeJoinConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.PackageConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.RDDConverter;
@@ -204,7 +202,6 @@ public class SparkLauncher extends Launc
convertMap.put(PORank.class, new RankConverter());
convertMap.put(POStream.class, new StreamConverter(confBytes));
convertMap.put(POFRJoin.class, new FRJoinConverter());
- convertMap.put(POMergeCogroup.class, new MergeCogroupConverter());
convertMap.put(POReduceBySpark.class, new ReduceByConverter());
convertMap.put(POPreCombinerLocalRearrange.class, new LocalRearrangeConverter());
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java?rev=1731076&r1=1731075&r2=1731076&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java Thu Feb 18 14:01:14 2016
@@ -18,38 +18,29 @@
package org.apache.pig.backend.hadoop.executionengine.spark;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import java.util.Random;
-
-import scala.Product2;
-import scala.Tuple2;
-import scala.collection.JavaConversions;
-import scala.collection.Seq;
-import scala.reflect.ClassTag;
-import scala.reflect.ClassTag$;
+import java.util.UUID;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
-import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
-import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.plan.NodeIdGenerator;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.rdd.RDD;
+import scala.Product2;
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
+
public class SparkUtil {
public static <T> ClassTag<T> getManifest(Class<T> clazz) {
@@ -128,25 +119,4 @@ public class SparkUtil {
return new MapReducePartitionerWrapper(customPartitioner, parallelism);
}
}
-
- // createIndexerSparkNode is a utility to create an indexer spark node with baseSparkOp
- static public void createIndexerSparkNode(SparkOperator baseSparkOp, String scope, NodeIdGenerator nig) throws PlanException, ExecException {
- List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
- PhysicalPlan ep = new PhysicalPlan();
- POProject prj = new POProject(new OperatorKey(scope,
- nig.getNextNodeId(scope)));
- prj.setStar(true);
- prj.setOverloaded(false);
- prj.setResultType(DataType.TUPLE);
- ep.add(prj);
- eps.add(ep);
-
- List<Boolean> ascCol = new ArrayList<Boolean>();
- ascCol.add(true);
-
- int requestedParallelism = baseSparkOp.requestedParallelism;
- POSort sort = new POSort(new OperatorKey(scope, nig.getNextNodeId(scope)), requestedParallelism, null, eps, ascCol, null);
- //POSort is added to sort the index tuples genereated by MergeJoinIndexer.More detail, see PIG-4601
- baseSparkOp.physicalPlan.addAsLeaf(sort);
- }
-}
\ No newline at end of file
+}
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1731076&r1=1731075&r2=1731076&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java Thu Feb 18 14:01:14 2016
@@ -18,7 +18,6 @@
package org.apache.pig.backend.hadoop.executionengine.spark.plan;
import java.io.IOException;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -31,16 +30,10 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.CollectableLoadFunc;
-import org.apache.pig.FuncSpec;
-import org.apache.pig.IndexableLoadFunc;
import org.apache.pig.LoadFunc;
-import org.apache.pig.OrderedLoadFunc;
import org.apache.pig.PigException;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -54,7 +47,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
@@ -68,11 +60,11 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
+import org.apache.pig.builtin.LOG;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -81,8 +73,6 @@ import org.apache.pig.impl.plan.Operator
import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.ObjectSerializer;
-import org.apache.pig.impl.util.Utils;
/**
* The compiler that compiles a given physical physicalPlan into a DAG of Spark
@@ -811,173 +801,4 @@ public class SparkCompiler extends PhyPl
finPlan.merge(e);
}
}
-
- @Override
- public void visitMergeCoGroup(POMergeCogroup poCoGrp) throws VisitorException {
- if (compiledInputs.length < 2) {
- int errCode = 2251;
- String errMsg = "Merge Cogroup work on two or more relations." +
- "To use map-side group-by on single relation, use 'collected' qualifier.";
- throw new SparkCompilerException(errMsg, errCode);
- }
-
- List<FuncSpec> funcSpecs = new ArrayList<FuncSpec>(compiledInputs.length - 1);
- List<String> fileSpecs = new ArrayList<String>(compiledInputs.length - 1);
- List<String> loaderSigns = new ArrayList<String>(compiledInputs.length - 1);
-
- try {
- poCoGrp.setEndOfRecordMark(POStatus.STATUS_NULL);
-
- // Iterate through all the SparkOpererators, disconnect side SparkOperators from
- // SparkOperator and collect all the information needed in different lists.
-
- for (int i = 0; i < compiledInputs.length; i++) {
- SparkOperator sparkOper = compiledInputs[i];
- PhysicalPlan plan = sparkOper.physicalPlan;
- if (plan.getRoots().size() != 1) {
- int errCode = 2171;
- String errMsg = "Expected one but found more then one root physical operator in physical plan.";
- throw new SparkCompilerException(errMsg, errCode, PigException.BUG);
- }
-
- PhysicalOperator rootPOOp = plan.getRoots().get(0);
- if (!(rootPOOp instanceof POLoad)) {
- int errCode = 2172;
- String errMsg = "Expected physical operator at root to be POLoad. Found : " + rootPOOp.getClass().getCanonicalName();
- throw new SparkCompilerException(errMsg, errCode);
- }
-
- POLoad sideLoader = (POLoad) rootPOOp;
- FileSpec loadFileSpec = sideLoader.getLFile();
- FuncSpec funcSpec = loadFileSpec.getFuncSpec();
- LoadFunc loadfunc = sideLoader.getLoadFunc();
- if (i == 0) {
-
- if (!(CollectableLoadFunc.class.isAssignableFrom(loadfunc.getClass()))) {
- int errCode = 2252;
- throw new SparkCompilerException("Base loader in Cogroup must implement CollectableLoadFunc.", errCode);
- }
-
- ((CollectableLoadFunc) loadfunc).ensureAllKeyInstancesInSameSplit();
- continue;
- }
- if (!(IndexableLoadFunc.class.isAssignableFrom(loadfunc.getClass()))) {
- int errCode = 2253;
- throw new SparkCompilerException("Side loaders in cogroup must implement IndexableLoadFunc.", errCode);
- }
-
- funcSpecs.add(funcSpec);
- fileSpecs.add(loadFileSpec.getFileName());
- loaderSigns.add(sideLoader.getSignature());
- sparkPlan.remove(sparkOper);
- }
-
- poCoGrp.setSideLoadFuncs(funcSpecs);
- poCoGrp.setSideFileSpecs(fileSpecs);
- poCoGrp.setLoaderSignatures(loaderSigns);
-
- // Use spark operator of base relation for the cogroup operation.
- SparkOperator baseSparkOp = phyToSparkOpMap.get(poCoGrp.getInputs().get(0));
-
- // Create a spark operator to generate index file for tuples from leftmost relation
- SparkOperator indexerSparkOp = getSparkOp();
- FileSpec idxFileSpec = getIndexingJob(indexerSparkOp, baseSparkOp, poCoGrp.getLRInnerPlansOf(0));
- poCoGrp.setIdxFuncSpec(idxFileSpec.getFuncSpec());
- poCoGrp.setIndexFileName(idxFileSpec.getFileName());
-
- baseSparkOp.physicalPlan.addAsLeaf(poCoGrp);
- for (FuncSpec funcSpec : funcSpecs)
- baseSparkOp.UDFs.add(funcSpec.toString());
-
- sparkPlan.add(indexerSparkOp);
- sparkPlan.connect(indexerSparkOp, baseSparkOp);
- phyToSparkOpMap.put(poCoGrp, baseSparkOp);
- curSparkOp = baseSparkOp;
- } catch (ExecException e) {
- throw new SparkCompilerException(e.getDetailedMessage(), e.getErrorCode(), e.getErrorSource(), e);
- } catch (SparkCompilerException mrce) {
- throw (mrce);
- } catch (CloneNotSupportedException e) {
- throw new SparkCompilerException(e);
- } catch (PlanException e) {
- int errCode = 2034;
- String msg = "Error compiling operator " + poCoGrp.getClass().getCanonicalName();
- throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
- } catch (IOException e) {
- int errCode = 3000;
- String errMsg = "IOException caught while compiling POMergeCoGroup";
- throw new SparkCompilerException(errMsg, errCode, e);
- }
- }
-
- // Sets up the indexing job for single-stage cogroups.
- private FileSpec getIndexingJob(SparkOperator indexerSparkOp,
- final SparkOperator baseSparkOp, final List<PhysicalPlan> mapperLRInnerPlans)
- throws SparkCompilerException, PlanException, ExecException, IOException, CloneNotSupportedException {
-
- // First replace loader with MergeJoinIndexer.
- PhysicalPlan baseMapPlan = baseSparkOp.physicalPlan;
- POLoad baseLoader = (POLoad) baseMapPlan.getRoots().get(0);
- FileSpec origLoaderFileSpec = baseLoader.getLFile();
- FuncSpec funcSpec = origLoaderFileSpec.getFuncSpec();
- LoadFunc loadFunc = baseLoader.getLoadFunc();
-
- if (!(OrderedLoadFunc.class.isAssignableFrom(loadFunc.getClass()))) {
- int errCode = 1104;
- String errMsg = "Base relation of merge-coGroup must implement " +
- "OrderedLoadFunc interface. The specified loader "
- + funcSpec + " doesn't implement it";
- throw new SparkCompilerException(errMsg, errCode);
- }
-
- String[] indexerArgs = new String[6];
- indexerArgs[0] = funcSpec.toString();
- indexerArgs[1] = ObjectSerializer.serialize((Serializable) mapperLRInnerPlans);
- indexerArgs[3] = baseLoader.getSignature();
- indexerArgs[4] = baseLoader.getOperatorKey().scope;
- indexerArgs[5] = Boolean.toString(false); // we care for nulls.
-
- PhysicalPlan phyPlan;
- if (baseMapPlan.getSuccessors(baseLoader) == null
- || baseMapPlan.getSuccessors(baseLoader).isEmpty()) {
- // Load-Load-Cogroup case.
- phyPlan = null;
- } else { // We got something. Yank it and set it as inner plan.
- phyPlan = baseMapPlan.clone();
- PhysicalOperator root = phyPlan.getRoots().get(0);
- phyPlan.disconnect(root, phyPlan.getSuccessors(root).get(0));
- phyPlan.remove(root);
-
- }
- indexerArgs[2] = ObjectSerializer.serialize(phyPlan);
-
- POLoad idxJobLoader = getLoad();
- idxJobLoader.setLFile(new FileSpec(origLoaderFileSpec.getFileName(),
- new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs)));
- indexerSparkOp.physicalPlan.add(idxJobLoader);
- indexerSparkOp.UDFs.add(baseLoader.getLFile().getFuncSpec().toString());
-
- // Loader of sparkOp will return a tuple of form -
- // (key1, key2, .. , WritableComparable, splitIndex). See MergeJoinIndexer for details.
- // Create a spark node to retrieve index file by MergeJoinIndexer
- SparkUtil.createIndexerSparkNode(indexerSparkOp, scope, nig);
-
- POStore st = getStore();
- FileSpec strFile = getTempFileSpec();
- st.setSFile(strFile);
- indexerSparkOp.physicalPlan.addAsLeaf(st);
-
- return strFile;
- }
-
- /**
- * Returns a temporary DFS Path
- *
- * @return
- * @throws IOException
- */
- private FileSpec getTempFileSpec() throws IOException {
- return new FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(),
- new FuncSpec(Utils.getTmpFileCompressorName(pigContext)));
- }
}
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java?rev=1731076&r1=1731075&r2=1731076&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java Thu Feb 18 14:01:14 2016
@@ -22,17 +22,13 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.PigStatusReporter;
@@ -44,18 +40,12 @@ public class PigInputFormatSpark extends
InterruptedException {
init();
resetUDFContext();
+ RecordReader recordReader = super.createRecordReader(split, context);
//PigSplit#conf is the default hadoop configuration, we need get the configuration
//from context.getConfigration() to retrieve pig properties
PigSplit pigSplit = (PigSplit) split;
- Configuration conf = context.getConfiguration();
- pigSplit.setConf(conf);
- //Set current splitIndex in PigMapReduce.sJobContext.getConfiguration.get(PigImplConstants.PIG_SPLIT_INDEX)
- //which will be used in POMergeCogroup#setup
- if (PigMapReduce.sJobContext == null) {
- PigMapReduce.sJobContext = HadoopShims.createJobContext(conf, new JobID());
- }
- PigMapReduce.sJobContext.getConfiguration().setInt(PigImplConstants.PIG_SPLIT_INDEX, pigSplit.getSplitIndex());
- return super.createRecordReader(split, context);
+ pigSplit.setConf(context.getConfiguration());
+ return recordReader;
}
private void resetUDFContext() {
Modified: pig/branches/spark/test/org/apache/pig/test/TestMapSideCogroup.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMapSideCogroup.java?rev=1731076&r1=1731075&r2=1731076&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestMapSideCogroup.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestMapSideCogroup.java Thu Feb 18 14:01:14 2016
@@ -311,11 +311,12 @@ public class TestMapSideCogroup {
pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "," + INPUT_FILE4 + "' using "+ DummyCollectableLoader.class.getName() +"() as (c1:chararray,c2:int);");
pigServer.registerQuery("B = LOAD '" + INPUT_FILE5 + "' using "+ DummyIndexableLoader.class.getName() +"() as (c1:chararray,c2:int);");
- List<Tuple> dbMergeCogrp = new ArrayList<Tuple>();
+ DataBag dbMergeCogrp = BagFactory.getInstance().newDefaultBag();
pigServer.registerQuery("C = cogroup A by c1, B by c1 using 'merge';");
Iterator<Tuple> iter = pigServer.openIterator("C");
+
while(iter.hasNext()) {
Tuple t = iter.next();
dbMergeCogrp.add(t);
@@ -334,29 +335,12 @@ public class TestMapSideCogroup {
"(3,{(3,3),(3,2),(3,1)},{(3,1),(3,2),(3,3)})"
};
- List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(results);
-
- //We need sort dbMergeCogrp because the result is different in sequence between spark and other mode when
- //multiple files are loaded(LOAD INPUT_FILE1,INPUT_FILE4...)
- for (Tuple t : dbMergeCogrp) {
- Util.convertBagToSortedBag(t);
- }
- for (Tuple t : expected) {
- Util.convertBagToSortedBag(t);
- }
-
- Collections.sort(dbMergeCogrp);
- Collections.sort(expected);
- assertEquals(dbMergeCogrp.size(), expected.size());
-
- //Since TestMapSideCogroup.DummyIndexableLoader.getNext() does not
- //apply schema for each input tuple,Util#checkQueryOutputsAfterSortRecursive fails to assert.
- // The schema for C is (int,{(chararray,int),(chararray,int),(chararray,int)},{(chararray,int),(chararray,int),(chararray,int)}).
- //But the schema for result "dbMergeCogrp" is (int,{(chararray,int),(chararray,int),(chararray,int)},{(chararray,chararray),(chararray,chararray),(chararray,chararray)})
+ assertEquals(9, dbMergeCogrp.size());
Iterator<Tuple> itr = dbMergeCogrp.iterator();
- for (int i = 0; i < dbMergeCogrp.size(); i++) {
- assertEquals(itr.next().toString(), expected.get(i).toString());
+ for(int i=0; i<9; i++){
+ assertEquals(itr.next().toString(), results[i]);
}
+ assertFalse(itr.hasNext());
}
@Test